diff --git a/cli-mock/brain.proto b/cli-mock/brain.proto index 9a9e84f..3bb4bf4 100644 --- a/cli-mock/brain.proto +++ b/cli-mock/brain.proto @@ -11,16 +11,17 @@ message NodePubkey { message RegisterNodeReq { string node_pubkey = 1; string owner_pubkey = 2; - string ip = 3; - string country = 4; - string city = 5; - uint32 avail_ports = 6; - uint32 avail_ipv4 = 7; - uint32 avail_ipv6 = 8; - uint32 avail_vcpus = 9; - uint32 avail_memory_mb = 10; - uint32 avail_storage_gb = 11; - uint32 max_ports_per_vm = 12; +} + +message NodeResourceReq { + string node_pubkey = 1; + uint32 avail_ports = 2; + uint32 avail_ipv4 = 3; + uint32 avail_ipv6 = 4; + uint32 avail_vcpus = 5; + uint32 avail_memory_mb = 6; + uint32 avail_storage_gb = 7; + uint32 max_ports_per_vm = 8; } message NewVMReq { @@ -92,6 +93,7 @@ message DeleteVMReq { service BrainDaemonService { rpc RegisterNode (RegisterNodeReq) returns (Empty); + rpc SendNodeResources (stream NodeResourceReq) returns (Empty); rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq); rpc SendNewVMResp (stream NewVMResp) returns (Empty); rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq); @@ -100,7 +102,6 @@ service BrainDaemonService { rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); } - message NodeFilters { uint32 free_ports = 1; bool offers_ipv4 = 2; @@ -114,10 +115,11 @@ message NodeFilters { message NodeListResp { string node_pubkey = 1; string country = 2; - string city = 3; - string ip = 4; // required for latency test - uint32 server_rating = 5; - uint32 provider_rating = 6; + string region = 3; + string city = 4; + string ip = 5; // required for latency test + uint32 server_rating = 6; + uint32 provider_rating = 7; } service BrainCliService { diff --git a/cli-mock/src/main.rs b/cli-mock/src/main.rs index 2b4655b..0a06df9 100644 --- a/cli-mock/src/main.rs +++ b/cli-mock/src/main.rs @@ -37,8 +37,8 @@ async fn get_node_list(mut client: BrainCliServiceClient) -> Result, + rx: Receiver, +) -> Result<()> { + debug!("starting send_newvm_resp stream"); + let rx_stream = ReceiverStream::new(rx); + client.send_node_resources(rx_stream).await?; + debug!("send_newvm_resp is about to exit"); + Ok(()) +} + async fn register_node(mut client: BrainDaemonServiceClient) { debug!("Starting node registration..."); let req = RegisterNodeReq { node_pubkey: SECURE_PUBLIC_KEY.clone(), owner_pubkey: "IamTheOwnerOf".to_string() + &SECURE_PUBLIC_KEY, - ip: "10.0.10.1".to_string(), - country: "Cyrodiil".to_string(), - city: "Bruma".to_string(), - avail_ports: 10000, - avail_ipv4: 10, - avail_ipv6: 100_000, - avail_vcpus: 16, - avail_memory_mb: 20_000, - avail_storage_gb: 700, - max_ports_per_vm: 5, }; match client.register_node(req).await { Ok(_) => info!( @@ -142,8 +143,8 @@ async fn listen_for_update_vm_reqs( async fn handle_update_vm_requests( mut req: Receiver, - resp_chan: Sender - ) { + resp_chan: Sender, +) { info!("Started to handle update vm requests."); while let Some(update_vm) = req.recv().await { let update_vm_resp = UpdateVmResp { @@ -152,7 +153,7 @@ async fn handle_update_vm_requests( }; info!("Sending UpdateVmResp: {update_vm_resp:?}"); let _ = resp_chan.send(update_vm_resp).await; - }; + } warn!("update vm request handler is ending"); } @@ -167,7 +168,11 @@ async fn send_updatevm_resp( Ok(()) } -async fn handle_vm_requests(mut req: Receiver, resp: Sender) { +async fn handle_vm_requests( + mut req: Receiver, + resp: Sender, + resource_tx: Sender, +) { info!("Started to handle vm requests. 1 out of 5 requests will return error."); let mut i = 0; while let Some(new_vm) = req.recv().await { @@ -184,7 +189,7 @@ async fn handle_vm_requests(mut req: Receiver, resp: Sender false => String::new(), }; if i != 3 { - let confirmation = NewVmResp{ + let confirmation = NewVmResp { uuid: new_vm.uuid, exposed_ports, public_ipv4, @@ -193,6 +198,7 @@ async fn handle_vm_requests(mut req: Receiver, resp: Sender }; info!("Sending NewVmConfirmation: {confirmation:?}"); let _ = resp.send(confirmation).await; + info!("Sending NodeResourceReq"); } else { let confirmation = NewVmResp { uuid: new_vm.uuid, @@ -203,6 +209,19 @@ async fn handle_vm_requests(mut req: Receiver, resp: Sender }; info!("Sending error for NewVmConfirmation: {confirmation:?}"); let _ = resp.send(confirmation).await; + info!("Sending updated about used resources."); + let _ = resource_tx + .send(NodeResourceReq { + node_pubkey: SECURE_PUBLIC_KEY.clone(), + avail_ipv4: 5, + avail_ipv6: 5, + avail_memory_mb: 5000, + avail_vcpus: 32, + avail_storage_gb: 100, + avail_ports: 5000, + max_ports_per_vm: 5, + }) + .await; } i += 1; if i == 5 { @@ -223,11 +242,25 @@ async fn connect_and_run() -> Result<()> { streaming_tasks.spawn(listen_for_new_vm_reqs(newvm_client, tx)); let confirm_client = client.clone(); - let (confirm_tx, rx) = tokio::sync::mpsc::channel(6); - streaming_tasks.spawn(send_newvm_resp(confirm_client, rx)); - + let resource_client = client.clone(); + let (resource_tx, resource_rx) = tokio::sync::mpsc::channel(6); + let _ = resource_tx + .send(NodeResourceReq { + node_pubkey: SECURE_PUBLIC_KEY.clone(), + avail_ipv4: 10, + avail_ipv6: 10, + avail_memory_mb: 2500, + avail_vcpus: 60, + avail_storage_gb: 200, + avail_ports: 5002, + max_ports_per_vm: 5, + }) + .await; + let (confirm_tx, confirm_rx) = tokio::sync::mpsc::channel(6); + streaming_tasks.spawn(send_newvm_resp(confirm_client, confirm_rx)); + streaming_tasks.spawn(send_node_resources(resource_client, resource_rx)); tokio::spawn(async move { - handle_vm_requests(newvm_rx, confirm_tx).await; + handle_vm_requests(newvm_rx, confirm_tx, resource_tx).await; }); let updatevm_client = client.clone(); @@ -242,7 +275,6 @@ async fn connect_and_run() -> Result<()> { handle_update_vm_requests(updatevm_rx, resp_tx).await; }); - let deletevms_client = client.clone(); let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6); streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx));