diff --git a/src/data.rs b/src/data.rs index 697c4c9..d082a57 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,7 +1,6 @@ #![allow(dead_code)] use crate::grpc::brain as grpc; use dashmap::DashMap; -use std::sync::Arc; use std::sync::RwLock; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot::Sender as OneshotSender; @@ -131,36 +130,34 @@ impl BrainData { nodes.push(node.into()); } - pub fn add_daemon_deletevm_tx( - self: Arc<Self>, - node_pubkey: &str, - tx: Sender<grpc::DeletedVmUpdate>, - ) { + pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::DeletedVmUpdate>) { self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx); } - pub fn del_daemon_deletevm_tx(&self, node_pubkey: &str) { - self.daemon_deletevm_tx.remove(&node_pubkey.to_string()); - } - pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) { if let Some(contract) = self.find_contract_by_uuid(&delete_vm.uuid) { if let Some(daemon_tx) = self.daemon_deletevm_tx.get(&contract.node_pubkey) { - let _ = daemon_tx.send(delete_vm.clone()).await; + if daemon_tx.send(delete_vm.clone()).await.is_err() { + self.memory_cleanup(); + } } let mut contracts = self.contracts.write().unwrap(); contracts.retain(|c| c.uuid != delete_vm.uuid); } } - pub fn add_daemon_newvm_tx(self: Arc<Self>, node_pubkey: &str, tx: Sender<grpc::NewVmRequest>) { + pub async fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender<grpc::NewVmRequest>) { + for dangling_vm_request in self + .tmp_vmrequests + .iter() + .filter(|req| req.node_pubkey == node_pubkey) + .map(|entry| entry.value().clone()) + { + let _ = tx.send(dangling_vm_request).await; + } self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx); } - pub fn del_daemon_newvm_tx(&self, node_pubkey: &str) { - self.daemon_newvm_tx.remove(&node_pubkey.to_string()); - } - pub async fn submit_vmconfirmation(&self, confirmation: grpc::NewVmConfirmation) { let newvmreq = match self.tmp_vmrequests.remove(&confirmation.uuid) { Some((_, r)) => r, @@ -191,7 +188,7 @@ impl BrainData { } pub async fn submit_newvmrequest( - self: Arc<Self>, + &self, mut req: 
grpc::NewVmRequest, tx: OneshotSender<grpc::NewVmConfirmation>, ) -> bool { @@ -199,12 +196,14 @@ impl BrainData { self.tmp_vmrequests.insert(req.uuid.clone(), req.clone()); self.cli_vmcontract_tx .insert(req.admin_pubkey.to_string(), tx); - if let Some(server_tx) = self.clone().daemon_newvm_tx.get(&req.node_pubkey) { - if server_tx.send(req.clone()).await.is_err() { - return false; + if let Some(server_tx) = self.daemon_newvm_tx.get(&req.node_pubkey) { + if server_tx.send(req.clone()).await.is_ok() { + return true; + } else { + self.memory_cleanup(); } } - true + false } pub fn del_cli_vmcontract_tx(&self, admin_pubkey: &str) { @@ -264,4 +263,11 @@ impl BrainData { .filter(|c| c.node_pubkey == node_pubkey) .collect() } + + pub fn memory_cleanup(&self) { + self.daemon_newvm_tx + .retain(|_, server_tx| !server_tx.is_closed()); + self.daemon_deletevm_tx + .retain(|_, server_tx| !server_tx.is_closed()); + } } diff --git a/src/grpc.rs b/src/grpc.rs index 29cc19d..03ae715 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -61,9 +61,7 @@ impl BrainDaemonService for BrainDaemonMock { let (data_tx, mut data_rx) = mpsc::channel(6); self.data .clone() - .add_daemon_newvm_tx(&req.node_pubkey, data_tx); - let node_pubkey = req.node_pubkey.clone(); - let data = self.data.clone(); + .add_daemon_newvm_tx(&req.node_pubkey, data_tx).await; tokio::spawn(async move { while let Some(newvmreq) = data_rx.recv().await { debug!( @@ -72,7 +70,6 @@ impl BrainDaemonService for BrainDaemonMock { ); let _ = grpc_tx.send(Ok(newvmreq)).await; } - data.del_daemon_newvm_tx(&node_pubkey); }); let output_stream = ReceiverStream::new(grpc_rx); Ok(Response::new( @@ -106,12 +103,10 @@ impl BrainDaemonService for BrainDaemonMock { self.data .clone() .add_daemon_deletevm_tx(&node_pubkey, data_tx); - let data = self.data.clone(); tokio::spawn(async move { while let Some(deleted_vm) = data_rx.recv().await { let _ = grpc_tx.send(Ok(deleted_vm)).await; } - data.del_daemon_deletevm_tx(&node_pubkey); }); let output_stream = 
ReceiverStream::new(grpc_rx); Ok(Response::new( @@ -149,7 +144,7 @@ impl BrainCliService for BrainCliMock { let req = req.into_inner(); let admin_pubkey = req.admin_pubkey.clone(); let (engine_tx, engine_rx) = tokio::sync::oneshot::channel(); - if !self.data.clone().submit_newvmrequest(req, engine_tx).await { + if !self.data.submit_newvmrequest(req, engine_tx).await { return Err(Status::unavailable( "The node you picked is currently offline.", ));