From 230eb8cfbd52a80f658eaab790f27bf65fa972be Mon Sep 17 00:00:00 2001 From: Noor Date: Mon, 3 Feb 2025 18:20:21 +0530 Subject: [PATCH] delete container implemented refactor daemon message --- Cargo.lock | 2 +- src/data.rs | 81 +++++++++++++++++++++++++++++++++++++++++++++-------- src/grpc.rs | 42 +++++++++++++++++---------- 3 files changed, 98 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5db03a2..7509f23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -329,7 +329,7 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#6e1b1853838905c44d535d984d1221dd5d0dc2bc" +source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#7c9f66a7394c06ad8af0934e34b113f9c965bc98" dependencies = [ "base64", "prost", diff --git a/src/data.rs b/src/data.rs index c3e3a7f..cf095ad 100644 --- a/src/data.rs +++ b/src/data.rs @@ -8,7 +8,7 @@ use std::sync::RwLock; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot::Sender as OneshotSender; -use detee_shared::pb::daemon::{BrainMessage as BrainMessageSgx, NewContainerRes}; +use detee_shared::pb::daemon as daemonPb; use detee_shared::pb::shared as sharedPb; #[derive(thiserror::Error, Debug)] @@ -202,8 +202,14 @@ pub struct BrainData { daemon_tx: DashMap>, app_nodes: RwLock>, - app_daemon_tx: DashMap>, - tmp_new_container_reqs: DashMap)>, + app_daemon_tx: DashMap>, + tmp_new_container_reqs: DashMap< + String, + ( + sharedPb::Container, + OneshotSender, + ), + >, app_contracts: RwLock>, } @@ -738,7 +744,7 @@ impl BrainData { } impl BrainData { - pub fn add_app_daemon_tx(&self, node_pubkey: &str, tx: Sender) { + pub fn add_app_daemon_tx(&self, node_pubkey: &str, tx: Sender) { self.app_daemon_tx.insert(node_pubkey.to_string(), tx); } @@ -747,7 +753,6 @@ impl BrainData { } pub fn insert_app_node(&self, node: AppNode) { - info!("Registering app node {node:?}"); let mut nodes = self.app_nodes.write().unwrap(); for n in nodes.iter_mut() { if n.public_key == node.public_key { @@ -760,6 +765,25 @@ impl BrainData { nodes.push(node); } + pub fn find_app_contract_by_uuid(&self, uuid: &str) -> Option { + let contracts = self.app_contracts.read().unwrap(); + contracts.iter().cloned().find(|c| c.uuid == uuid) + } + + pub fn find_app_contracts_by_admin_pubkey(&self, admin_pubkey: &str) -> Vec { + debug!("Searching contracts for admin pubkey {admin_pubkey}"); + let contracts: Vec = self + .app_contracts + .read() + .unwrap() + .iter() + .filter(|c| c.admin_pubkey == admin_pubkey) + .cloned() + .collect(); + debug!("Found {} contracts or {admin_pubkey}.", contracts.len()); + contracts + } + pub fn find_app_contracts_by_node_pubkey(&self, node_pubkey: &str) -> Vec { let app_contracts = self.app_contracts.read().unwrap(); app_contracts @@ -772,7 +796,7 @@ impl BrainData { pub async fn send_new_container_req( &self, mut req: sharedPb::Container, - tx: OneshotSender, + tx: OneshotSender, ) { req.uuid = uuid::Uuid::new_v4().to_string(); @@ -785,7 +809,7 @@ impl BrainData { "Found daemon TX for {}. Sending newVMReq {}", req.node_pubkey, req.uuid ); - let msg = BrainMessageSgx { + let msg = daemonPb::BrainMessage { msg: Some( detee_shared::pb::daemon::brain_message::Msg::NewContainerReq(req.clone()), ), @@ -797,7 +821,7 @@ impl BrainData { ); info!("Deleting daemon TX for {}", req.node_pubkey); self.del_app_daemon_tx(&req.node_pubkey); - self.send_new_container_resp(NewContainerRes { + self.send_new_container_resp(daemonPb::NewContainerRes { uuid: req.uuid, status: "Failed".to_string(), error: "Daemon is offline.".to_string(), @@ -808,9 +832,44 @@ impl BrainData { } } - pub async fn send_new_container_resp(&self, new_container_resp: NewContainerRes) { - dbg!(&new_container_resp.uuid); - dbg!(&self.tmp_new_container_reqs); + pub async fn send_del_container_req( + &self, + req: daemonPb::ContainerFilters, + ) -> Result<(), Box> { + if let Some(app_contract) = self.find_app_contract_by_uuid(req.uuid()) { + info!("Found app contract {}. Deleting...", req.uuid()); + if let Some(app_daemon_tx) = self.app_daemon_tx.get(&app_contract.node_pubkey) { + debug!( + "TX for daemon {} found. Informing daemon about deletion of {}.", + app_contract.node_pubkey, + req.uuid() + ); + let msg = daemonPb::BrainMessage { + msg: Some( + detee_shared::pb::daemon::brain_message::Msg::DeleteContainer(req.clone()), + ), + }; + + if let Err(e) = app_daemon_tx.send(msg).await { + warn!( + "Failed to send deletion request to {} due to error: {e:?}", + app_contract.node_pubkey + ); + info!("Deleting daemon TX for {}", app_contract.node_pubkey); + self.del_app_daemon_tx(&app_contract.node_pubkey); + } + } + + let mut app_contracts = self.app_contracts.write().unwrap(); + app_contracts.retain(|c| c.uuid != req.uuid()); + + return Ok(()); + } else { + Err("Contract not found".into()) + } + } + + pub async fn send_new_container_resp(&self, new_container_resp: daemonPb::NewContainerRes) { let new_container_req = match self.tmp_new_container_reqs.remove(&new_container_resp.uuid) { Some((_, r)) => r, None => { diff --git a/src/grpc.rs b/src/grpc.rs index 5353f35..b62e9d4 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -17,8 +17,7 @@ use tonic::{Request, Response, Status, Streaming}; use detee_shared::pb::daemon::{ brain_sgx_cli_server::BrainSgxCli, brain_sgx_daemon_server::BrainSgxDaemon, - daemon_message as sgx_daemon_message, ContainerFilters, ContainerListResp, DeleteContainerRes, - NewContainerRes, + daemon_message as sgx_daemon_message, ContainerFilters, ContainerListResp, NewContainerRes, }; pub struct BrainDaemonMock { @@ -302,11 +301,18 @@ impl BrainSgxCli for BrainSgxCliMock { async fn delete_container( &self, req: tonic::Request, - ) -> Result, Status> { - dbg!(req); - Ok(Response::new(DeleteContainerRes { - ..Default::default() - })) + ) -> Result, Status> { + let req = req.into_inner(); + log::info!( + "deleting container: {}", + req.uuid.clone().unwrap_or_default() + ); + if let Err(er) = self.data.send_del_container_req(req).await { + info!("Could not delete container: {er}"); + return Err(Status::not_found("Could not find container")); + }; + + Ok(Response::new(detee_shared::pb::shared::Empty {})) } async fn list_containers( @@ -342,8 +348,12 @@ impl BrainSgxDaemon for BrainSgxDaemonMock { &self, req: tonic::Request, ) -> Result, Status> { - log::info!("registering app node : {:?}", &req); let req_data = req.into_inner(); + log::info!( + "registering app node_key : {}, owner_key: {}", + &req_data.node_pubkey, + &req_data.owner_pubkey + ); let app_node = crate::data::AppNode { public_key: req_data.node_pubkey.clone(), @@ -390,24 +400,26 @@ impl BrainSgxDaemon for BrainSgxDaemonMock { &self, req: tonic::Request>, ) -> Result, Status> { - dbg!(&req); let mut req_stream = req.into_inner(); + let mut pubkey = String::new(); while let Some(daemon_message) = req_stream.next().await { match daemon_message { Ok(msg) => match msg.msg { - Some(sgx_daemon_message::Msg::NewContainerResp(new_cont)) => { - dbg!(&new_cont); - self.data.send_new_container_resp(new_cont).await; + Some(sgx_daemon_message::Msg::Pubkey(something)) => { + pubkey = something.pubkey; } - Some(something) => { - dbg!(something); + Some(sgx_daemon_message::Msg::NewContainerResp(new_cont)) => { + self.data.send_new_container_resp(new_cont).await; } None => { dbg!("None"); } }, - Err(_) => todo!(), + Err(e) => { + log::warn!("Daemon disconnected: {e:?}"); + self.data.del_app_daemon_tx(&pubkey); + } } // }