Compare commits
3 Commits
Author | SHA1 | Date | |
---|---|---|---|
f050e58176 | |||
b6e7fa0311 | |||
20e1ff64fa |
@ -11,10 +11,9 @@ message NodePubkey {
|
|||||||
message RegisterNodeReq {
|
message RegisterNodeReq {
|
||||||
string node_pubkey = 1;
|
string node_pubkey = 1;
|
||||||
string owner_pubkey = 2;
|
string owner_pubkey = 2;
|
||||||
string ip = 3;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message NodeRes {
|
message NodeResourceReq {
|
||||||
string node_pubkey = 1;
|
string node_pubkey = 1;
|
||||||
uint32 avail_ports = 2;
|
uint32 avail_ports = 2;
|
||||||
uint32 avail_ipv4 = 3;
|
uint32 avail_ipv4 = 3;
|
||||||
@ -44,7 +43,6 @@ message NewVMReq {
|
|||||||
|
|
||||||
message UpdateVMReq {
|
message UpdateVMReq {
|
||||||
string uuid = 1;
|
string uuid = 1;
|
||||||
string node_pubkey = 2;
|
|
||||||
uint32 disk_size_gb = 3;
|
uint32 disk_size_gb = 3;
|
||||||
uint32 vcpus = 4;
|
uint32 vcpus = 4;
|
||||||
uint32 memory_mb = 5;
|
uint32 memory_mb = 5;
|
||||||
@ -81,12 +79,21 @@ message ListVMContractsReq {
|
|||||||
string node_pubkey = 2;
|
string node_pubkey = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message NewVmRespIP {
|
||||||
|
uint32 nic_index = 1;
|
||||||
|
string address = 2;
|
||||||
|
string mask = 3;
|
||||||
|
string gateway = 4;
|
||||||
|
}
|
||||||
|
|
||||||
message NewVMResp {
|
message NewVMResp {
|
||||||
string uuid = 1;
|
string uuid = 1;
|
||||||
repeated uint32 exposed_ports = 2;
|
repeated uint32 exposed_ports = 2;
|
||||||
string public_ipv4 = 3;
|
string ovmf_hash = 5;
|
||||||
string public_ipv6 = 4;
|
// This is needed to allow the CLI to build the kernel params from known data.
|
||||||
string error = 5;
|
// The CLI will use the kernel params to get the measurement.
|
||||||
|
repeated NewVmRespIP ips = 6;
|
||||||
|
string error = 7;
|
||||||
}
|
}
|
||||||
|
|
||||||
message DeleteVMReq {
|
message DeleteVMReq {
|
||||||
@ -95,7 +102,7 @@ message DeleteVMReq {
|
|||||||
|
|
||||||
service BrainDaemonService {
|
service BrainDaemonService {
|
||||||
rpc RegisterNode (RegisterNodeReq) returns (Empty);
|
rpc RegisterNode (RegisterNodeReq) returns (Empty);
|
||||||
rpc NodeResUpdate (stream NodeRes) returns (Empty);
|
rpc SendNodeResources (stream NodeResourceReq) returns (Empty);
|
||||||
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
|
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
|
||||||
rpc SendNewVMResp (stream NewVMResp) returns (Empty);
|
rpc SendNewVMResp (stream NewVMResp) returns (Empty);
|
||||||
rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
|
rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
|
||||||
@ -117,10 +124,11 @@ message NodeFilters {
|
|||||||
message NodeListResp {
|
message NodeListResp {
|
||||||
string node_pubkey = 1;
|
string node_pubkey = 1;
|
||||||
string country = 2;
|
string country = 2;
|
||||||
string city = 3;
|
string region = 3;
|
||||||
string ip = 4; // required for latency test
|
string city = 4;
|
||||||
uint32 server_rating = 5;
|
string ip = 5; // required for latency test
|
||||||
uint32 provider_rating = 6;
|
uint32 server_rating = 6;
|
||||||
|
uint32 provider_rating = 7;
|
||||||
}
|
}
|
||||||
|
|
||||||
service BrainCliService {
|
service BrainCliService {
|
||||||
|
@ -5,8 +5,8 @@ pub mod brain {
|
|||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use brain::{
|
use brain::{
|
||||||
brain_cli_service_client::BrainCliServiceClient, DeleteVmReq, ListVmContractsReq,
|
brain_cli_service_client::BrainCliServiceClient, DeleteVmReq, ListVmContractsReq, NewVmReq,
|
||||||
NewVmReq, NodeFilters, NodeListResp, VmContract, UpdateVmReq,
|
NodeFilters, NodeListResp, UpdateVmReq, VmContract,
|
||||||
};
|
};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use log::{debug, info, warn};
|
use log::{debug, info, warn};
|
||||||
@ -37,8 +37,8 @@ async fn get_node_list(mut client: BrainCliServiceClient<Channel>) -> Result<Vec
|
|||||||
let mut grpc_stream = client
|
let mut grpc_stream = client
|
||||||
.list_nodes(NodeFilters {
|
.list_nodes(NodeFilters {
|
||||||
free_ports: 0,
|
free_ports: 0,
|
||||||
offers_ipv4: true,
|
offers_ipv4: false,
|
||||||
offers_ipv6: true,
|
offers_ipv6: false,
|
||||||
vcpus: 0,
|
vcpus: 0,
|
||||||
memory_mb: 0,
|
memory_mb: 0,
|
||||||
storage_gb: 0,
|
storage_gb: 0,
|
||||||
@ -84,12 +84,12 @@ async fn submit_vm_request(
|
|||||||
info!("Creating VM {req:?}");
|
info!("Creating VM {req:?}");
|
||||||
let result = client.create_vm_contract(req).await;
|
let result = client.create_vm_contract(req).await;
|
||||||
match result {
|
match result {
|
||||||
Ok(confirmation) => {
|
Ok(resp) => {
|
||||||
let confirmation = confirmation.into_inner();
|
let resp = resp.into_inner();
|
||||||
if confirmation.error.is_empty() {
|
if resp.error.is_empty() {
|
||||||
info!("Got VM confirmation: {confirmation:?}");
|
info!("Got NewVMResp: {resp:?}");
|
||||||
} else {
|
} else {
|
||||||
warn!("Got VM confirmation error: {}", confirmation.error);
|
warn!("Got new VM error: {}", resp.error);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
Err(e) => log::error!("Could not create vm: {e:?}"),
|
Err(e) => log::error!("Could not create vm: {e:?}"),
|
||||||
@ -137,12 +137,10 @@ async fn delete_vm(mut client: BrainCliServiceClient<Channel>, uuid: &str) -> Re
|
|||||||
|
|
||||||
async fn update_vm_request(
|
async fn update_vm_request(
|
||||||
mut client: BrainCliServiceClient<Channel>,
|
mut client: BrainCliServiceClient<Channel>,
|
||||||
node_pubkey: &str,
|
|
||||||
uuid: &str,
|
uuid: &str,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let req = UpdateVmReq {
|
let req = UpdateVmReq {
|
||||||
uuid: uuid.to_string(),
|
uuid: uuid.to_string(),
|
||||||
node_pubkey: node_pubkey.to_string(),
|
|
||||||
vcpus: 4,
|
vcpus: 4,
|
||||||
memory_mb: 4096,
|
memory_mb: 4096,
|
||||||
disk_size_gb: 40,
|
disk_size_gb: 40,
|
||||||
@ -154,12 +152,12 @@ async fn update_vm_request(
|
|||||||
info!("Updating VM {req:?}");
|
info!("Updating VM {req:?}");
|
||||||
let result = client.update_vm(req).await;
|
let result = client.update_vm(req).await;
|
||||||
match result {
|
match result {
|
||||||
Ok(confirmation) => {
|
Ok(resp) => {
|
||||||
let confirmation = confirmation.into_inner();
|
let resp = resp.into_inner();
|
||||||
if confirmation.error.is_empty() {
|
if resp.error.is_empty() {
|
||||||
info!("Got VM update confirmation: {confirmation:?}");
|
info!("Got VM update response: {resp:?}");
|
||||||
} else {
|
} else {
|
||||||
warn!("Got VM update confirmation error: {}", confirmation.error);
|
warn!("Got VM update error: {}", resp.error);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
Err(e) => log::error!("Could not update vm: {e:?}"),
|
Err(e) => log::error!("Could not update vm: {e:?}"),
|
||||||
@ -186,16 +184,13 @@ async fn main() -> Result<()> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let nodes = get_node_list(client.clone()).await?;
|
let contracts = list_contracts(client.clone()).await?;
|
||||||
for node in nodes {
|
for contract in contracts {
|
||||||
let contracts = list_contracts(client.clone()).await?;
|
if let Err(e) = update_vm_request(client.clone(), &contract.uuid).await {
|
||||||
for contract in contracts {
|
log::error!(
|
||||||
if let Err(e) = update_vm_request(client.clone(), &node.node_pubkey, &contract.uuid).await {
|
"Received error when updating VM on node {}: {e:?}",
|
||||||
log::error!(
|
&contract.node_pubkey
|
||||||
"Received error when updating VM on node {}: {e:?}",
|
);
|
||||||
&node.node_pubkey
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -11,10 +11,9 @@ message NodePubkey {
|
|||||||
message RegisterNodeReq {
|
message RegisterNodeReq {
|
||||||
string node_pubkey = 1;
|
string node_pubkey = 1;
|
||||||
string owner_pubkey = 2;
|
string owner_pubkey = 2;
|
||||||
string ip = 3;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message NodeRes {
|
message NodeResourceReq {
|
||||||
string node_pubkey = 1;
|
string node_pubkey = 1;
|
||||||
uint32 avail_ports = 2;
|
uint32 avail_ports = 2;
|
||||||
uint32 avail_ipv4 = 3;
|
uint32 avail_ipv4 = 3;
|
||||||
@ -44,7 +43,6 @@ message NewVMReq {
|
|||||||
|
|
||||||
message UpdateVMReq {
|
message UpdateVMReq {
|
||||||
string uuid = 1;
|
string uuid = 1;
|
||||||
string node_pubkey = 2;
|
|
||||||
uint32 disk_size_gb = 3;
|
uint32 disk_size_gb = 3;
|
||||||
uint32 vcpus = 4;
|
uint32 vcpus = 4;
|
||||||
uint32 memory_mb = 5;
|
uint32 memory_mb = 5;
|
||||||
@ -81,12 +79,21 @@ message ListVMContractsReq {
|
|||||||
string node_pubkey = 2;
|
string node_pubkey = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message NewVmRespIP {
|
||||||
|
uint32 nic_index = 1;
|
||||||
|
string address = 2;
|
||||||
|
string mask = 3;
|
||||||
|
string gateway = 4;
|
||||||
|
}
|
||||||
|
|
||||||
message NewVMResp {
|
message NewVMResp {
|
||||||
string uuid = 1;
|
string uuid = 1;
|
||||||
repeated uint32 exposed_ports = 2;
|
repeated uint32 exposed_ports = 2;
|
||||||
string public_ipv4 = 3;
|
string ovmf_hash = 5;
|
||||||
string public_ipv6 = 4;
|
// This is needed to allow the CLI to build the kernel params from known data.
|
||||||
string error = 5;
|
// The CLI will use the kernel params to get the measurement.
|
||||||
|
repeated NewVmRespIP ips = 6;
|
||||||
|
string error = 7;
|
||||||
}
|
}
|
||||||
|
|
||||||
message DeleteVMReq {
|
message DeleteVMReq {
|
||||||
@ -95,7 +102,7 @@ message DeleteVMReq {
|
|||||||
|
|
||||||
service BrainDaemonService {
|
service BrainDaemonService {
|
||||||
rpc RegisterNode (RegisterNodeReq) returns (Empty);
|
rpc RegisterNode (RegisterNodeReq) returns (Empty);
|
||||||
rpc NodeResUpdate (stream NodeRes) returns (Empty);
|
rpc SendNodeResources (stream NodeResourceReq) returns (Empty);
|
||||||
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
|
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
|
||||||
rpc SendNewVMResp (stream NewVMResp) returns (Empty);
|
rpc SendNewVMResp (stream NewVMResp) returns (Empty);
|
||||||
rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
|
rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
|
||||||
@ -117,10 +124,11 @@ message NodeFilters {
|
|||||||
message NodeListResp {
|
message NodeListResp {
|
||||||
string node_pubkey = 1;
|
string node_pubkey = 1;
|
||||||
string country = 2;
|
string country = 2;
|
||||||
string city = 3;
|
string region = 3;
|
||||||
string ip = 4; // required for latency test
|
string city = 4;
|
||||||
uint32 server_rating = 5;
|
string ip = 5; // required for latency test
|
||||||
uint32 provider_rating = 6;
|
uint32 server_rating = 6;
|
||||||
|
uint32 provider_rating = 7;
|
||||||
}
|
}
|
||||||
|
|
||||||
service BrainCliService {
|
service BrainCliService {
|
||||||
@ -130,4 +138,3 @@ service BrainCliService {
|
|||||||
rpc DeleteVM (DeleteVMReq) returns (Empty);
|
rpc DeleteVM (DeleteVMReq) returns (Empty);
|
||||||
rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp);
|
rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,8 +5,8 @@ pub mod brain {
|
|||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use brain::{
|
use brain::{
|
||||||
brain_daemon_service_client::BrainDaemonServiceClient, DeleteVmReq, NewVmResp,
|
brain_daemon_service_client::BrainDaemonServiceClient, DeleteVmReq, NewVmReq, NewVmResp,
|
||||||
NewVmReq, NodePubkey, RegisterNodeReq, UpdateVmReq, UpdateVmResp, NodeRes,
|
NewVmRespIp, NodePubkey, NodeResourceReq, RegisterNodeReq, UpdateVmReq, UpdateVmResp,
|
||||||
};
|
};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use log::{debug, error, info, warn};
|
use log::{debug, error, info, warn};
|
||||||
@ -65,46 +65,32 @@ async fn send_newvm_resp(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn send_node_resources(
|
||||||
|
mut client: BrainDaemonServiceClient<Channel>,
|
||||||
|
rx: Receiver<NodeResourceReq>,
|
||||||
|
) -> Result<()> {
|
||||||
|
debug!("starting send_newvm_resp stream");
|
||||||
|
let rx_stream = ReceiverStream::new(rx);
|
||||||
|
client.send_node_resources(rx_stream).await?;
|
||||||
|
debug!("send_newvm_resp is about to exit");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn register_node(mut client: BrainDaemonServiceClient<Channel>) {
|
async fn register_node(mut client: BrainDaemonServiceClient<Channel>) {
|
||||||
debug!("Starting node registration...");
|
debug!("Starting node registration...");
|
||||||
let req = RegisterNodeReq {
|
let req = RegisterNodeReq {
|
||||||
node_pubkey: SECURE_PUBLIC_KEY.clone(),
|
node_pubkey: SECURE_PUBLIC_KEY.clone(),
|
||||||
owner_pubkey: "IamTheOwnerOf".to_string() + &SECURE_PUBLIC_KEY,
|
owner_pubkey: "IamTheOwnerOf".to_string() + &SECURE_PUBLIC_KEY,
|
||||||
ip: "10.0.10.1".to_string(),
|
|
||||||
};
|
};
|
||||||
match client.register_node(req).await {
|
match client.register_node(req).await {
|
||||||
Ok(_) => info!(
|
Ok(_) => info!(
|
||||||
"Registered as 10.0.10.1 {}",
|
"Registered as 10.0.10.1 from Bruma/Cyrodiil with ID {}",
|
||||||
SECURE_PUBLIC_KEY.clone()
|
SECURE_PUBLIC_KEY.clone()
|
||||||
),
|
),
|
||||||
Err(e) => error!("Could not register node data: {e:?}"),
|
Err(e) => error!("Could not register node data: {e:?}"),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn update_node_res(
|
|
||||||
mut _client: BrainDaemonServiceClient<Channel>,
|
|
||||||
tx: Sender<NodeRes>,
|
|
||||||
) -> Result<()> {
|
|
||||||
debug!("Starting node registration...");
|
|
||||||
let req = NodeRes {
|
|
||||||
node_pubkey: SECURE_PUBLIC_KEY.clone(),
|
|
||||||
avail_ports: 10000,
|
|
||||||
avail_ipv4: 10,
|
|
||||||
avail_ipv6: 100_000,
|
|
||||||
avail_vcpus: 16,
|
|
||||||
avail_memory_mb: 20_000,
|
|
||||||
avail_storage_gb: 700,
|
|
||||||
max_ports_per_vm: 5,
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(e) = tx.send(req).await {
|
|
||||||
error!("Failed to send NodeRes: {e}");
|
|
||||||
} else {
|
|
||||||
info!("NodeRes sent successfully");
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn listen_for_deleted_vms(
|
async fn listen_for_deleted_vms(
|
||||||
mut client: BrainDaemonServiceClient<Channel>,
|
mut client: BrainDaemonServiceClient<Channel>,
|
||||||
tx: Sender<DeleteVmReq>,
|
tx: Sender<DeleteVmReq>,
|
||||||
@ -157,17 +143,17 @@ async fn listen_for_update_vm_reqs(
|
|||||||
|
|
||||||
async fn handle_update_vm_requests(
|
async fn handle_update_vm_requests(
|
||||||
mut req: Receiver<UpdateVmReq>,
|
mut req: Receiver<UpdateVmReq>,
|
||||||
resp: Sender<UpdateVmResp>
|
resp_chan: Sender<UpdateVmResp>,
|
||||||
) {
|
) {
|
||||||
info!("Started to handle update vm requests.");
|
info!("Started to handle update vm requests.");
|
||||||
while let Some(update_vm) = req.recv().await {
|
while let Some(update_vm) = req.recv().await {
|
||||||
let confirmation = UpdateVmResp {
|
let update_vm_resp = UpdateVmResp {
|
||||||
uuid: update_vm.uuid,
|
uuid: update_vm.uuid,
|
||||||
error: "".to_string(),
|
error: "".to_string(),
|
||||||
};
|
};
|
||||||
info!("Sending UpdateVmResp: {confirmation:?}");
|
info!("Sending UpdateVmResp: {update_vm_resp:?}");
|
||||||
let _ = resp.send(confirmation).await;
|
let _ = resp_chan.send(update_vm_resp).await;
|
||||||
};
|
}
|
||||||
warn!("update vm request handler is ending");
|
warn!("update vm request handler is ending");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -182,7 +168,11 @@ async fn send_updatevm_resp(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_vm_requests(mut req: Receiver<NewVmReq>, resp: Sender<NewVmResp>) {
|
async fn handle_vm_requests(
|
||||||
|
mut req: Receiver<NewVmReq>,
|
||||||
|
resp: Sender<NewVmResp>,
|
||||||
|
resource_tx: Sender<NodeResourceReq>,
|
||||||
|
) {
|
||||||
info!("Started to handle vm requests. 1 out of 5 requests will return error.");
|
info!("Started to handle vm requests. 1 out of 5 requests will return error.");
|
||||||
let mut i = 0;
|
let mut i = 0;
|
||||||
while let Some(new_vm) = req.recv().await {
|
while let Some(new_vm) = req.recv().await {
|
||||||
@ -190,34 +180,53 @@ async fn handle_vm_requests(mut req: Receiver<NewVmReq>, resp: Sender<NewVmResp>
|
|||||||
true => Vec::new(),
|
true => Vec::new(),
|
||||||
false => vec![20321, 20415, 25912],
|
false => vec![20321, 20415, 25912],
|
||||||
};
|
};
|
||||||
let public_ipv4 = match new_vm.public_ipv4 {
|
let mut ips = Vec::new();
|
||||||
true => "10.0.100.5".to_string(),
|
ips.push(NewVmRespIp {
|
||||||
false => String::new(),
|
nic_index: 0,
|
||||||
};
|
address: "190.0.100.5".to_string(),
|
||||||
let public_ipv6 = match new_vm.public_ipv6 {
|
gateway: "190.0.100.1".to_string(),
|
||||||
true => " 2a02:2f2d:d301:3100:afe8:a85e:54a0:dd28".to_string(),
|
mask: "24".to_string(),
|
||||||
false => String::new(),
|
});
|
||||||
};
|
ips.push(NewVmRespIp {
|
||||||
|
nic_index: 0,
|
||||||
|
address: "2a02:2f2d:d301:3100:afe8:a85e:54a0:dd28".to_string(),
|
||||||
|
gateway: "2a02:2f2d:d301:3100::1".to_string(),
|
||||||
|
mask: "64".to_string(),
|
||||||
|
});
|
||||||
if i != 3 {
|
if i != 3 {
|
||||||
let confirmation = NewVmResp{
|
let confirmation = NewVmResp {
|
||||||
uuid: new_vm.uuid,
|
uuid: new_vm.uuid,
|
||||||
exposed_ports,
|
exposed_ports,
|
||||||
public_ipv4,
|
ovmf_hash: "YouAreNotGettingHacked".to_string(),
|
||||||
public_ipv6,
|
ips,
|
||||||
error: String::new(),
|
error: String::new(),
|
||||||
};
|
};
|
||||||
info!("Sending NewVmConfirmation: {confirmation:?}");
|
info!("Sending NewVmConfirmation: {confirmation:?}");
|
||||||
let _ = resp.send(confirmation).await;
|
let _ = resp.send(confirmation).await;
|
||||||
|
info!("Sending NodeResourceReq");
|
||||||
} else {
|
} else {
|
||||||
let confirmation = NewVmResp {
|
let confirmation = NewVmResp {
|
||||||
uuid: new_vm.uuid,
|
uuid: new_vm.uuid,
|
||||||
exposed_ports: Vec::new(),
|
exposed_ports: Vec::new(),
|
||||||
public_ipv4: String::new(),
|
ovmf_hash: "YouAreNotGettingHacked".to_string(),
|
||||||
public_ipv6: String::new(),
|
ips: Vec::new(),
|
||||||
error: "No.".to_string(),
|
error: "No.".to_string(),
|
||||||
};
|
};
|
||||||
info!("Sending error for NewVmConfirmation: {confirmation:?}");
|
info!("Sending error for NewVmConfirmation: {confirmation:?}");
|
||||||
let _ = resp.send(confirmation).await;
|
let _ = resp.send(confirmation).await;
|
||||||
|
info!("Sending updated about used resources.");
|
||||||
|
let _ = resource_tx
|
||||||
|
.send(NodeResourceReq {
|
||||||
|
node_pubkey: SECURE_PUBLIC_KEY.clone(),
|
||||||
|
avail_ipv4: 5,
|
||||||
|
avail_ipv6: 5,
|
||||||
|
avail_memory_mb: 5000,
|
||||||
|
avail_vcpus: 32,
|
||||||
|
avail_storage_gb: 100,
|
||||||
|
avail_ports: 5000,
|
||||||
|
max_ports_per_vm: 5,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
i += 1;
|
i += 1;
|
||||||
if i == 5 {
|
if i == 5 {
|
||||||
@ -233,20 +242,30 @@ async fn connect_and_run() -> Result<()> {
|
|||||||
|
|
||||||
register_node(client.clone()).await;
|
register_node(client.clone()).await;
|
||||||
|
|
||||||
let update_node_res_client = client.clone();
|
|
||||||
let (node_tx, _node_rx) = tokio::sync::mpsc::channel(6);
|
|
||||||
streaming_tasks.spawn(update_node_res(update_node_res_client, node_tx));
|
|
||||||
|
|
||||||
let newvm_client = client.clone();
|
let newvm_client = client.clone();
|
||||||
let (tx, newvm_rx) = tokio::sync::mpsc::channel(6);
|
let (tx, newvm_rx) = tokio::sync::mpsc::channel(6);
|
||||||
streaming_tasks.spawn(listen_for_new_vm_reqs(newvm_client, tx));
|
streaming_tasks.spawn(listen_for_new_vm_reqs(newvm_client, tx));
|
||||||
|
|
||||||
let confirm_client = client.clone();
|
let confirm_client = client.clone();
|
||||||
let (confirm_tx, rx) = tokio::sync::mpsc::channel(6);
|
let resource_client = client.clone();
|
||||||
streaming_tasks.spawn(send_newvm_resp(confirm_client, rx));
|
let (resource_tx, resource_rx) = tokio::sync::mpsc::channel(6);
|
||||||
|
let _ = resource_tx
|
||||||
|
.send(NodeResourceReq {
|
||||||
|
node_pubkey: SECURE_PUBLIC_KEY.clone(),
|
||||||
|
avail_ipv4: 10,
|
||||||
|
avail_ipv6: 10,
|
||||||
|
avail_memory_mb: 2500,
|
||||||
|
avail_vcpus: 60,
|
||||||
|
avail_storage_gb: 200,
|
||||||
|
avail_ports: 5002,
|
||||||
|
max_ports_per_vm: 5,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
let (confirm_tx, confirm_rx) = tokio::sync::mpsc::channel(6);
|
||||||
|
streaming_tasks.spawn(send_newvm_resp(confirm_client, confirm_rx));
|
||||||
|
streaming_tasks.spawn(send_node_resources(resource_client, resource_rx));
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
handle_vm_requests(newvm_rx, confirm_tx).await;
|
handle_vm_requests(newvm_rx, confirm_tx, resource_tx).await;
|
||||||
});
|
});
|
||||||
|
|
||||||
let updatevm_client = client.clone();
|
let updatevm_client = client.clone();
|
||||||
@ -266,7 +285,7 @@ async fn connect_and_run() -> Result<()> {
|
|||||||
streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx));
|
streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx));
|
||||||
|
|
||||||
let task_output = streaming_tasks.join_next().await;
|
let task_output = streaming_tasks.join_next().await;
|
||||||
warn!("One stream exited: {task_output:?} {streaming_tasks:?}");
|
warn!("One stream exited: {task_output:?}");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user