From 59d4e25bb1edcddac1105b8b1c4cdcd76f6361d8 Mon Sep 17 00:00:00 2001 From: Ramil_Algayev Date: Tue, 24 Dec 2024 20:40:32 +0400 Subject: [PATCH] created update functionality?? ??????? --- brain.proto | 5 ++- src/data.rs | 44 +++++++-------------- src/grpc.rs | 112 ++++++++++++++++++++++++++++++++++++---------------- 3 files changed, 96 insertions(+), 65 deletions(-) diff --git a/brain.proto b/brain.proto index 4d93e73..e88444e 100644 --- a/brain.proto +++ b/brain.proto @@ -97,7 +97,8 @@ service BrainDaemonService { rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); - rpc UpdateVMs (NodePubkey) returns (stream UpdateVMRequest); + rpc GetUpdateVM (NodePubkey) returns (stream UpdateVMRequest); + rpc SendUpdateVM (stream UpdateVMResp) returns (Empty); } message NodeFilters { @@ -125,4 +126,4 @@ service BrainCliService { rpc ListNodes (NodeFilters) returns (stream NodeListResp); rpc DeleteVM (DeletedVMUpdate) returns (Empty); rpc UpdateVM (UpdateVMRequest) returns (UpdateVMResp); -} \ No newline at end of file +} diff --git a/src/data.rs b/src/data.rs index 23b59ec..5951faa 100644 --- a/src/data.rs +++ b/src/data.rs @@ -96,7 +96,6 @@ pub struct BrainData { nodes: RwLock>, contracts: RwLock>, tmp_vmrequests: DashMap)>, - tmp_updatevmrequests: DashMap)>, daemon_deletevm_tx: DashMap>, daemon_newvm_tx: DashMap>, daemon_updatevm_tx: DashMap>, @@ -115,7 +114,6 @@ impl BrainData { nodes: RwLock::new(Vec::new()), contracts: RwLock::new(Vec::new()), tmp_vmrequests: DashMap::new(), - tmp_updatevmrequests: DashMap::new(), daemon_deletevm_tx: DashMap::new(), daemon_newvm_tx: DashMap::new(), daemon_updatevm_tx: DashMap::new(), @@ -139,6 +137,10 @@ impl BrainData { self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx); } + pub fn add_daemon_updatevm_tx(&self, node_pubkey: &str, _tx: Sender) { + self.daemon_updatevm_tx.insert(node_pubkey.to_string(), _tx); + } + pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) { if let Some(contract) = self.find_contract_by_uuid(&delete_vm.uuid) { info!("Found vm {}. Deleting...", delete_vm.uuid); @@ -241,35 +243,17 @@ impl BrainData { } } - pub async fn submit_updatevmrequest( - &self, - req: grpc::UpdateVmRequest, - tx: OneshotSender, - ) { - let uuid = req.uuid.clone(); - info!("Inserting update vm request in memory: {:?}", req); - self.tmp_updatevmrequests.insert(uuid.clone(), (req.clone(), tx)); - - if let Some(server_tx) = self.daemon_updatevm_tx.get(&req.node_pubkey) { - if server_tx.send(req.clone()).await.is_err() { - warn!("Daemon {} RX dropped before sending update VM. Cleaning memory...", &req.node_pubkey); - if let Some((_, oneshot_tx)) = self.tmp_updatevmrequests.remove(&uuid) { - let _ = oneshot_tx.1.send(grpc::UpdateVmResp { - uuid, - timestamp: "".to_string(), - error: "Daemon is offline.".to_string(), - }); - } - } + pub fn update_vm(&self, req: grpc::UpdateVmRequest) -> Result<(), String> { + let mut contracts = self.contracts.write().unwrap(); + if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == req.uuid) { + contract.disk_size_gb = req.disk_size_gb; + contract.vcpus = req.vcpus; + contract.memory_mb = req.memory_mb; + contract.kernel_sha = req.kernel_sha; + contract.dtrfs_sha = req.dtrfs_sha; + Ok(()) } else { - warn!("No daemon TX found for {}", req.node_pubkey); - if let Some((_, oneshot_tx)) = self.tmp_updatevmrequests.remove(&uuid) { - let _ = oneshot_tx.1.send(grpc::UpdateVmResp { - uuid, - timestamp: "".to_string(), - error: "Daemon is offline.".to_string(), - }); - } + Err(format!("Contract {} not found", req.uuid)) } } diff --git a/src/grpc.rs b/src/grpc.rs index fe51c9e..c0aa4f9 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -103,8 +103,7 @@ impl BrainDaemonService for BrainDaemonMock { Ok(Response::new(Empty {})) } - type DeletedVMUpdatesStream = - Pin> + Send>>; + type DeletedVMUpdatesStream = Pin> + Send>>; async fn deleted_vm_updates( &self, @@ -160,27 +159,73 @@ impl BrainDaemonService for BrainDaemonMock { )) } - type UpdateVMsStream = Pin> + Send>>; + type GetUpdateVMStream = Pin> + Send>>; - async fn update_v_ms( + async fn get_update_vm( &self, req: Request, - ) -> Result, Status> { - let node_pubkey = req.into_inner().node_pubkey; + ) -> Result, Status> { + let req = req.into_inner(); + info!("Node {} requested GetUpdateVMsStream", req.node_pubkey); let (grpc_tx, grpc_rx) = mpsc::channel(6); - let (_, mut data_rx) = mpsc::channel(6); - + let (data_tx, mut data_rx) = mpsc::channel(6); + self.data + .clone() + .add_daemon_updatevm_tx(&req.node_pubkey, data_tx); + let data = self.data.clone(); tokio::spawn(async move { - while let Some(update_req) = data_rx.recv().await { - if let Err(e) = grpc_tx.send(Ok(update_req)).await { - warn!("Could not send UpdateVmRequest to {node_pubkey}: {e:?}"); - break; + while let Some(updatevmreq) = data_rx.recv().await { + debug!("Sending UpdateVMRequest to {}: {:?}", req.node_pubkey.clone(), updatevmreq.clone()); + if let Err(e) = grpc_tx.send(Ok(updatevmreq.clone())).await { + warn!("Could not send UpdateVMRequest to {}: {e:?}", req.node_pubkey.clone()); + continue; } + data.update_vm(UpdateVmRequest { + uuid: updatevmreq.uuid, + node_pubkey: updatevmreq.node_pubkey, + disk_size_gb: updatevmreq.disk_size_gb, + memory_mb: updatevmreq.memory_mb, + vcpus: updatevmreq.vcpus, + kernel_sha: updatevmreq.kernel_sha, + dtrfs_sha: updatevmreq.dtrfs_sha, + kernel_url: updatevmreq.kernel_url, + dtrfs_url: updatevmreq.dtrfs_url, + }); + break; } }); - let output_stream = ReceiverStream::new(grpc_rx); - Ok(Response::new(Box::pin(output_stream) as Self::UpdateVMsStream)) + Ok(Response::new( + Box::pin(output_stream) as Self::GetUpdateVMStream + )) + } + + async fn send_update_vm( + &self, + req: Request>, + ) -> Result, Status> { + let req = req.into_inner(); + info!("Received UpdateVMs request"); + + // Perform the update operation + let result = self.data.update_all_vms().await; + + match result { + Ok(_) => { + let response = UpdateVmResp { + success: true, + message: "All VMs updated successfully".to_string(), + }; + Ok(Response::new(response)) + } + Err(e) => { + let response = UpdateVmResp { + success: false, + message: format!("Failed to update VMs: {:?}", e), + }; + Err(Status::internal(response.message)) + } + } } } @@ -209,6 +254,25 @@ impl BrainCliService for BrainCliMock { } } + async fn update_vm( + &self, + req: Request, + ) -> Result, Status> { + let req = req.into_inner(); + match self.data.update_vm(req.clone()) { + Ok(_) => Ok(Response::new(UpdateVmResp { + uuid: req.uuid, + timestamp: format!("{:?}", std::time::SystemTime::now()), + error: "".to_string(), + })), + Err(e) => Ok(Response::new(UpdateVmResp { + uuid: req.uuid, + timestamp: "".to_string(), + error: e, + })), + } + } + type ListVMContractsStream = Pin> + Send>>; async fn list_vm_contracts( &self, @@ -256,23 +320,5 @@ impl BrainCliService for BrainCliMock { Ok(Response::new(Empty {})) } - async fn update_vm( - &self, - req: Request, - ) -> Result, Status> { - let req = req.into_inner(); - info!("CLI requested UpdateVM: {req:?}"); - let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); - self.data.submit_updatevmrequest(req, oneshot_tx).await; - match oneshot_rx.await { - Ok(resp) => { - info!("Returning UpdateVmResp: {resp:?}"); - Ok(Response::new(resp)) - }, - Err(e) => { - log::error!("Error waiting for UpdateVmResp: {e:?}"); - Err(Status::unknown("Daemon not responding.")) - } - } - } + }