diff --git a/src/data.rs b/src/data.rs index eee5746..af42da0 100644 --- a/src/data.rs +++ b/src/data.rs @@ -96,6 +96,7 @@ pub struct BrainData { nodes: RwLock>, contracts: RwLock>, tmp_vmrequests: DashMap)>, + tmp_updatevmrequests: DashMap)>, daemon_deletevm_tx: DashMap>, daemon_newvm_tx: DashMap>, daemon_updatevm_tx: DashMap>, @@ -114,6 +115,7 @@ impl BrainData { nodes: RwLock::new(Vec::new()), contracts: RwLock::new(Vec::new()), tmp_vmrequests: DashMap::new(), + tmp_updatevmrequests: DashMap::new(), daemon_deletevm_tx: DashMap::new(), daemon_newvm_tx: DashMap::new(), daemon_updatevm_tx: DashMap::new(), @@ -137,8 +139,11 @@ impl BrainData { self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx); } - pub async fn add_daemon_updatevm_tx(&self, node_pubkey: &str, _tx: Sender) { - self.daemon_updatevm_tx.insert(node_pubkey.to_string(), _tx); + pub async fn add_daemon_updatevm_tx(&self, node_pubkey: &str, tx: Sender) { + log::debug!("Adding daemon updatevm tx for node_pubkey: {}", node_pubkey); + self.tmp_updatevmrequests + .retain(|_, req| req.0.node_pubkey != node_pubkey); + self.daemon_updatevm_tx.insert(node_pubkey.to_string(), tx); } pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) { @@ -208,6 +213,38 @@ impl BrainData { self.contracts.write().unwrap().push(contract); } + pub async fn submit_update_vmconfirmation(&self, confirmation: grpc::UpdateVmResp) { + let updatevmreq = match self.tmp_updatevmrequests.remove(&confirmation.uuid) { + Some((_, r)) => r, + None => { + log::error!( + "Received confirmation for ghost UpdateVMRequest {}", + confirmation.uuid + ); + return; + } + }; + if let Err(e) = updatevmreq.1.send(confirmation.clone()) { + log::error!( + "CLI RX for {} dropped before receiving confirmation {:?}. Error is: {:?}", + &updatevmreq.0.node_pubkey, + confirmation, + e + ); + } + if confirmation.error != "" { + return; + } + let mut contracts = self.contracts.write().unwrap(); + if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == confirmation.uuid) { + contract.disk_size_gb = updatevmreq.0.disk_size_gb; + contract.vcpus = updatevmreq.0.vcpus; + contract.memory_mb = updatevmreq.0.memory_mb; + contract.kernel_sha = updatevmreq.0.kernel_sha; + contract.dtrfs_sha = updatevmreq.0.dtrfs_sha; + } + } + pub async fn submit_newvmrequest( &self, mut req: grpc::NewVmRequest, @@ -243,17 +280,34 @@ impl BrainData { } } - pub async fn update_vm(&self, req: grpc::UpdateVmRequest) -> Result<(), String> { - let mut contracts = self.contracts.write().unwrap(); - if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == req.uuid) { - contract.disk_size_gb = req.disk_size_gb; - contract.vcpus = req.vcpus; - contract.memory_mb = req.memory_mb; - contract.kernel_sha = req.kernel_sha; - contract.dtrfs_sha = req.dtrfs_sha; - Ok(()) - } else { - Err(format!("Contract {} not found", req.uuid)) + pub async fn submit_updatevmrequest( + &self, + req: grpc::UpdateVmRequest, + tx: OneshotSender, + ) { + let uuid = req.uuid.clone(); + info!("Inserting new vm update request in memory: {req:?}"); + self.tmp_updatevmrequests + .insert(req.uuid.clone(), (req.clone(), tx)); + if let Some(server_tx) = self.daemon_updatevm_tx.get(&req.node_pubkey) { + debug!( + "Found daemon TX for {}. Sending updateVMReq {}", + req.node_pubkey, req.uuid + ); + if server_tx.send(req.clone()).await.is_ok() { + return; + } else { + warn!( + "Daemon {} RX dropped before sending update. Cleaning memory...", + req.node_pubkey + ); + self.submit_update_vmconfirmation(grpc::UpdateVmResp { + uuid, + timestamp: chrono::Utc::now().to_rfc3339(), + error: "Daemon is offline.".to_string(), + }) + .await; + } } } diff --git a/src/grpc.rs b/src/grpc.rs index 1f7d7f4..bed0966 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; -use chrono; pub struct BrainDaemonMock { data: Arc, @@ -165,19 +164,27 @@ impl BrainDaemonService for BrainDaemonMock { async fn get_update_vm( &self, req: Request, - ) -> Result, Status> { + ) -> Result, Status> { let req = req.into_inner(); - info!("Daemon {} requested UpdateVMReqsStream", req.node_pubkey); + info!("Daemon {} requested GetUpdateVMStream", req.node_pubkey); let (grpc_tx, grpc_rx) = mpsc::channel(6); let (data_tx, mut data_rx) = mpsc::channel(6); self.data .add_daemon_updatevm_tx(&req.node_pubkey, data_tx) .await; + let data = self.data.clone(); tokio::spawn(async move { while let Some(updatevmreq) = data_rx.recv().await { debug!("Sending UpdateVMRequest to {}: {updatevmreq:?}", req.node_pubkey); - if let Err(e) = grpc_tx.send(Ok(updatevmreq)).await { + if let Err(e) = grpc_tx.send(Ok(updatevmreq.clone())).await { warn!("Could not send UpdateVMRequest to {}: {e:?}", req.node_pubkey); + data.submit_update_vmconfirmation(UpdateVmResp { + error: "Daemon not connected.".to_string(), + uuid: updatevmreq.uuid, + timestamp: chrono::Utc::now().to_rfc3339(), + }) + .await; + break; } } }); @@ -191,15 +198,16 @@ impl BrainDaemonService for BrainDaemonMock { &self, req: Request>, ) -> Result, Status> { - debug!("Some node connected to stream UpdateVmResp"); + debug!("Some node connected to stream NewVmConfirmation"); let mut confirmations = req.into_inner(); while let Some(confirmation) = confirmations.next().await { match confirmation { Ok(c) => { - info!("Received update confirmation from daemon: {c:?}") + info!("Received confirmation from daemon: {c:?}"); + self.data.submit_update_vmconfirmation(c).await; } Err(e) => { - log::warn!("Daemon disconnected from Streaming: {e:?}") + log::warn!("Daemon disconnected from Streaming: {e:?}") } } } @@ -237,17 +245,21 @@ impl BrainCliService for BrainCliMock { req: Request, ) -> Result, Status> { let req = req.into_inner(); - match self.data.update_vm(req.clone()).await { - Ok(_) => Ok(Response::new(UpdateVmResp { - uuid: req.uuid, - timestamp: chrono::Utc::now().to_rfc3339(), - error: "".to_string(), - })), - Err(e) => Ok(Response::new(UpdateVmResp { - uuid: req.uuid, - timestamp: "".to_string(), - error: e, - })), + info!("Update VM requested via CLI: {req:?}"); + let node_pubkey = req.node_pubkey.clone(); + let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); + self.data.submit_updatevmrequest(req, oneshot_tx).await; + match oneshot_rx.await { + Ok(response) => { + info!("Sending Update VM confirmation to {node_pubkey}: {response:?}"); + Ok(Response::new(response)) + } + Err(e) => { + log::error!("Something weird happened. Reached error {e:?}"); + Err(Status::unknown( + "Request failed due to unknown error. Please try again or contact the DeTEE devs team.", + )) + } } }