fixed small memory leaks caused by stale daemon channel senders

ghe0 2024-12-22 16:07:38 +02:00
parent cb7e8f98d6
commit 4a3a6b4fdc
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
2 changed files with 29 additions and 28 deletions

@@ -1,7 +1,6 @@
 #![allow(dead_code)]
 use crate::grpc::brain as grpc;
 use dashmap::DashMap;
-use std::sync::Arc;
 use std::sync::RwLock;
 use tokio::sync::mpsc::Sender;
 use tokio::sync::oneshot::Sender as OneshotSender;
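
Note on the dropped import: `Arc` becomes unused because the methods below trade their `self: Arc<Self>` receivers for plain `&self`. An `Arc<BrainData>` held by a caller auto-derefs to `&BrainData` at the call site, so the Arc-specific receiver (and the `.clone()` it forced on every caller) was never required. A minimal sketch of that auto-deref, using a placeholder type rather than the project's real one:

use std::sync::Arc;

struct BrainDataLike;

impl BrainDataLike {
    // &self is all these methods need; shared ownership stays
    // at the call site, inside the Arc.
    fn touch(&self) {}
}

fn main() {
    let data = Arc::new(BrainDataLike);
    data.touch(); // Arc<T> derefs to &T automatically
    let data2 = Arc::clone(&data);
    data2.touch(); // clones of the Arc call the same &self methods
}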
@@ -131,36 +130,34 @@ impl BrainData {
         nodes.push(node.into());
     }

-    pub fn add_daemon_deletevm_tx(
-        self: Arc<Self>,
-        node_pubkey: &str,
-        tx: Sender<grpc::DeletedVmUpdate>,
-    ) {
+    pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::DeletedVmUpdate>) {
         self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx);
     }

-    pub fn del_daemon_deletevm_tx(&self, node_pubkey: &str) {
-        self.daemon_deletevm_tx.remove(&node_pubkey.to_string());
-    }
-
     pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) {
         if let Some(contract) = self.find_contract_by_uuid(&delete_vm.uuid) {
             if let Some(daemon_tx) = self.daemon_deletevm_tx.get(&contract.node_pubkey) {
-                let _ = daemon_tx.send(delete_vm.clone()).await;
+                if daemon_tx.send(delete_vm.clone()).await.is_err() {
+                    self.memory_cleanup();
+                }
             }
             let mut contracts = self.contracts.write().unwrap();
             contracts.retain(|c| c.uuid != delete_vm.uuid);
         }
     }

-    pub fn add_daemon_newvm_tx(self: Arc<Self>, node_pubkey: &str, tx: Sender<grpc::NewVmRequest>) {
+    pub async fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender<grpc::NewVmRequest>) {
+        for dangling_vm_request in self
+            .tmp_vmrequests
+            .iter()
+            .filter(|req| req.node_pubkey == node_pubkey)
+            .map(|entry| entry.value().clone())
+        {
+            let _ = tx.send(dangling_vm_request).await;
+        }
         self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx);
     }

-    pub fn del_daemon_newvm_tx(&self, node_pubkey: &str) {
-        self.daemon_newvm_tx.remove(&node_pubkey.to_string());
-    }
-
     pub async fn submit_vmconfirmation(&self, confirmation: grpc::NewVmConfirmation) {
         let newvmreq = match self.tmp_vmrequests.remove(&confirmation.uuid) {
             Some((_, r)) => r,
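
`add_daemon_newvm_tx` turns async because it now drains "dangling" entries out of `tmp_vmrequests` before registering the fresh channel: requests submitted while the node had no connected daemon are replayed instead of sitting in the map indefinitely. A sketch of the same drain pattern, with a hypothetical request type and node name standing in for the real gRPC types:

use dashmap::DashMap;
use tokio::sync::mpsc;

#[derive(Clone)]
struct VmRequest {
    node_pubkey: String,
    uuid: String,
}

#[tokio::main]
async fn main() {
    let tmp: DashMap<String, VmRequest> = DashMap::new();
    tmp.insert(
        "uuid-1".into(),
        VmRequest { node_pubkey: "node-a".into(), uuid: "uuid-1".into() },
    );

    let (tx, mut rx) = mpsc::channel::<VmRequest>(6);

    // Same shape as the new loop: clone matching requests out of the
    // map, then forward them down the freshly registered channel.
    for dangling in tmp
        .iter()
        .filter(|req| req.node_pubkey == "node-a")
        .map(|entry| entry.value().clone())
    {
        let _ = tx.send(dangling).await;
    }

    assert_eq!(rx.recv().await.unwrap().uuid, "uuid-1");
}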
@@ -191,7 +188,7 @@ impl BrainData {
     }

     pub async fn submit_newvmrequest(
-        self: Arc<Self>,
+        &self,
         mut req: grpc::NewVmRequest,
         tx: OneshotSender<grpc::NewVmConfirmation>,
     ) -> bool {
@@ -199,12 +196,14 @@ impl BrainData {
         self.tmp_vmrequests.insert(req.uuid.clone(), req.clone());
         self.cli_vmcontract_tx
             .insert(req.admin_pubkey.to_string(), tx);
-        if let Some(server_tx) = self.clone().daemon_newvm_tx.get(&req.node_pubkey) {
-            if server_tx.send(req.clone()).await.is_err() {
-                return false;
+        if let Some(server_tx) = self.daemon_newvm_tx.get(&req.node_pubkey) {
+            if server_tx.send(req.clone()).await.is_ok() {
+                return true;
+            } else {
+                self.memory_cleanup();
             }
         }
-        true
+        false
     }

     pub fn del_cli_vmcontract_tx(&self, admin_pubkey: &str) {
@@ -264,4 +263,11 @@ impl BrainData {
             .filter(|c| c.node_pubkey == node_pubkey)
             .collect()
     }
+
+    pub fn memory_cleanup(&self) {
+        self.daemon_newvm_tx
+            .retain(|_, server_tx| !server_tx.is_closed());
+        self.daemon_deletevm_tx
+            .retain(|_, server_tx| !server_tx.is_closed());
+    }
 }
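
`memory_cleanup` replaces the deleted `del_daemon_*_tx` hooks: instead of each stream task deregistering its own channel on exit (a step that leaked the sender whenever the task never reached it), closed senders are swept out lazily whenever a send fails. A standalone sketch of the sweep, assuming only `dashmap` and `tokio` and placeholder key/value types:

use dashmap::DashMap;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Minimal model of the daemon_*_tx maps.
    let senders: DashMap<String, mpsc::Sender<String>> = DashMap::new();

    let (live_tx, _live_rx) = mpsc::channel::<String>(1);
    let (dead_tx, dead_rx) = mpsc::channel::<String>(1);
    senders.insert("online-node".into(), live_tx);
    senders.insert("offline-node".into(), dead_tx);
    drop(dead_rx); // this daemon's stream task is gone

    // Same sweep as memory_cleanup(): evict every sender whose
    // receiver half no longer exists.
    senders.retain(|_, tx| !tx.is_closed());

    assert!(senders.contains_key("online-node"));
    assert!(!senders.contains_key("offline-node"));
}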

@@ -61,9 +61,7 @@ impl BrainDaemonService for BrainDaemonMock {
         let (data_tx, mut data_rx) = mpsc::channel(6);
         self.data
             .clone()
-            .add_daemon_newvm_tx(&req.node_pubkey, data_tx);
-        let node_pubkey = req.node_pubkey.clone();
-        let data = self.data.clone();
+            .add_daemon_newvm_tx(&req.node_pubkey, data_tx).await;
         tokio::spawn(async move {
             while let Some(newvmreq) = data_rx.recv().await {
                 debug!(
@@ -72,7 +70,6 @@ impl BrainDaemonService for BrainDaemonMock {
                 );
                 let _ = grpc_tx.send(Ok(newvmreq)).await;
             }
-            data.del_daemon_newvm_tx(&node_pubkey);
         });
         let output_stream = ReceiverStream::new(grpc_rx);
         Ok(Response::new(
@@ -106,12 +103,10 @@ impl BrainDaemonService for BrainDaemonMock {
         self.data
             .clone()
             .add_daemon_deletevm_tx(&node_pubkey, data_tx);
-        let data = self.data.clone();
         tokio::spawn(async move {
             while let Some(deleted_vm) = data_rx.recv().await {
                 let _ = grpc_tx.send(Ok(deleted_vm)).await;
             }
-            data.del_daemon_deletevm_tx(&node_pubkey);
         });
         let output_stream = ReceiverStream::new(grpc_rx);
         Ok(Response::new(
@@ -149,7 +144,7 @@ impl BrainCliService for BrainCliMock {
         let req = req.into_inner();
         let admin_pubkey = req.admin_pubkey.clone();
         let (engine_tx, engine_rx) = tokio::sync::oneshot::channel();
-        if !self.data.clone().submit_newvmrequest(req, engine_tx).await {
+        if !self.data.submit_newvmrequest(req, engine_tx).await {
             return Err(Status::unavailable(
                 "The node you picked is currently offline.",
             ));
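
With the lazy sweep in place, the mock handlers above no longer capture `data` and `node_pubkey` just to deregister channels when their forwarder tasks exit; dropping the receiver is enough to mark the stored sender closed. A small sketch of that lifecycle, with a stand-in for the spawned forwarder:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(6);

    // Stand-in for the mock's spawned forwarder: it owns the receiver
    // and exits when its stream is done (simulated by a single recv).
    let task = tokio::spawn(async move {
        let _ = rx.recv().await;
        // rx is dropped here when the task returns; no explicit
        // deregistration call is needed anymore
    });

    tx.send(1).await.unwrap();
    task.await.unwrap();

    // A sender like the ones kept in BrainData's maps now reports
    // closed and would be evicted by the next memory_cleanup() sweep.
    assert!(tx.is_closed());
}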