From 8f40adcbf80f53b01bc677e9c6d20c4fdbc6146d Mon Sep 17 00:00:00 2001 From: Noor Date: Thu, 30 Jan 2025 18:31:10 +0530 Subject: [PATCH] redirect sgx container create call to daemon and response to cli --- Cargo.lock | 2 +- src/data.rs | 81 +++++++++++++++++++++++++++++++++++++++++++++++++ src/grpc.rs | 86 ++++++++++++++++++++++++++++++++++++++--------------- 3 files changed, 144 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 38d60b0..41262fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -329,7 +329,7 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#3e783b11bab6894b6f98bd3c6ce44e8bf5b1f78b" +source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#a2899ba5a25794aec60a093695675ff24f967484" dependencies = [ "base64", "prost", diff --git a/src/data.rs b/src/data.rs index b2d0ce3..bdff0b9 100644 --- a/src/data.rs +++ b/src/data.rs @@ -8,6 +8,10 @@ use std::sync::RwLock; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot::Sender as OneshotSender; +use detee_shared::pb::daemon::{BrainMessage as BrainMessageSgx, NewContainerRes}; +use detee_shared::pb::shared as sharedPb; +use detee_shared::pb::shared::Container; + #[derive(thiserror::Error, Debug)] pub enum Error { #[error("We do not allow locking of more than 100000 tokens.")] @@ -143,6 +147,9 @@ pub struct BrainData { tmp_newvm_reqs: DashMap)>, tmp_updatevm_reqs: DashMap)>, daemon_tx: DashMap>, + + sgx_daemon_tx: DashMap>, + tmp_new_container_reqs: DashMap)>, } #[derive(Debug)] @@ -161,6 +168,9 @@ impl BrainData { tmp_newvm_reqs: DashMap::new(), tmp_updatevm_reqs: DashMap::new(), daemon_tx: DashMap::new(), + + sgx_daemon_tx: DashMap::new(), + tmp_new_container_reqs: DashMap::new(), } } @@ -669,3 +679,74 @@ impl BrainData { .collect() } } + +impl BrainData { + pub fn add_sgx_daemon_tx(&self, node_pubkey: &str, tx: Sender) { + self.sgx_daemon_tx.insert(node_pubkey.to_string(), tx); + } + + pub fn del_sgx_daemon_tx(&self, node_pubkey: &str) { + self.sgx_daemon_tx.remove(node_pubkey); + } + + pub async fn send_new_container_req( + &self, + mut req: Container, + tx: OneshotSender, + ) { + req.uuid = uuid::Uuid::new_v4().to_string(); + + info!("Inserting new container request in memory: {req:?}"); + self.tmp_new_container_reqs + .insert(req.uuid.clone(), (req.clone(), tx)); + + if let Some(sgx_daemon_tx) = self.sgx_daemon_tx.get(&req.node_pubkey) { + debug!( + "Found daemon TX for {}. Sending newVMReq {}", + req.node_pubkey, req.uuid + ); + let msg = BrainMessageSgx { + msg: Some( + detee_shared::pb::daemon::brain_message::Msg::NewContainerReq(req.clone()), + ), + }; + if let Err(e) = sgx_daemon_tx.send(msg).await { + warn!( + "Failed to send new container request to {} due to error: {e:?}", + req.node_pubkey + ); + info!("Deleting daemon TX for {}", req.node_pubkey); + self.del_sgx_daemon_tx(&req.node_pubkey); + self.send_new_container_resp(NewContainerRes { + uuid: req.uuid, + status: "Failed".to_string(), + error: "Daemon is offline.".to_string(), + ..Default::default() + }) + .await; + } + } + } + + pub async fn send_new_container_resp(&self, new_container_resp: NewContainerRes) { + dbg!(&new_container_resp.uuid); + dbg!(&self.tmp_new_container_reqs); + let new_container_req = match self.tmp_new_container_reqs.remove(&new_container_resp.uuid) { + Some((_, r)) => r, + None => { + log::error!( + "Received confirmation for ghost new container req {}", + new_container_resp.uuid + ); + return; + } + }; + if let Err(_) = new_container_req.1.send(new_container_resp.clone()) { + log::error!( + "CLI RX for {} dropped before receiving confirmation {:?}.", + &new_container_req.0.admin_pubkey, + new_container_resp, + ); + } + } +} diff --git a/src/grpc.rs b/src/grpc.rs index 7b4c1d5..dd98efe 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -16,8 +16,9 @@ use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; use detee_shared::pb::daemon::{ - brain_sgx_cli_server::BrainSgxCli, brain_sgx_daemon_server::BrainSgxDaemon, ContainerFilters, - ContainerListResp, DeleteContainerRes, NewContainerRes, + brain_sgx_cli_server::BrainSgxCli, brain_sgx_daemon_server::BrainSgxDaemon, + daemon_message as sgx_daemon_message, ContainerFilters, ContainerListResp, DeleteContainerRes, + NewContainerRes, }; pub struct BrainDaemonMock { @@ -278,10 +279,24 @@ impl BrainSgxCli for BrainSgxCliMock { &self, req: tonic::Request, ) -> Result, Status> { - dbg!(req); - Ok(Response::new(NewContainerRes { - ..Default::default() - })) + let req = req.into_inner(); + log::info!("Creating new container: {req:?}"); + let admin_pubkey = req.admin_pubkey.clone(); + let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); + self.data.send_new_container_req(req, oneshot_tx).await; + + match oneshot_rx.await { + Ok(response) => { + info!("responding container confirmation to {admin_pubkey}: {response:?}"); + Ok(Response::new(response)) + } + Err(e) => { + log::error!("Something went wrong. Reached error {e:?}"); + Err(Status::unknown( + "Request failed due to unknown error. Please try again or contact the DeTEE devs team.", + )) + } + } } async fn delete_container( @@ -331,33 +346,56 @@ impl BrainSgxDaemon for BrainSgxDaemonMock { let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { - for _ in 0..5 { - let _ = tx - .send(detee_shared::pb::shared::ContainerContracts { - ..Default::default() - }) - .await; - tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - } + let _ = tx + .send(detee_shared::pb::shared::ContainerContracts { + ..Default::default() + }) + .await; }); let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg)); Ok(Response::new(Box::pin(output_stream))) } + async fn brain_messages( + &self, + req: tonic::Request, + ) -> Result, Status> { + let req = req.into_inner(); + info!("Daemon {} connected to receive brain messages", req.pubkey); + let (tx, rx) = mpsc::channel(6); + self.data.add_sgx_daemon_tx(&req.pubkey, tx); + let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg)); + Ok(Response::new( + Box::pin(output_stream) as Self::BrainMessagesStream + )) + } + async fn daemon_messages( &self, req: tonic::Request>, ) -> Result, Status> { - dbg!(req); + dbg!(&req); + let mut req_stream = req.into_inner(); + + while let Some(daemon_message) = req_stream.next().await { + match daemon_message { + Ok(msg) => match msg.msg { + Some(sgx_daemon_message::Msg::NewContainerResp(new_cont)) => { + dbg!(&new_cont); + self.data.send_new_container_resp(new_cont).await; + } + Some(something) => { + dbg!(something); + } + None => { + dbg!("None"); + } + }, + Err(_) => todo!(), + } + // + } + Ok(Response::new(detee_shared::pb::shared::Empty {})) } - async fn brain_messages( - &self, - req: tonic::Request, - ) -> Result, Status> { - dbg!(req); - Ok(Response::new( - Box::pin(tokio_stream::empty()) as Self::BrainMessagesStream - )) - } }