fixed deadlock for dead nodes

ghe0 2024-12-22 19:59:32 +02:00
parent cc06a9bc00
commit efc3fb81b2
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
2 changed files with 64 additions and 68 deletions
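
Editor's note, inferred from the diff below: pending VM requests and their reply channels previously lived in two separate maps (tmp_vmrequests keyed by UUID, cli_vmcontract_tx keyed by admin_pubkey), which let a request aimed at a dead node leave the CLI waiting on a oneshot that nothing would ever resolve. The commit stores the oneshot sender together with the request in tmp_vmrequests, and the failure paths touched here now either answer that sender or drop it. A minimal sketch of the new entry lifecycle, reusing names from the diff (illustration only, not code from the commit):

    // Insert when the CLI submits a request: the reply channel travels with it.
    self.tmp_vmrequests.insert(req.uuid.clone(), (req.clone(), tx));

    // Consume when a confirmation arrives: the paired sender answers the waiting CLI.
    if let Some((_uuid, (_req, reply_tx))) = self.tmp_vmrequests.remove(&confirmation.uuid) {
        let _ = reply_tx.send(confirmation);
    }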

@@ -95,8 +95,7 @@ impl Into<grpc::VmContract> for Contract {
 pub struct BrainData {
     nodes: RwLock<Vec<Node>>,
     contracts: RwLock<Vec<Contract>>,
-    tmp_vmrequests: DashMap<String, grpc::NewVmRequest>,
-    cli_vmcontract_tx: DashMap<String, OneshotSender<grpc::NewVmConfirmation>>,
+    tmp_vmrequests: DashMap<String, (grpc::NewVmRequest, OneshotSender<grpc::NewVmConfirmation>)>,
     daemon_deletevm_tx: DashMap<String, Sender<grpc::DeletedVmUpdate>>,
     daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmRequest>>,
 }
@@ -114,7 +113,6 @@ impl BrainData {
             nodes: RwLock::new(Vec::new()),
             contracts: RwLock::new(Vec::new()),
             tmp_vmrequests: DashMap::new(),
-            cli_vmcontract_tx: DashMap::new(),
             daemon_deletevm_tx: DashMap::new(),
             daemon_newvm_tx: DashMap::new(),
         }
@@ -150,7 +148,6 @@ impl BrainData {
                     "Failed to send deletion request to {}. Triggering memory cleanup.",
                     contract.node_pubkey
                 );
-                self.memory_cleanup();
             }
         }
         let mut contracts = self.contracts.write().unwrap();
@@ -159,14 +156,8 @@ impl BrainData {
     }

     pub async fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender<grpc::NewVmRequest>) {
-        for dangling_vm_request in self
-            .tmp_vmrequests
-            .iter()
-            .filter(|req| req.node_pubkey == node_pubkey)
-            .map(|entry| entry.value().clone())
-        {
-            let _ = tx.send(dangling_vm_request).await;
-        }
+        self.tmp_vmrequests
+            .retain(|_, req| req.0.node_pubkey != node_pubkey);
         self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx);
     }
@@ -174,18 +165,21 @@ impl BrainData {
         let newvmreq = match self.tmp_vmrequests.remove(&confirmation.uuid) {
             Some((_, r)) => r,
             None => {
-                log::error!("Received confirmation for ghost NewVMReq {}", confirmation.uuid);
-                return
-            },
+                log::error!(
+                    "Received confirmation for ghost NewVMReq {}",
+                    confirmation.uuid
+                );
+                return;
+            }
         };
-        if let Some((_, client_tx)) = self.cli_vmcontract_tx.remove(&newvmreq.admin_pubkey) {
-            if let Err(e) = client_tx.send(confirmation.clone()) {
-                log::error!(
-                    "CLI RX for {} dropped before receiving confirmation {:?}. Error is: {:?}",
-                    &newvmreq.admin_pubkey, confirmation, e
-                );
-            }
-        }
+        if let Err(e) = newvmreq.1.send(confirmation.clone()) {
+            log::error!(
+                "CLI RX for {} dropped before receiving confirmation {:?}. Error is: {:?}",
+                &newvmreq.0.admin_pubkey,
+                confirmation,
+                e
+            );
+        }
         if confirmation.error == "" {
             return;
         }
@@ -195,14 +189,14 @@ impl BrainData {
             public_ipv4: confirmation.public_ipv4,
             public_ipv6: confirmation.public_ipv6,
             created_at: format!("{:?}", std::time::SystemTime::now()),
-            hostname: newvmreq.hostname,
-            admin_pubkey: newvmreq.admin_pubkey,
-            node_pubkey: newvmreq.node_pubkey,
-            disk_size_gb: newvmreq.disk_size_gb,
-            vcpus: newvmreq.vcpus,
-            memory_mb: newvmreq.memory_mb,
-            kernel_sha: newvmreq.kernel_sha,
-            dtrfs_sha: newvmreq.dtrfs_sha,
+            hostname: newvmreq.0.hostname,
+            admin_pubkey: newvmreq.0.admin_pubkey,
+            node_pubkey: newvmreq.0.node_pubkey,
+            disk_size_gb: newvmreq.0.disk_size_gb,
+            vcpus: newvmreq.0.vcpus,
+            memory_mb: newvmreq.0.memory_mb,
+            kernel_sha: newvmreq.0.kernel_sha,
+            dtrfs_sha: newvmreq.0.dtrfs_sha,
         };
         info!("Created new contract: {contract:?}");
         self.contracts.write().unwrap().push(contract);
@@ -212,26 +206,25 @@ impl BrainData {
         &self,
         mut req: grpc::NewVmRequest,
         tx: OneshotSender<grpc::NewVmConfirmation>,
-    ) -> bool {
+    ) {
         req.uuid = uuid::Uuid::new_v4().to_string();
         info!("Inserting new vm request in memory: {req:?}");
-        self.tmp_vmrequests.insert(req.uuid.clone(), req.clone());
-        self.cli_vmcontract_tx
-            .insert(req.admin_pubkey.to_string(), tx);
+        self.tmp_vmrequests
+            .insert(req.uuid.clone(), (req.clone(), tx));
         if let Some(server_tx) = self.daemon_newvm_tx.get(&req.node_pubkey) {
-            debug!("Found daemon TX for {}. Sending newVMReq {}", req.node_pubkey, req.uuid);
+            debug!(
+                "Found daemon TX for {}. Sending newVMReq {}",
+                req.node_pubkey, req.uuid
+            );
             if server_tx.send(req.clone()).await.is_ok() {
-                return true;
+                return;
             } else {
-                warn!("Daemon {} RX dropped before sending update. Cleaning memory...", req.node_pubkey);
-                self.memory_cleanup();
+                warn!(
+                    "Daemon {} RX dropped before sending update. Cleaning memory...",
+                    req.node_pubkey
+                );
             }
         }
-        false
-    }
-
-    pub fn del_cli_vmcontract_tx(&self, admin_pubkey: &str) {
-        self.cli_vmcontract_tx.remove(admin_pubkey);
     }

     pub fn insert_contract(&self, contract: Contract) {
@@ -287,11 +280,4 @@ impl BrainData {
             .filter(|c| c.node_pubkey == node_pubkey)
             .collect()
     }
-
-    pub fn memory_cleanup(&self) {
-        self.daemon_newvm_tx
-            .retain(|_, server_tx| !server_tx.is_closed());
-        self.daemon_deletevm_tx
-            .retain(|_, server_tx| !server_tx.is_closed());
-    }
 }
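
Before the gRPC-side changes below, a sketch of the CLI round trip that the reworked submit_newvmrequest enables (editor's illustration; variable names are assumed, method names come from the diff):

    // The CLI handler owns the receiving half and hands the sender to BrainData.
    let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel::<grpc::NewVmConfirmation>();
    data.submit_newvmrequest(req, oneshot_tx).await;
    match oneshot_rx.await {
        // A confirmation arrived, either from the daemon or from the fallback path.
        Ok(confirmation) => { /* forward it (or its error field) to the CLI */ }
        // The stored sender was dropped, e.g. stale requests purged on daemon reconnect.
        Err(_) => { /* report the failure instead of hanging */ }
    }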

@@ -10,6 +10,7 @@ use brain::brain_daemon_service_server::BrainDaemonService;
 use brain::*;
 use log::debug;
 use log::info;
+use log::warn;
 use std::pin::Pin;
 use std::sync::Arc;
 use tokio::sync::mpsc;
@@ -57,16 +58,23 @@ impl BrainDaemonService for BrainDaemonMock {
         let (grpc_tx, grpc_rx) = mpsc::channel(6);
         let (data_tx, mut data_rx) = mpsc::channel(6);
         self.data
-            .clone()
             .add_daemon_newvm_tx(&req.node_pubkey, data_tx)
             .await;
+        let data = self.data.clone();
         tokio::spawn(async move {
             while let Some(newvmreq) = data_rx.recv().await {
-                debug!(
-                    "received this newvmreq to {}: {newvmreq:?}",
-                    req.node_pubkey
-                );
-                let _ = grpc_tx.send(Ok(newvmreq)).await;
+                let uuid = newvmreq.uuid.clone();
+                debug!("Sending NewVMRequest to {}: {newvmreq:?}", req.node_pubkey);
+                if let Err(e) = grpc_tx.send(Ok(newvmreq)).await {
+                    warn!("Could not send NewVMRequest to {}: {e:?}", req.node_pubkey);
+                    data.submit_vmconfirmation(NewVmConfirmation {
+                        error: "Daemon not connected.".to_string(),
+                        uuid,
+                        ..Default::default()
+                    })
+                    .await;
+                    break;
+                }
             }
         });
         let output_stream = ReceiverStream::new(grpc_rx);
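
The relay task above now reports a failed forward instead of dropping it silently: the synthetic confirmation reuses the request UUID so submit_vmconfirmation can find the pending entry and release the waiting CLI. The fallback in isolation (editor's sketch; ..Default::default() assumes NewVmConfirmation derives Default, as prost-generated types typically do):

    // Built only when grpc_tx.send(..) fails, i.e. the daemon's stream receiver is gone.
    let fallback = NewVmConfirmation {
        uuid,                                       // ties the failure to the pending request
        error: "Daemon not connected.".to_string(), // non-empty error string signals the failure
        ..Default::default()
    };
    data.submit_vmconfirmation(fallback).await;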
@@ -87,7 +95,9 @@ impl BrainDaemonService for BrainDaemonMock {
                     info!("Received confirmation from daemon: {c:?}");
                     self.data.submit_vmconfirmation(c).await;
                 }
-                Err(e) => log::warn!("Daemon disconnected from Streaming<NewVmConfirmation>: {e:?}"),
+                Err(e) => {
+                    log::warn!("Daemon disconnected from Streaming<NewVmConfirmation>: {e:?}")
+                }
             }
         }
         Ok(Response::new(Empty {}))
@@ -160,21 +170,21 @@ impl BrainCliService for BrainCliMock {
         let req = req.into_inner();
         info!("New VM requested via CLI: {req:?}");
         let admin_pubkey = req.admin_pubkey.clone();
-        let (engine_tx, engine_rx) = tokio::sync::oneshot::channel();
-        if !self.data.submit_newvmrequest(req, engine_tx).await {
-            return Err(Status::unavailable(
-                "The node you picked is currently offline.",
-            ));
-        }
-        if let Ok(response) = engine_rx.await {
-            info!("Sending VM confirmation to {admin_pubkey}: {response:?}");
-            return Ok(Response::new(response));
-        }
-        self.data.del_cli_vmcontract_tx(&admin_pubkey);
-        Err(Status::unknown(
-            "Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
-        ))
+        let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
+        self.data.submit_newvmrequest(req, oneshot_tx).await;
+        match oneshot_rx.await {
+            Ok(response) => {
+                info!("Sending VM confirmation to {admin_pubkey}: {response:?}");
+                Ok(Response::new(response))
+            }
+            Err(e) => {
+                log::error!("Something weird happened. Reached error {e:?}");
+                Err(Status::unknown(
+                    "Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
+                ))
+            }
+        }
     }

     type ListVMContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;

     async fn list_vm_contracts(