From 7f5fb2efad268a1ccb8e8abdb62ec1891cb25844 Mon Sep 17 00:00:00 2001 From: Noor Date: Thu, 30 Jan 2025 19:23:14 +0530 Subject: [PATCH] fix: update detee-shared dependency source and refactor message handling handle requests from brain and send response accordingly with new grpc methods and channels implement daemon send message grpc method some WIP handler method to make it work setup application logging --- Cargo.lock | 2 +- src/data.rs | 2 +- src/grpc.rs | 13 +++++++++---- src/main.rs | 50 ++++++++++++++++++++++++++++++++++++++++++++++---- 4 files changed, 57 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6d37cca..c14e3ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -318,7 +318,7 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#3e783b11bab6894b6f98bd3c6ce44e8bf5b1f78b" +source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#a2899ba5a25794aec60a093695675ff24f967484" dependencies = [ "base64", "prost", diff --git a/src/data.rs b/src/data.rs index da759e4..ecfcf51 100644 --- a/src/data.rs +++ b/src/data.rs @@ -32,7 +32,7 @@ impl DaemonState { unarchive_dir: String, ) -> Result, Box> { let publishing_ports = req_data.resource.clone().unwrap().port; - let uuid = req_data.uuid.unwrap_or_default().uuid; + let uuid = req_data.uuid; let container_name = format!("dtpm-{uuid}"); let mapped_ports = deploy_enclave(&unarchive_dir, container_name.clone(), publishing_ports).await?; diff --git a/src/grpc.rs b/src/grpc.rs index 25d7e5b..07325f2 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -54,6 +54,11 @@ pub async fn connect_and_run(conn_data: ConnectionData) -> Result<()> { let mut streaming_tasks = JoinSet::new(); streaming_tasks.spawn(receive_messages(client.clone(), conn_data.brain_msg_tx)); + streaming_tasks.spawn(send_messages( + client.clone(), + conn_data.daemon_msg_rx, + conn_data.daemon_msg_tx, + )); let task_output = streaming_tasks.join_next().await; println!("exiting: {task_output:?}"); @@ -64,7 +69,7 @@ pub async fn receive_messages( mut client: BrainSgxDaemonClient, tx: Sender, ) -> Result<()> { - let pubkey = "node_pubkey".to_owned(); + let pubkey = "0xd0837609aedd53854651210327db90f5c2626188a00e940bbc9eea2c7e6838b7".to_owned(); log::debug!("starting to listen for messages from brain"); let mut grpc_stream = client.brain_messages(Pubkey { pubkey }).await?.into_inner(); @@ -73,7 +78,7 @@ pub async fn receive_messages( match stream_update { Ok(msg) => { log::info!("Received message from brain: {msg:?}"); - let _ = tx.send(msg).await?; + tx.send(msg).await?; } Err(e) => { println!("Brain disconnected from brain_messaages: {e}"); @@ -89,7 +94,7 @@ pub async fn send_messages( rx: Receiver, tx: Sender, ) -> Result<()> { - let pubkey = "node_pubkey".to_owned(); + let pubkey = "0xd0837609aedd53854651210327db90f5c2626188a00e940bbc9eea2c7e6838b7".to_owned(); let rx_stream = ReceiverStream::new(rx); tx.send(DaemonMessage { @@ -97,6 +102,6 @@ pub async fn send_messages( }) .await?; client.daemon_messages(rx_stream).await?; - log::debug!("send_newvm_resp is about to exit"); + log::debug!("daemon_messages is about to exit"); Ok(()) } diff --git a/src/main.rs b/src/main.rs index 714e0f5..723d164 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,9 +6,14 @@ pub mod utils; use std::time::Duration; pub use data::DaemonState; +use detee_shared::pb::daemon::brain_message; +use detee_shared::pb::daemon::daemon_message; use detee_shared::pb::daemon::BrainMessage; use detee_shared::pb::daemon::DaemonMessage; +use detee_shared::pb::daemon::NewContainerRes; use detee_shared::pb::shared::ContainerContracts; +use detee_shared::types::shared::Container; + use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender; use tokio::time::sleep; @@ -41,14 +46,22 @@ impl ContainerHandler { } } - fn handle_contracts(&mut self, contracts: Vec) -> () { + fn handle_contracts(&mut self, contracts: Vec) { dbg!(&contracts); } - async fn run(mut self) -> () { + async fn run(mut self) { while let Some(brain_msg) = self.receiver.recv().await { match brain_msg.msg { - Some(msg) => { + Some(brain_message::Msg::NewContainerReq(msg)) => { + self.handle_new_container_req(msg.into()).await; + } + + Some(brain_message::Msg::DeleteContainer(msg)) => { + dbg!(&msg); + } + + Some(brain_message::Msg::ListContainer(msg)) => { dbg!(&msg); } None => { @@ -58,11 +71,25 @@ impl ContainerHandler { } } } + + async fn handle_new_container_req(&mut self, new_container_req: Container) { + dbg!(&new_container_req); + let res = DaemonMessage { + msg: Some(daemon_message::Msg::NewContainerResp(NewContainerRes { + uuid: new_container_req.uuid, + ..Default::default() + })), + }; + + println!("sending response {:?}", res); + let _ = self.sender.send(res).await; + } } #[tokio::main] async fn main() -> Result<(), Box> { - println!("Detee daemon"); + set_logging(); + log::info!("Detee daemon running"); loop { let (brain_msg_tx, brain_msg_rx) = tokio::sync::mpsc::channel(6); @@ -95,3 +122,18 @@ async fn main() -> Result<(), Box> { sleep(Duration::from_secs(3)).await; } } + +fn set_logging() { + let log_level = match std::env::var("LOG_LEVEL") { + Ok(val) => match val.as_str() { + "DEBUG" => log::LevelFilter::Debug, + "INFO" => log::LevelFilter::Info, + _ => log::LevelFilter::Error, + }, + _ => log::LevelFilter::Warn, + }; + env_logger::builder() + .filter_level(log_level) + .format_timestamp(None) + .init(); +}