From efc3fb81b2bd148e870660f9cfda75c0cae71fe6 Mon Sep 17 00:00:00 2001
From: ghe0
Date: Sun, 22 Dec 2024 19:59:32 +0200
Subject: [PATCH] fixed deadlock for dead nodes

---
 src/data.rs | 82 ++++++++++++++++++++++-------------------------
 src/grpc.rs | 50 +++++++++++++++++++-------------
 2 files changed, 64 insertions(+), 68 deletions(-)

diff --git a/src/data.rs b/src/data.rs
index eb3f2e7..c239618 100644
--- a/src/data.rs
+++ b/src/data.rs
@@ -95,8 +95,7 @@ impl Into for Contract {
 pub struct BrainData {
     nodes: RwLock>,
     contracts: RwLock<Vec<Contract>>,
-    tmp_vmrequests: DashMap<String, grpc::NewVmRequest>,
-    cli_vmcontract_tx: DashMap<String, OneshotSender<grpc::NewVmConfirmation>>,
+    tmp_vmrequests: DashMap<String, (grpc::NewVmRequest, OneshotSender<grpc::NewVmConfirmation>)>,
     daemon_deletevm_tx: DashMap>,
     daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmRequest>>,
 }
@@ -114,7 +113,6 @@ impl BrainData {
             nodes: RwLock::new(Vec::new()),
             contracts: RwLock::new(Vec::new()),
             tmp_vmrequests: DashMap::new(),
-            cli_vmcontract_tx: DashMap::new(),
             daemon_deletevm_tx: DashMap::new(),
             daemon_newvm_tx: DashMap::new(),
         }
     }
@@ -150,7 +148,6 @@ impl BrainData {
                     "Failed to send deletion request to {}. Triggering memory cleanup.",
                     contract.node_pubkey
                 );
-                self.memory_cleanup();
             }
         }
         let mut contracts = self.contracts.write().unwrap();
@@ -159,14 +156,8 @@ impl BrainData {
     }
 
     pub async fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender<grpc::NewVmRequest>) {
-        for dangling_vm_request in self
-            .tmp_vmrequests
-            .iter()
-            .filter(|req| req.node_pubkey == node_pubkey)
-            .map(|entry| entry.value().clone())
-        {
-            let _ = tx.send(dangling_vm_request).await;
-        }
+        self.tmp_vmrequests
+            .retain(|_, req| req.0.node_pubkey != node_pubkey);
         self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx);
     }
 
@@ -174,17 +165,20 @@ impl BrainData {
         let newvmreq = match self.tmp_vmrequests.remove(&confirmation.uuid) {
             Some((_, r)) => r,
             None => {
-                log::error!("Received confirmation for ghost NewVMReq {}", confirmation.uuid);
-                return
-            },
-        };
-        if let Some((_, client_tx)) = self.cli_vmcontract_tx.remove(&newvmreq.admin_pubkey) {
-            if let Err(e) = client_tx.send(confirmation.clone()) {
                 log::error!(
-                    "CLI RX for {} dropped before receiving confirmation {:?}. Error is: {:?}",
-                    &newvmreq.admin_pubkey, confirmation, e
+                    "Received confirmation for ghost NewVMReq {}",
+                    confirmation.uuid
                 );
+                return;
             }
+        };
+        if let Err(e) = newvmreq.1.send(confirmation.clone()) {
+            log::error!(
+                "CLI RX for {} dropped before receiving confirmation {:?}. Error is: {:?}",
+                &newvmreq.0.admin_pubkey,
+                confirmation,
+                e
+            );
         }
         if confirmation.error == "" {
             return;
         }
@@ -195,14 +189,14 @@ impl BrainData {
             public_ipv4: confirmation.public_ipv4,
             public_ipv6: confirmation.public_ipv6,
             created_at: format!("{:?}", std::time::SystemTime::now()),
-            hostname: newvmreq.hostname,
-            admin_pubkey: newvmreq.admin_pubkey,
-            node_pubkey: newvmreq.node_pubkey,
-            disk_size_gb: newvmreq.disk_size_gb,
-            vcpus: newvmreq.vcpus,
-            memory_mb: newvmreq.memory_mb,
-            kernel_sha: newvmreq.kernel_sha,
-            dtrfs_sha: newvmreq.dtrfs_sha,
+            hostname: newvmreq.0.hostname,
+            admin_pubkey: newvmreq.0.admin_pubkey,
+            node_pubkey: newvmreq.0.node_pubkey,
+            disk_size_gb: newvmreq.0.disk_size_gb,
+            vcpus: newvmreq.0.vcpus,
+            memory_mb: newvmreq.0.memory_mb,
+            kernel_sha: newvmreq.0.kernel_sha,
+            dtrfs_sha: newvmreq.0.dtrfs_sha,
         };
         info!("Created new contract: {contract:?}");
         self.contracts.write().unwrap().push(contract);
     }
@@ -212,26 +206,25 @@ impl BrainData {
         &self,
         mut req: grpc::NewVmRequest,
         tx: OneshotSender<grpc::NewVmConfirmation>,
-    ) -> bool {
+    ) {
         req.uuid = uuid::Uuid::new_v4().to_string();
         info!("Inserting new vm request in memory: {req:?}");
-        self.tmp_vmrequests.insert(req.uuid.clone(), req.clone());
-        self.cli_vmcontract_tx
-            .insert(req.admin_pubkey.to_string(), tx);
+        self.tmp_vmrequests
+            .insert(req.uuid.clone(), (req.clone(), tx));
         if let Some(server_tx) = self.daemon_newvm_tx.get(&req.node_pubkey) {
-            debug!("Found daemon TX for {}. Sending newVMReq {}", req.node_pubkey, req.uuid);
+            debug!(
+                "Found daemon TX for {}. Sending newVMReq {}",
+                req.node_pubkey, req.uuid
+            );
             if server_tx.send(req.clone()).await.is_ok() {
-                return true;
+                return;
             } else {
-                warn!("Daemon {} RX dropped before sending update. Cleaning memory...", req.node_pubkey);
-                self.memory_cleanup();
+                warn!(
+                    "Daemon {} RX dropped before sending update. Cleaning memory...",
+                    req.node_pubkey
+                );
             }
         }
-        false
-    }
-
-    pub fn del_cli_vmcontract_tx(&self, admin_pubkey: &str) {
-        self.cli_vmcontract_tx.remove(admin_pubkey);
     }
 
     pub fn insert_contract(&self, contract: Contract) {
@@ -287,11 +280,4 @@ impl BrainData {
             .filter(|c| c.node_pubkey == node_pubkey)
             .collect()
     }
-
-    pub fn memory_cleanup(&self) {
-        self.daemon_newvm_tx
-            .retain(|_, server_tx| !server_tx.is_closed());
-        self.daemon_deletevm_tx
-            .retain(|_, server_tx| !server_tx.is_closed());
-    }
 }
diff --git a/src/grpc.rs b/src/grpc.rs
index 4f13974..72d2f69 100644
--- a/src/grpc.rs
+++ b/src/grpc.rs
@@ -10,6 +10,7 @@ use brain::brain_daemon_service_server::BrainDaemonService;
 use brain::*;
 use log::debug;
 use log::info;
+use log::warn;
 use std::pin::Pin;
 use std::sync::Arc;
 use tokio::sync::mpsc;
@@ -57,16 +58,23 @@ impl BrainDaemonService for BrainDaemonMock {
         let (grpc_tx, grpc_rx) = mpsc::channel(6);
         let (data_tx, mut data_rx) = mpsc::channel(6);
         self.data
-            .clone()
             .add_daemon_newvm_tx(&req.node_pubkey, data_tx)
             .await;
+        let data = self.data.clone();
         tokio::spawn(async move {
             while let Some(newvmreq) = data_rx.recv().await {
-                debug!(
-                    "received this newvmreq to {}: {newvmreq:?}",
-                    req.node_pubkey
-                );
-                let _ = grpc_tx.send(Ok(newvmreq)).await;
+                let uuid = newvmreq.uuid.clone();
+                debug!("Sending NewVMRequest to {}: {newvmreq:?}", req.node_pubkey);
+                if let Err(e) = grpc_tx.send(Ok(newvmreq)).await {
+                    warn!("Could not send NewVMRequest to {}: {e:?}", req.node_pubkey);
+                    data.submit_vmconfirmation(NewVmConfirmation {
+                        error: "Daemon not connected.".to_string(),
+                        uuid,
+                        ..Default::default()
+                    })
+                    .await;
+                    break;
+                }
             }
         });
         let output_stream = ReceiverStream::new(grpc_rx);
@@ -87,7 +95,9 @@ impl BrainDaemonService for BrainDaemonMock {
                     info!("Received confirmation from daemon: {c:?}");
                     self.data.submit_vmconfirmation(c).await;
                 }
-                Err(e) => log::warn!("Daemon disconnected from Streaming: {e:?}"),
+                Err(e) => {
+                    log::warn!("Daemon disconnected from Streaming: {e:?}")
+                }
             }
         }
         Ok(Response::new(Empty {}))
     }
@@ -160,20 +170,20 @@ impl BrainCliService for BrainCliMock {
         let req = req.into_inner();
         info!("New VM requested via CLI: {req:?}");
         let admin_pubkey = req.admin_pubkey.clone();
-        let (engine_tx, engine_rx) = tokio::sync::oneshot::channel();
-        if !self.data.submit_newvmrequest(req, engine_tx).await {
-            return Err(Status::unavailable(
-                "The node you picked is currently offline.",
-            ));
+        let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
+        self.data.submit_newvmrequest(req, oneshot_tx).await;
+        match oneshot_rx.await {
+            Ok(response) => {
+                info!("Sending VM confirmation to {admin_pubkey}: {response:?}");
+                Ok(Response::new(response))
+            }
+            Err(e) => {
+                log::error!("Something weird happened. Reached error {e:?}");
+                Err(Status::unknown(
+                    "Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
+                ))
+            }
         }
-        if let Ok(response) = engine_rx.await {
-            info!("Sending VM confirmation to {admin_pubkey}: {response:?}");
-            return Ok(Response::new(response));
-        }
-        self.data.del_cli_vmcontract_tx(&admin_pubkey);
-        Err(Status::unknown(
-            "Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
-        ))
     }
 
     type ListVMContractsStream = Pin> + Send>>;
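
Note on the pattern this patch introduces: each pending NewVmRequest is now stored in tmp_vmrequests together with the oneshot sender that answers the waiting CLI handler, so whichever path resolves the request first (a real daemon confirmation, or a synthesized "Daemon not connected." error when the daemon stream is gone) removes the entry and replies exactly once, instead of leaving the CLI blocked on a channel tied to a dead node. Below is a minimal sketch of that idea under assumed, simplified types (Pending, VmRequest, VmConfirmation are stand-ins for illustration, not the generated gRPC types from this repository):

// Minimal sketch of "request paired with its oneshot responder" in a DashMap.
// VmRequest / VmConfirmation are simplified stand-ins, not the real proto types.
use dashmap::DashMap;
use tokio::sync::oneshot;

#[derive(Debug)]
struct VmRequest {
    uuid: String,
    node_pubkey: String,
}

#[derive(Debug)]
struct VmConfirmation {
    uuid: String,
    error: String,
}

#[derive(Default)]
struct Pending {
    // uuid -> (original request, channel back to the waiting CLI handler)
    requests: DashMap<String, (VmRequest, oneshot::Sender<VmConfirmation>)>,
}

impl Pending {
    fn submit(&self, req: VmRequest, tx: oneshot::Sender<VmConfirmation>) {
        self.requests.insert(req.uuid.clone(), (req, tx));
    }

    // Used both for real daemon confirmations and for synthesized error
    // confirmations, so the CLI side never waits forever.
    fn confirm(&self, confirmation: VmConfirmation) {
        if let Some((_, (_req, tx))) = self.requests.remove(&confirmation.uuid) {
            let _ = tx.send(confirmation); // receiver may already be gone
        }
    }

    // When a daemon (re)connects, drop requests addressed to it; they can no
    // longer be delivered to the stream that just restarted.
    fn drop_for_node(&self, node_pubkey: &str) {
        self.requests
            .retain(|_, (req, _)| req.node_pubkey != node_pubkey);
    }
}

#[tokio::main]
async fn main() {
    let pending = Pending::default();

    // Failure path: the daemon stream is gone, so an error confirmation is
    // synthesized instead of leaving the CLI hanging.
    let (tx, rx) = oneshot::channel();
    pending.submit(
        VmRequest { uuid: "abc".into(), node_pubkey: "node1".into() },
        tx,
    );
    pending.confirm(VmConfirmation {
        uuid: "abc".into(),
        error: "Daemon not connected.".into(),
    });
    let got = rx.await.expect("sender was consumed exactly once");
    println!("CLI would receive: {got:?}");

    // Reconnect path: dropping the stored entry also drops the oneshot sender,
    // so the waiting receiver resolves with RecvError instead of hanging.
    let (tx2, rx2) = oneshot::channel();
    pending.submit(
        VmRequest { uuid: "def".into(), node_pubkey: "node2".into() },
        tx2,
    );
    pending.drop_for_node("node2");
    assert!(rx2.await.is_err());
}

The design point worth noting is that the map value owns the responder: dropping an entry (as add_daemon_newvm_tx now does with retain) also drops the oneshot sender, which wakes the waiting CLI receiver with an error rather than letting it wait forever.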