diff --git a/src/data.rs b/src/data.rs index 085cd5f..af87d1f 100644 --- a/src/data.rs +++ b/src/data.rs @@ -101,14 +101,25 @@ pub struct BrainData { daemon_newvm_tx: DashMap>, } -pub struct GuardCliTx { - brain_data: Arc, - admin_pubkey: String, +enum TxType { + CliContract, + DaemonDeleteVm, + DaemonNewVm, } -impl Drop for GuardCliTx { +pub struct GuardTx { + brain_data: Arc, + key: String, + tx_type: TxType, +} + +impl Drop for GuardTx { fn drop(&mut self) { - self.brain_data.del_cli_vmcontract_tx(&self.admin_pubkey); + match self.tx_type { + TxType::CliContract => self.brain_data.del_cli_vmcontract_tx(&self.key), + TxType::DaemonDeleteVm => self.brain_data.del_daemon_deletevm_tx(&self.key), + TxType::DaemonNewVm => self.brain_data.del_daemon_newvm_tx(&self.key), + } } } @@ -124,8 +135,21 @@ impl BrainData { nodes.push(node.into()); } - pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender) { + pub fn add_daemon_deletevm_tx( + self: Arc, + node_pubkey: &str, + tx: Sender, + ) -> GuardTx { self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx); + GuardTx { + brain_data: self, + key: node_pubkey.to_string(), + tx_type: TxType::DaemonDeleteVm, + } + } + + pub fn del_daemon_deletevm_tx(&self, node_pubkey: &str) { + self.daemon_deletevm_tx.remove(&node_pubkey.to_string()); } pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) { @@ -138,8 +162,21 @@ impl BrainData { } } - pub fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender) { + pub fn add_daemon_newvm_tx( + self: Arc, + node_pubkey: &str, + tx: Sender, + ) -> GuardTx { self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx); + GuardTx { + brain_data: self, + key: node_pubkey.to_string(), + tx_type: TxType::DaemonNewVm, + } + } + + pub fn del_daemon_newvm_tx(&self, node_pubkey: &str) { + self.daemon_newvm_tx.remove(&node_pubkey.to_string()); } pub async fn submit_vmconfirmation(&self, confirmation: grpc::NewVmConfirmation) { @@ -175,7 +212,7 @@ impl BrainData { self: Arc, mut req: grpc::NewVmRequest, tx: OneshotSender, - ) -> Option { + ) -> Option { req.uuid = uuid::Uuid::new_v4().to_string(); self.tmp_vmrequests.insert(req.uuid.clone(), req.clone()); self.cli_vmcontract_tx @@ -185,9 +222,10 @@ impl BrainData { return None; } } - Some(GuardCliTx { + Some(GuardTx { brain_data: self, - admin_pubkey: req.admin_pubkey.to_string(), + key: req.admin_pubkey.to_string(), + tx_type: TxType::CliContract, }) } diff --git a/src/grpc.rs b/src/grpc.rs index 063e96e..e3e79f4 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -28,7 +28,7 @@ impl BrainDaemonService for BrainDaemonMock { req: Request, ) -> Result, Status> { self.data.insert_node(req.into_inner()); - Ok(Response::new(Empty{})) + Ok(Response::new(Empty {})) } type GetNewVMReqsStream = Pin> + Send>>; @@ -39,7 +39,9 @@ impl BrainDaemonService for BrainDaemonMock { ) -> Result, Status> { let (grpc_tx, grpc_rx) = mpsc::channel(6); let (data_tx, mut data_rx) = mpsc::channel(6); - self.data + let _dropper = self + .data + .clone() .add_daemon_newvm_tx(&req.into_inner().node_pubkey, data_tx); tokio::spawn(async move { while let Some(newvmreq) = data_rx.recv().await { @@ -74,7 +76,9 @@ impl BrainDaemonService for BrainDaemonMock { ) -> Result, Status> { let (grpc_tx, grpc_rx) = mpsc::channel(6); let (data_tx, mut data_rx) = mpsc::channel(6); - self.data + let _dropper = self + .data + .clone() .add_daemon_deletevm_tx(&req.into_inner().node_pubkey, data_tx); tokio::spawn(async move { while let Some(deleted_vm) = data_rx.recv().await { @@ -168,11 +172,8 @@ impl BrainCliService for BrainCliMock { )) } - async fn delete_vm( - &self, - req: Request, - ) -> Result, Status> { + async fn delete_vm(&self, req: Request) -> Result, Status> { self.data.delete_vm(req.into_inner()).await; - Ok(Response::new(Empty{})) + Ok(Response::new(Empty {})) } }