diff --git a/src/data.rs b/src/data.rs index a11e929..697c4c9 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,7 +1,6 @@ #![allow(dead_code)] use crate::grpc::brain as grpc; use dashmap::DashMap; -use log::debug; use std::sync::Arc; use std::sync::RwLock; use tokio::sync::mpsc::Sender; @@ -109,23 +108,6 @@ enum TxType { DaemonNewVm, } -pub struct GuardTx { - brain_data: Arc, - key: String, - tx_type: TxType, -} - -impl Drop for GuardTx { - fn drop(&mut self) { - debug!("Dropping {:?} for {}", self.tx_type, self.key); - // match self.tx_type { - // TxType::CliContract => self.brain_data.del_cli_vmcontract_tx(&self.key), - // TxType::DaemonDeleteVm => self.brain_data.del_daemon_deletevm_tx(&self.key), - // TxType::DaemonNewVm => self.brain_data.del_daemon_newvm_tx(&self.key), - // } - } -} - impl BrainData { pub fn new() -> Self { Self { @@ -153,13 +135,8 @@ impl BrainData { self: Arc, node_pubkey: &str, tx: Sender, - ) -> GuardTx { + ) { self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx); - GuardTx { - brain_data: self, - key: node_pubkey.to_string(), - tx_type: TxType::DaemonDeleteVm, - } } pub fn del_daemon_deletevm_tx(&self, node_pubkey: &str) { @@ -176,17 +153,8 @@ impl BrainData { } } - pub fn add_daemon_newvm_tx( - self: Arc, - node_pubkey: &str, - tx: Sender, - ) -> GuardTx { + pub fn add_daemon_newvm_tx(self: Arc, node_pubkey: &str, tx: Sender) { self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx); - GuardTx { - brain_data: self, - key: node_pubkey.to_string(), - tx_type: TxType::DaemonNewVm, - } } pub fn del_daemon_newvm_tx(&self, node_pubkey: &str) { @@ -226,21 +194,17 @@ impl BrainData { self: Arc, mut req: grpc::NewVmRequest, tx: OneshotSender, - ) -> Option { + ) -> bool { req.uuid = uuid::Uuid::new_v4().to_string(); self.tmp_vmrequests.insert(req.uuid.clone(), req.clone()); self.cli_vmcontract_tx .insert(req.admin_pubkey.to_string(), tx); if let Some(server_tx) = self.clone().daemon_newvm_tx.get(&req.node_pubkey) { if server_tx.send(req.clone()).await.is_err() { - return None; + return false; } } - Some(GuardTx { - brain_data: self, - key: req.admin_pubkey.to_string(), - tx_type: TxType::CliContract, - }) + true } pub fn del_cli_vmcontract_tx(&self, admin_pubkey: &str) { diff --git a/src/grpc.rs b/src/grpc.rs index 49bbbfa..29cc19d 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -59,8 +59,7 @@ impl BrainDaemonService for BrainDaemonMock { ); let (grpc_tx, grpc_rx) = mpsc::channel(6); let (data_tx, mut data_rx) = mpsc::channel(6); - self - .data + self.data .clone() .add_daemon_newvm_tx(&req.node_pubkey, data_tx); let node_pubkey = req.node_pubkey.clone(); @@ -73,8 +72,7 @@ impl BrainDaemonService for BrainDaemonMock { ); let _ = grpc_tx.send(Ok(newvmreq)).await; } - let _ = dropper; - data.del_cli_vmcontract_tx(&node_pubkey); + data.del_daemon_newvm_tx(&node_pubkey); }); let output_stream = ReceiverStream::new(grpc_rx); Ok(Response::new( @@ -102,16 +100,18 @@ impl BrainDaemonService for BrainDaemonMock { &self, req: Request, ) -> Result, Status> { + let node_pubkey = req.into_inner().node_pubkey; let (grpc_tx, grpc_rx) = mpsc::channel(6); let (data_tx, mut data_rx) = mpsc::channel(6); - let _dropper = self - .data + self.data .clone() - .add_daemon_deletevm_tx(&req.into_inner().node_pubkey, data_tx); + .add_daemon_deletevm_tx(&node_pubkey, data_tx); + let data = self.data.clone(); tokio::spawn(async move { while let Some(deleted_vm) = data_rx.recv().await { let _ = grpc_tx.send(Ok(deleted_vm)).await; } + data.del_daemon_deletevm_tx(&node_pubkey); }); let output_stream = ReceiverStream::new(grpc_rx); Ok(Response::new( @@ -147,9 +147,9 @@ impl BrainCliService for BrainCliMock { req: Request, ) -> Result, Status> { let req = req.into_inner(); + let admin_pubkey = req.admin_pubkey.clone(); let (engine_tx, engine_rx) = tokio::sync::oneshot::channel(); - let dropper = self.data.clone().submit_newvmrequest(req, engine_tx).await; - if dropper.is_none() { + if !self.data.clone().submit_newvmrequest(req, engine_tx).await { return Err(Status::unavailable( "The node you picked is currently offline.", )); @@ -157,6 +157,7 @@ impl BrainCliService for BrainCliMock { if let Ok(response) = engine_rx.await { return Ok(Response::new(response)); } + self.data.del_cli_vmcontract_tx(&admin_pubkey); Err(Status::unknown( "Request failed due to unknown error. Please try again or contact the DeTEE devs team.", ))