From a641c0453a6cce27041a6576de84449620a6f896 Mon Sep 17 00:00:00 2001 From: Ramil_Algayev Date: Thu, 26 Dec 2024 01:07:41 +0400 Subject: [PATCH] some name changes --- brain.proto | 29 +++++++++++---------- src/data.rs | 73 +++++++++++++++++++++++++++++++---------------------- src/grpc.rs | 46 ++++++++++++++++----------------- 3 files changed, 81 insertions(+), 67 deletions(-) diff --git a/brain.proto b/brain.proto index e88444e..a33c211 100644 --- a/brain.proto +++ b/brain.proto @@ -8,7 +8,7 @@ message NodePubkey { string node_pubkey = 1; } -message RegisterNodeRequest { +message RegisterNodeReq { string node_pubkey = 1; string owner_pubkey = 2; string ip = 3; @@ -23,7 +23,7 @@ message RegisterNodeRequest { uint32 max_ports_per_vm = 12; } -message NewVMRequest { +message NewVMReq { string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID string hostname = 2; string admin_pubkey = 3; @@ -40,7 +40,7 @@ message NewVMRequest { string dtrfs_sha = 14; } -message UpdateVMRequest { +message UpdateVMReq { string uuid = 1; string node_pubkey = 2; uint32 disk_size_gb = 3; @@ -79,7 +79,7 @@ message ListVMContractsReq { string node_pubkey = 2; } -message NewVMConfirmation { +message NewVMResp { string uuid = 1; repeated uint32 exposed_ports = 2; string public_ipv4 = 3; @@ -87,20 +87,21 @@ message NewVMConfirmation { string error = 5; } -message DeletedVMUpdate { +message DeleteVMReq { string uuid = 1; } service BrainDaemonService { - rpc RegisterNode (RegisterNodeRequest) returns (Empty); - rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest); - rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); - rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate); + rpc RegisterNode (RegisterNodeReq) returns (Empty); + rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq); + rpc SendNewVMResp (stream NewVMResp) returns (Empty); + rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); - rpc GetUpdateVM (NodePubkey) returns (stream UpdateVMRequest); - rpc SendUpdateVM (stream UpdateVMResp) returns (Empty); + rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); + rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); } + message NodeFilters { uint32 free_ports = 1; bool offers_ipv4 = 2; @@ -121,9 +122,9 @@ message NodeListResp { } service BrainCliService { - rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation); + rpc CreateVMContract (NewVMReq) returns (NewVMResp); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListNodes (NodeFilters) returns (stream NodeListResp); - rpc DeleteVM (DeletedVMUpdate) returns (Empty); - rpc UpdateVM (UpdateVMRequest) returns (UpdateVMResp); + rpc DeleteVM (DeleteVMReq) returns (Empty); + rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp); } diff --git a/src/data.rs b/src/data.rs index af42da0..67ca875 100644 --- a/src/data.rs +++ b/src/data.rs @@ -22,8 +22,8 @@ pub struct Node { pub max_ports_per_vm: u32, } -impl From for Node { - fn from(node: grpc::RegisterNodeRequest) -> Self { +impl From for Node { + fn from(node: grpc::RegisterNodeReq) -> Self { Node { public_key: node.node_pubkey, owner_key: node.owner_pubkey, @@ -95,11 +95,11 @@ impl Into for Contract { pub struct BrainData { nodes: RwLock>, contracts: RwLock>, - tmp_vmrequests: DashMap)>, - tmp_updatevmrequests: DashMap)>, - daemon_deletevm_tx: DashMap>, - daemon_newvm_tx: DashMap>, - daemon_updatevm_tx: DashMap>, + tmp_vmrequests: DashMap)>, + tmp_updatevmrequests: DashMap)>, + daemon_deletevm_tx: DashMap>, + daemon_newvm_tx: DashMap>, + daemon_updatevm_tx: DashMap>, } #[derive(Debug)] @@ -122,7 +122,7 @@ impl BrainData { } } - pub fn insert_node(&self, node: grpc::RegisterNodeRequest) { + pub fn insert_node(&self, node: grpc::RegisterNodeReq) { info!("Registering node {node:?}"); let mut nodes = self.nodes.write().unwrap(); for n in nodes.iter_mut() { @@ -135,18 +135,19 @@ impl BrainData { nodes.push(node.into()); } - pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender) { + pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender) { self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx); } - pub async fn add_daemon_updatevm_tx(&self, node_pubkey: &str, tx: Sender) { + pub async fn add_daemon_updatevm_tx(&self, node_pubkey: &str, tx: Sender) { log::debug!("Adding daemon updatevm tx for node_pubkey: {}", node_pubkey); self.tmp_updatevmrequests .retain(|_, req| req.0.node_pubkey != node_pubkey); self.daemon_updatevm_tx.insert(node_pubkey.to_string(), tx); + info!("Added daemon TX for {}", node_pubkey); } - pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) { + pub async fn delete_vm(&self, delete_vm: grpc::DeleteVmReq) { if let Some(contract) = self.find_contract_by_uuid(&delete_vm.uuid) { info!("Found vm {}. Deleting...", delete_vm.uuid); if let Some(daemon_tx) = self.daemon_deletevm_tx.get(&contract.node_pubkey) { @@ -166,13 +167,13 @@ impl BrainData { } } - pub async fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender) { + pub async fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender) { self.tmp_vmrequests .retain(|_, req| req.0.node_pubkey != node_pubkey); self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx); } - pub async fn submit_vmconfirmation(&self, confirmation: grpc::NewVmConfirmation) { + pub async fn submit_vmconfirmation(&self, confirmation: grpc::NewVmResp) { let newvmreq = match self.tmp_vmrequests.remove(&confirmation.uuid) { Some((_, r)) => r, None => { @@ -247,8 +248,8 @@ impl BrainData { pub async fn submit_newvmrequest( &self, - mut req: grpc::NewVmRequest, - tx: OneshotSender, + mut req: grpc::NewVmReq, + tx: OneshotSender, ) { let uuid = uuid::Uuid::new_v4().to_string(); @@ -268,7 +269,7 @@ impl BrainData { "Daemon {} RX dropped before sending update. Cleaning memory...", req.node_pubkey ); - self.submit_vmconfirmation(grpc::NewVmConfirmation { + self.submit_vmconfirmation(grpc::NewVmResp { error: "Daemon is offline.".to_string(), uuid, exposed_ports: Vec::new(), @@ -282,7 +283,7 @@ impl BrainData { pub async fn submit_updatevmrequest( &self, - req: grpc::UpdateVmRequest, + req: grpc::UpdateVmReq, tx: OneshotSender, ) { let uuid = req.uuid.clone(); @@ -294,20 +295,32 @@ impl BrainData { "Found daemon TX for {}. Sending updateVMReq {}", req.node_pubkey, req.uuid ); - if server_tx.send(req.clone()).await.is_ok() { - return; - } else { - warn!( - "Daemon {} RX dropped before sending update. Cleaning memory...", - req.node_pubkey - ); - self.submit_update_vmconfirmation(grpc::UpdateVmResp { - uuid, - timestamp: chrono::Utc::now().to_rfc3339(), - error: "Daemon is offline.".to_string(), - }) - .await; + match server_tx.send(req.clone()).await { + Ok(_) => { + debug!("Successfully sent updateVMReq to {}", req.node_pubkey); + return; + } + Err(e) => { + warn!( + "Failed to send updateVMReq to {}: {}. Cleaning memory...", + req.node_pubkey, e + ); + self.submit_update_vmconfirmation(grpc::UpdateVmResp { + uuid, + timestamp: chrono::Utc::now().to_rfc3339(), + error: "Daemon is offline.".to_string(), + }) + .await; + } } + } else { + warn!("No daemon TX found for {}", req.node_pubkey); + self.submit_update_vmconfirmation(grpc::UpdateVmResp { + uuid, + timestamp: chrono::Utc::now().to_rfc3339(), + error: "Daemon is offline.".to_string(), + }) + .await; } } diff --git a/src/grpc.rs b/src/grpc.rs index bed0966..2ff2688 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -41,13 +41,13 @@ impl BrainCliMock { impl BrainDaemonService for BrainDaemonMock { async fn register_node( &self, - req: Request, + req: Request, ) -> Result, Status> { self.data.insert_node(req.into_inner()); Ok(Response::new(Empty {})) } - type GetNewVMReqsStream = Pin> + Send>>; + type GetNewVMReqsStream = Pin> + Send>>; async fn get_new_vm_reqs( &self, @@ -67,7 +67,7 @@ impl BrainDaemonService for BrainDaemonMock { debug!("Sending NewVMRequest to {}: {newvmreq:?}", req.node_pubkey); if let Err(e) = grpc_tx.send(Ok(newvmreq)).await { warn!("Could not send NewVMRequest to {}: {e:?}", req.node_pubkey); - data.submit_vmconfirmation(NewVmConfirmation { + data.submit_vmconfirmation(NewVmResp { error: "Daemon not connected.".to_string(), uuid, ..Default::default() @@ -83,11 +83,11 @@ impl BrainDaemonService for BrainDaemonMock { )) } - async fn send_vm_confirmations( + async fn send_new_vm_resp( &self, - req: Request>, + req: Request>, ) -> Result, Status> { - debug!("Some node connected to stream NewVmConfirmation"); + debug!("Some node connected to stream NewVMResp"); let mut confirmations = req.into_inner(); while let Some(confirmation) = confirmations.next().await { match confirmation { @@ -96,21 +96,21 @@ impl BrainDaemonService for BrainDaemonMock { self.data.submit_vmconfirmation(c).await; } Err(e) => { - log::warn!("Daemon disconnected from Streaming: {e:?}") + log::warn!("Daemon disconnected from Streaming: {e:?}") } } } Ok(Response::new(Empty {})) } - type DeletedVMUpdatesStream = Pin> + Send>>; + type GetDeleteVMReqStream = Pin> + Send>>; - async fn deleted_vm_updates( + async fn get_delete_vm_req( &self, req: Request, - ) -> Result, Status> { + ) -> Result, Status> { let node_pubkey = req.into_inner().node_pubkey; - info!("Daemon {node_pubkey} requested DeletedVMUpdatesStream"); + info!("Daemon {node_pubkey} requested GetDeleteVMReqStream"); let (grpc_tx, grpc_rx) = mpsc::channel(6); let (data_tx, mut data_rx) = mpsc::channel(6); self.data @@ -134,7 +134,7 @@ impl BrainDaemonService for BrainDaemonMock { }); let output_stream = ReceiverStream::new(grpc_rx); Ok(Response::new( - Box::pin(output_stream) as Self::DeletedVMUpdatesStream + Box::pin(output_stream) as Self::GetDeleteVMReqStream )) } @@ -159,12 +159,12 @@ impl BrainDaemonService for BrainDaemonMock { )) } - type GetUpdateVMStream = Pin> + Send>>; + type GetUpdateVMReqStream = Pin> + Send>>; - async fn get_update_vm( + async fn get_update_vm_req( &self, req: Request, - ) -> Result, Status> { + ) -> Result, Status> { let req = req.into_inner(); info!("Daemon {} requested GetUpdateVMStream", req.node_pubkey); let (grpc_tx, grpc_rx) = mpsc::channel(6); @@ -190,15 +190,15 @@ impl BrainDaemonService for BrainDaemonMock { }); let output_stream = ReceiverStream::new(grpc_rx); Ok(Response::new( - Box::pin(output_stream) as Self::GetUpdateVMStream + Box::pin(output_stream) as Self::GetUpdateVMReqStream )) } - async fn send_update_vm( + async fn send_update_vm_resp( &self, req: Request>, ) -> Result, Status> { - debug!("Some node connected to stream NewVmConfirmation"); + debug!("Some node connected to stream NewVMResp"); let mut confirmations = req.into_inner(); while let Some(confirmation) = confirmations.next().await { match confirmation { @@ -207,7 +207,7 @@ impl BrainDaemonService for BrainDaemonMock { self.data.submit_update_vmconfirmation(c).await; } Err(e) => { - log::warn!("Daemon disconnected from Streaming: {e:?}") + log::warn!("Daemon disconnected from Streaming: {e:?}") } } } @@ -219,8 +219,8 @@ impl BrainDaemonService for BrainDaemonMock { impl BrainCliService for BrainCliMock { async fn create_vm_contract( &self, - req: Request, - ) -> Result, Status> { + req: Request, + ) -> Result, Status> { let req = req.into_inner(); info!("New VM requested via CLI: {req:?}"); let admin_pubkey = req.admin_pubkey.clone(); @@ -242,7 +242,7 @@ impl BrainCliService for BrainCliMock { async fn update_vm( &self, - req: Request, + req: Request, ) -> Result, Status> { let req = req.into_inner(); info!("Update VM requested via CLI: {req:?}"); @@ -303,7 +303,7 @@ impl BrainCliService for BrainCliMock { )) } - async fn delete_vm(&self, req: Request) -> Result, Status> { + async fn delete_vm(&self, req: Request) -> Result, Status> { let req = req.into_inner(); info!("Unknown CLI requested to delete vm {}", req.uuid); self.data.delete_vm(req).await;