diff --git a/src/data.rs b/src/data.rs index d082a57..eb3f2e7 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,6 +1,7 @@ #![allow(dead_code)] use crate::grpc::brain as grpc; use dashmap::DashMap; +use log::{debug, info, warn}; use std::sync::RwLock; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot::Sender as OneshotSender; @@ -53,7 +54,7 @@ impl Into for Node { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Contract { pub uuid: String, pub hostname: String, @@ -120,9 +121,11 @@ impl BrainData { } pub fn insert_node(&self, node: grpc::RegisterNodeRequest) { + info!("Registering node {node:?}"); let mut nodes = self.nodes.write().unwrap(); for n in nodes.iter_mut() { if n.public_key == node.node_pubkey { + info!("Node {} already exists. Updating data.", n.public_key); *n = node.into(); return; } @@ -136,8 +139,17 @@ impl BrainData { pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) { if let Some(contract) = self.find_contract_by_uuid(&delete_vm.uuid) { + info!("Found vm {}. Deleting...", delete_vm.uuid); if let Some(daemon_tx) = self.daemon_deletevm_tx.get(&contract.node_pubkey) { + debug!( + "TX for daemon {} found. Informing daemon about deletion of {}.", + contract.node_pubkey, delete_vm.uuid + ); if daemon_tx.send(delete_vm.clone()).await.is_err() { + warn!( + "Failed to send deletion request to {}. Triggering memory cleanup.", + contract.node_pubkey + ); self.memory_cleanup(); } } @@ -161,10 +173,18 @@ impl BrainData { pub async fn submit_vmconfirmation(&self, confirmation: grpc::NewVmConfirmation) { let newvmreq = match self.tmp_vmrequests.remove(&confirmation.uuid) { Some((_, r)) => r, - None => return, + None => { + log::error!("Received confirmation for ghost NewVMReq {}", confirmation.uuid); + return + }, }; if let Some((_, client_tx)) = self.cli_vmcontract_tx.remove(&newvmreq.admin_pubkey) { - let _ = client_tx.send(confirmation.clone()); + if let Err(e) = client_tx.send(confirmation.clone()) { + log::error!( + "CLI RX for {} dropped before receiving confirmation {:?}. Error is: {:?}", + &newvmreq.admin_pubkey, confirmation, e + ); + } } if confirmation.error == "" { return; @@ -184,6 +204,7 @@ impl BrainData { kernel_sha: newvmreq.kernel_sha, dtrfs_sha: newvmreq.dtrfs_sha, }; + info!("Created new contract: {contract:?}"); self.contracts.write().unwrap().push(contract); } @@ -193,13 +214,16 @@ impl BrainData { tx: OneshotSender, ) -> bool { req.uuid = uuid::Uuid::new_v4().to_string(); + info!("Inserting new vm request in memory: {req:?}"); self.tmp_vmrequests.insert(req.uuid.clone(), req.clone()); self.cli_vmcontract_tx .insert(req.admin_pubkey.to_string(), tx); if let Some(server_tx) = self.daemon_newvm_tx.get(&req.node_pubkey) { + debug!("Found daemon TX for {}. Sending newVMReq {}", req.node_pubkey, req.uuid); if server_tx.send(req.clone()).await.is_ok() { return true; } else { + warn!("Daemon {} RX dropped before sending update. Cleaning memory...", req.node_pubkey); self.memory_cleanup(); } } diff --git a/src/grpc.rs b/src/grpc.rs index 03ae715..4f13974 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -53,15 +53,13 @@ impl BrainDaemonService for BrainDaemonMock { req: Request, ) -> Result, Status> { let req = req.into_inner(); - info!( - "Daemon {} connected for GetNewVMReqsStream", - req.node_pubkey - ); + info!("Daemon {} requested GetNewVMReqsStream", req.node_pubkey); let (grpc_tx, grpc_rx) = mpsc::channel(6); let (data_tx, mut data_rx) = mpsc::channel(6); self.data .clone() - .add_daemon_newvm_tx(&req.node_pubkey, data_tx).await; + .add_daemon_newvm_tx(&req.node_pubkey, data_tx) + .await; tokio::spawn(async move { while let Some(newvmreq) = data_rx.recv().await { debug!( @@ -81,10 +79,15 @@ impl BrainDaemonService for BrainDaemonMock { &self, req: Request>, ) -> Result, Status> { + debug!("Some node connected to stream NewVmConfirmation"); let mut confirmations = req.into_inner(); while let Some(confirmation) = confirmations.next().await { - if let Ok(confirmation) = confirmation { - self.data.submit_vmconfirmation(confirmation).await; + match confirmation { + Ok(c) => { + info!("Received confirmation from daemon: {c:?}"); + self.data.submit_vmconfirmation(c).await; + } + Err(e) => log::warn!("Daemon disconnected from Streaming: {e:?}"), } } Ok(Response::new(Empty {})) @@ -98,6 +101,7 @@ impl BrainDaemonService for BrainDaemonMock { req: Request, ) -> Result, Status> { let node_pubkey = req.into_inner().node_pubkey; + info!("Daemon {node_pubkey} requested DeletedVMUpdatesStream"); let (grpc_tx, grpc_rx) = mpsc::channel(6); let (data_tx, mut data_rx) = mpsc::channel(6); self.data @@ -105,7 +109,18 @@ impl BrainDaemonService for BrainDaemonMock { .add_daemon_deletevm_tx(&node_pubkey, data_tx); tokio::spawn(async move { while let Some(deleted_vm) = data_rx.recv().await { - let _ = grpc_tx.send(Ok(deleted_vm)).await; + match grpc_tx.send(Ok(deleted_vm.clone())).await { + Ok(_) => debug!( + "Sent delete_vm confirmation to {}: {:?}", + node_pubkey, deleted_vm + ), + Err(e) => log::error!( + "Could not send delete_vm confirmation {:?} to {} because of error: {:?}", + deleted_vm, + node_pubkey, + e + ), + } } }); let output_stream = ReceiverStream::new(grpc_rx); @@ -121,6 +136,7 @@ impl BrainDaemonService for BrainDaemonMock { req: Request, ) -> Result, Status> { let req = req.into_inner(); + info!("Node {} requested ListVMContractsStream", req.node_pubkey); let contracts = self.data.find_contracts_by_admin_pubkey(&req.node_pubkey); let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { @@ -142,6 +158,7 @@ impl BrainCliService for BrainCliMock { req: Request, ) -> Result, Status> { let req = req.into_inner(); + info!("New VM requested via CLI: {req:?}"); let admin_pubkey = req.admin_pubkey.clone(); let (engine_tx, engine_rx) = tokio::sync::oneshot::channel(); if !self.data.submit_newvmrequest(req, engine_tx).await { @@ -150,6 +167,7 @@ impl BrainCliService for BrainCliMock { )); } if let Ok(response) = engine_rx.await { + info!("Sending VM confirmation to {admin_pubkey}: {response:?}"); return Ok(Response::new(response)); } self.data.del_cli_vmcontract_tx(&admin_pubkey); @@ -164,6 +182,7 @@ impl BrainCliService for BrainCliMock { req: Request, ) -> Result, Status> { let req = req.into_inner(); + info!("CLI {} requested ListVMContractsStream", req.admin_pubkey); let contracts = self.data.find_contracts_by_admin_pubkey(&req.admin_pubkey); let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { @@ -183,6 +202,7 @@ impl BrainCliService for BrainCliMock { req: Request, ) -> Result, tonic::Status> { let req = req.into_inner(); + info!("Unknown CLI requested ListNodesStream: {req:?}"); let nodes = self.data.find_nodes_by_filters(&req); let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { @@ -197,7 +217,9 @@ impl BrainCliService for BrainCliMock { } async fn delete_vm(&self, req: Request) -> Result, Status> { - self.data.delete_vm(req.into_inner()).await; + let req = req.into_inner(); + info!("Unknown CLI requested to delete vm {}", req.uuid); + self.data.delete_vm(req).await; Ok(Response::new(Empty {})) } }