forked from ghe0/brain-mock
removing guard and hoping there are no leaks
This commit is contained in:
parent
5910e63440
commit
cb7e8f98d6
46
src/data.rs
46
src/data.rs
@ -1,7 +1,6 @@
|
|||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
use crate::grpc::brain as grpc;
|
use crate::grpc::brain as grpc;
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use log::debug;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::RwLock;
|
use std::sync::RwLock;
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
@ -109,23 +108,6 @@ enum TxType {
|
|||||||
DaemonNewVm,
|
DaemonNewVm,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct GuardTx {
|
|
||||||
brain_data: Arc<BrainData>,
|
|
||||||
key: String,
|
|
||||||
tx_type: TxType,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for GuardTx {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
debug!("Dropping {:?} for {}", self.tx_type, self.key);
|
|
||||||
// match self.tx_type {
|
|
||||||
// TxType::CliContract => self.brain_data.del_cli_vmcontract_tx(&self.key),
|
|
||||||
// TxType::DaemonDeleteVm => self.brain_data.del_daemon_deletevm_tx(&self.key),
|
|
||||||
// TxType::DaemonNewVm => self.brain_data.del_daemon_newvm_tx(&self.key),
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BrainData {
|
impl BrainData {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self {
|
Self {
|
||||||
@ -153,13 +135,8 @@ impl BrainData {
|
|||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
node_pubkey: &str,
|
node_pubkey: &str,
|
||||||
tx: Sender<grpc::DeletedVmUpdate>,
|
tx: Sender<grpc::DeletedVmUpdate>,
|
||||||
) -> GuardTx {
|
) {
|
||||||
self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx);
|
self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx);
|
||||||
GuardTx {
|
|
||||||
brain_data: self,
|
|
||||||
key: node_pubkey.to_string(),
|
|
||||||
tx_type: TxType::DaemonDeleteVm,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn del_daemon_deletevm_tx(&self, node_pubkey: &str) {
|
pub fn del_daemon_deletevm_tx(&self, node_pubkey: &str) {
|
||||||
@ -176,17 +153,8 @@ impl BrainData {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_daemon_newvm_tx(
|
pub fn add_daemon_newvm_tx(self: Arc<Self>, node_pubkey: &str, tx: Sender<grpc::NewVmRequest>) {
|
||||||
self: Arc<Self>,
|
|
||||||
node_pubkey: &str,
|
|
||||||
tx: Sender<grpc::NewVmRequest>,
|
|
||||||
) -> GuardTx {
|
|
||||||
self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx);
|
self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx);
|
||||||
GuardTx {
|
|
||||||
brain_data: self,
|
|
||||||
key: node_pubkey.to_string(),
|
|
||||||
tx_type: TxType::DaemonNewVm,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn del_daemon_newvm_tx(&self, node_pubkey: &str) {
|
pub fn del_daemon_newvm_tx(&self, node_pubkey: &str) {
|
||||||
@ -226,21 +194,17 @@ impl BrainData {
|
|||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
mut req: grpc::NewVmRequest,
|
mut req: grpc::NewVmRequest,
|
||||||
tx: OneshotSender<grpc::NewVmConfirmation>,
|
tx: OneshotSender<grpc::NewVmConfirmation>,
|
||||||
) -> Option<GuardTx> {
|
) -> bool {
|
||||||
req.uuid = uuid::Uuid::new_v4().to_string();
|
req.uuid = uuid::Uuid::new_v4().to_string();
|
||||||
self.tmp_vmrequests.insert(req.uuid.clone(), req.clone());
|
self.tmp_vmrequests.insert(req.uuid.clone(), req.clone());
|
||||||
self.cli_vmcontract_tx
|
self.cli_vmcontract_tx
|
||||||
.insert(req.admin_pubkey.to_string(), tx);
|
.insert(req.admin_pubkey.to_string(), tx);
|
||||||
if let Some(server_tx) = self.clone().daemon_newvm_tx.get(&req.node_pubkey) {
|
if let Some(server_tx) = self.clone().daemon_newvm_tx.get(&req.node_pubkey) {
|
||||||
if server_tx.send(req.clone()).await.is_err() {
|
if server_tx.send(req.clone()).await.is_err() {
|
||||||
return None;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(GuardTx {
|
true
|
||||||
brain_data: self,
|
|
||||||
key: req.admin_pubkey.to_string(),
|
|
||||||
tx_type: TxType::CliContract,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn del_cli_vmcontract_tx(&self, admin_pubkey: &str) {
|
pub fn del_cli_vmcontract_tx(&self, admin_pubkey: &str) {
|
||||||
|
19
src/grpc.rs
19
src/grpc.rs
@ -59,8 +59,7 @@ impl BrainDaemonService for BrainDaemonMock {
|
|||||||
);
|
);
|
||||||
let (grpc_tx, grpc_rx) = mpsc::channel(6);
|
let (grpc_tx, grpc_rx) = mpsc::channel(6);
|
||||||
let (data_tx, mut data_rx) = mpsc::channel(6);
|
let (data_tx, mut data_rx) = mpsc::channel(6);
|
||||||
self
|
self.data
|
||||||
.data
|
|
||||||
.clone()
|
.clone()
|
||||||
.add_daemon_newvm_tx(&req.node_pubkey, data_tx);
|
.add_daemon_newvm_tx(&req.node_pubkey, data_tx);
|
||||||
let node_pubkey = req.node_pubkey.clone();
|
let node_pubkey = req.node_pubkey.clone();
|
||||||
@ -73,8 +72,7 @@ impl BrainDaemonService for BrainDaemonMock {
|
|||||||
);
|
);
|
||||||
let _ = grpc_tx.send(Ok(newvmreq)).await;
|
let _ = grpc_tx.send(Ok(newvmreq)).await;
|
||||||
}
|
}
|
||||||
let _ = dropper;
|
data.del_daemon_newvm_tx(&node_pubkey);
|
||||||
data.del_cli_vmcontract_tx(&node_pubkey);
|
|
||||||
});
|
});
|
||||||
let output_stream = ReceiverStream::new(grpc_rx);
|
let output_stream = ReceiverStream::new(grpc_rx);
|
||||||
Ok(Response::new(
|
Ok(Response::new(
|
||||||
@ -102,16 +100,18 @@ impl BrainDaemonService for BrainDaemonMock {
|
|||||||
&self,
|
&self,
|
||||||
req: Request<NodePubkey>,
|
req: Request<NodePubkey>,
|
||||||
) -> Result<Response<Self::DeletedVMUpdatesStream>, Status> {
|
) -> Result<Response<Self::DeletedVMUpdatesStream>, Status> {
|
||||||
|
let node_pubkey = req.into_inner().node_pubkey;
|
||||||
let (grpc_tx, grpc_rx) = mpsc::channel(6);
|
let (grpc_tx, grpc_rx) = mpsc::channel(6);
|
||||||
let (data_tx, mut data_rx) = mpsc::channel(6);
|
let (data_tx, mut data_rx) = mpsc::channel(6);
|
||||||
let _dropper = self
|
self.data
|
||||||
.data
|
|
||||||
.clone()
|
.clone()
|
||||||
.add_daemon_deletevm_tx(&req.into_inner().node_pubkey, data_tx);
|
.add_daemon_deletevm_tx(&node_pubkey, data_tx);
|
||||||
|
let data = self.data.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
while let Some(deleted_vm) = data_rx.recv().await {
|
while let Some(deleted_vm) = data_rx.recv().await {
|
||||||
let _ = grpc_tx.send(Ok(deleted_vm)).await;
|
let _ = grpc_tx.send(Ok(deleted_vm)).await;
|
||||||
}
|
}
|
||||||
|
data.del_daemon_deletevm_tx(&node_pubkey);
|
||||||
});
|
});
|
||||||
let output_stream = ReceiverStream::new(grpc_rx);
|
let output_stream = ReceiverStream::new(grpc_rx);
|
||||||
Ok(Response::new(
|
Ok(Response::new(
|
||||||
@ -147,9 +147,9 @@ impl BrainCliService for BrainCliMock {
|
|||||||
req: Request<NewVmRequest>,
|
req: Request<NewVmRequest>,
|
||||||
) -> Result<Response<NewVmConfirmation>, Status> {
|
) -> Result<Response<NewVmConfirmation>, Status> {
|
||||||
let req = req.into_inner();
|
let req = req.into_inner();
|
||||||
|
let admin_pubkey = req.admin_pubkey.clone();
|
||||||
let (engine_tx, engine_rx) = tokio::sync::oneshot::channel();
|
let (engine_tx, engine_rx) = tokio::sync::oneshot::channel();
|
||||||
let dropper = self.data.clone().submit_newvmrequest(req, engine_tx).await;
|
if !self.data.clone().submit_newvmrequest(req, engine_tx).await {
|
||||||
if dropper.is_none() {
|
|
||||||
return Err(Status::unavailable(
|
return Err(Status::unavailable(
|
||||||
"The node you picked is currently offline.",
|
"The node you picked is currently offline.",
|
||||||
));
|
));
|
||||||
@ -157,6 +157,7 @@ impl BrainCliService for BrainCliMock {
|
|||||||
if let Ok(response) = engine_rx.await {
|
if let Ok(response) = engine_rx.await {
|
||||||
return Ok(Response::new(response));
|
return Ok(Response::new(response));
|
||||||
}
|
}
|
||||||
|
self.data.del_cli_vmcontract_tx(&admin_pubkey);
|
||||||
Err(Status::unknown(
|
Err(Status::unknown(
|
||||||
"Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
|
"Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
|
||||||
))
|
))
|
||||||
|
Loading…
Reference in New Issue
Block a user