added guards for all txs

This commit is contained in:
ghe0 2024-12-21 22:22:10 +02:00
parent ec5d266c28
commit 5fb2a941b1
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
2 changed files with 57 additions and 18 deletions

@ -101,14 +101,25 @@ pub struct BrainData {
daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmRequest>>, daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmRequest>>,
} }
pub struct GuardCliTx { enum TxType {
brain_data: Arc<BrainData>, CliContract,
admin_pubkey: String, DaemonDeleteVm,
DaemonNewVm,
} }
impl Drop for GuardCliTx { pub struct GuardTx {
brain_data: Arc<BrainData>,
key: String,
tx_type: TxType,
}
impl Drop for GuardTx {
fn drop(&mut self) { fn drop(&mut self) {
self.brain_data.del_cli_vmcontract_tx(&self.admin_pubkey); match self.tx_type {
TxType::CliContract => self.brain_data.del_cli_vmcontract_tx(&self.key),
TxType::DaemonDeleteVm => self.brain_data.del_daemon_deletevm_tx(&self.key),
TxType::DaemonNewVm => self.brain_data.del_daemon_newvm_tx(&self.key),
}
} }
} }
@ -124,8 +135,21 @@ impl BrainData {
nodes.push(node.into()); nodes.push(node.into());
} }
pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::DeletedVmUpdate>) { pub fn add_daemon_deletevm_tx(
self: Arc<Self>,
node_pubkey: &str,
tx: Sender<grpc::DeletedVmUpdate>,
) -> GuardTx {
self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx); self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx);
GuardTx {
brain_data: self,
key: node_pubkey.to_string(),
tx_type: TxType::DaemonDeleteVm,
}
}
pub fn del_daemon_deletevm_tx(&self, node_pubkey: &str) {
self.daemon_deletevm_tx.remove(&node_pubkey.to_string());
} }
pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) { pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) {
@ -138,8 +162,21 @@ impl BrainData {
} }
} }
pub fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender<grpc::NewVmRequest>) { pub fn add_daemon_newvm_tx(
self: Arc<Self>,
node_pubkey: &str,
tx: Sender<grpc::NewVmRequest>,
) -> GuardTx {
self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx); self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx);
GuardTx {
brain_data: self,
key: node_pubkey.to_string(),
tx_type: TxType::DaemonNewVm,
}
}
pub fn del_daemon_newvm_tx(&self, node_pubkey: &str) {
self.daemon_newvm_tx.remove(&node_pubkey.to_string());
} }
pub async fn submit_vmconfirmation(&self, confirmation: grpc::NewVmConfirmation) { pub async fn submit_vmconfirmation(&self, confirmation: grpc::NewVmConfirmation) {
@ -175,7 +212,7 @@ impl BrainData {
self: Arc<Self>, self: Arc<Self>,
mut req: grpc::NewVmRequest, mut req: grpc::NewVmRequest,
tx: OneshotSender<grpc::NewVmConfirmation>, tx: OneshotSender<grpc::NewVmConfirmation>,
) -> Option<GuardCliTx> { ) -> Option<GuardTx> {
req.uuid = uuid::Uuid::new_v4().to_string(); req.uuid = uuid::Uuid::new_v4().to_string();
self.tmp_vmrequests.insert(req.uuid.clone(), req.clone()); self.tmp_vmrequests.insert(req.uuid.clone(), req.clone());
self.cli_vmcontract_tx self.cli_vmcontract_tx
@ -185,9 +222,10 @@ impl BrainData {
return None; return None;
} }
} }
Some(GuardCliTx { Some(GuardTx {
brain_data: self, brain_data: self,
admin_pubkey: req.admin_pubkey.to_string(), key: req.admin_pubkey.to_string(),
tx_type: TxType::CliContract,
}) })
} }

@ -28,7 +28,7 @@ impl BrainDaemonService for BrainDaemonMock {
req: Request<RegisterNodeRequest>, req: Request<RegisterNodeRequest>,
) -> Result<Response<Empty>, Status> { ) -> Result<Response<Empty>, Status> {
self.data.insert_node(req.into_inner()); self.data.insert_node(req.into_inner());
Ok(Response::new(Empty{})) Ok(Response::new(Empty {}))
} }
type GetNewVMReqsStream = Pin<Box<dyn Stream<Item = Result<NewVmRequest, Status>> + Send>>; type GetNewVMReqsStream = Pin<Box<dyn Stream<Item = Result<NewVmRequest, Status>> + Send>>;
@ -39,7 +39,9 @@ impl BrainDaemonService for BrainDaemonMock {
) -> Result<Response<Self::GetNewVMReqsStream>, Status> { ) -> Result<Response<Self::GetNewVMReqsStream>, Status> {
let (grpc_tx, grpc_rx) = mpsc::channel(6); let (grpc_tx, grpc_rx) = mpsc::channel(6);
let (data_tx, mut data_rx) = mpsc::channel(6); let (data_tx, mut data_rx) = mpsc::channel(6);
self.data let _dropper = self
.data
.clone()
.add_daemon_newvm_tx(&req.into_inner().node_pubkey, data_tx); .add_daemon_newvm_tx(&req.into_inner().node_pubkey, data_tx);
tokio::spawn(async move { tokio::spawn(async move {
while let Some(newvmreq) = data_rx.recv().await { while let Some(newvmreq) = data_rx.recv().await {
@ -74,7 +76,9 @@ impl BrainDaemonService for BrainDaemonMock {
) -> Result<Response<Self::DeletedVMUpdatesStream>, Status> { ) -> Result<Response<Self::DeletedVMUpdatesStream>, Status> {
let (grpc_tx, grpc_rx) = mpsc::channel(6); let (grpc_tx, grpc_rx) = mpsc::channel(6);
let (data_tx, mut data_rx) = mpsc::channel(6); let (data_tx, mut data_rx) = mpsc::channel(6);
self.data let _dropper = self
.data
.clone()
.add_daemon_deletevm_tx(&req.into_inner().node_pubkey, data_tx); .add_daemon_deletevm_tx(&req.into_inner().node_pubkey, data_tx);
tokio::spawn(async move { tokio::spawn(async move {
while let Some(deleted_vm) = data_rx.recv().await { while let Some(deleted_vm) = data_rx.recv().await {
@ -168,11 +172,8 @@ impl BrainCliService for BrainCliMock {
)) ))
} }
async fn delete_vm( async fn delete_vm(&self, req: Request<DeletedVmUpdate>) -> Result<Response<Empty>, Status> {
&self,
req: Request<DeletedVmUpdate>,
) -> Result<Response<Empty>, Status> {
self.data.delete_vm(req.into_inner()).await; self.data.delete_vm(req.into_inner()).await;
Ok(Response::new(Empty{})) Ok(Response::new(Empty {}))
} }
} }