diff --git a/Cargo.lock b/Cargo.lock index 6d37cca..c14e3ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -318,7 +318,7 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#3e783b11bab6894b6f98bd3c6ce44e8bf5b1f78b" +source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#a2899ba5a25794aec60a093695675ff24f967484" dependencies = [ "base64", "prost", diff --git a/src/data.rs b/src/data.rs index da759e4..ecfcf51 100644 --- a/src/data.rs +++ b/src/data.rs @@ -32,7 +32,7 @@ impl DaemonState { unarchive_dir: String, ) -> Result, Box> { let publishing_ports = req_data.resource.clone().unwrap().port; - let uuid = req_data.uuid.unwrap_or_default().uuid; + let uuid = req_data.uuid; let container_name = format!("dtpm-{uuid}"); let mapped_ports = deploy_enclave(&unarchive_dir, container_name.clone(), publishing_ports).await?; diff --git a/src/grpc.rs b/src/grpc.rs index 25d7e5b..07325f2 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -54,6 +54,11 @@ pub async fn connect_and_run(conn_data: ConnectionData) -> Result<()> { let mut streaming_tasks = JoinSet::new(); streaming_tasks.spawn(receive_messages(client.clone(), conn_data.brain_msg_tx)); + streaming_tasks.spawn(send_messages( + client.clone(), + conn_data.daemon_msg_rx, + conn_data.daemon_msg_tx, + )); let task_output = streaming_tasks.join_next().await; println!("exiting: {task_output:?}"); @@ -64,7 +69,7 @@ pub async fn receive_messages( mut client: BrainSgxDaemonClient, tx: Sender, ) -> Result<()> { - let pubkey = "node_pubkey".to_owned(); + let pubkey = "0xd0837609aedd53854651210327db90f5c2626188a00e940bbc9eea2c7e6838b7".to_owned(); log::debug!("starting to listen for messages from brain"); let mut grpc_stream = client.brain_messages(Pubkey { pubkey }).await?.into_inner(); @@ -73,7 +78,7 @@ pub async fn receive_messages( match stream_update { Ok(msg) => { log::info!("Received message from brain: {msg:?}"); - let _ = tx.send(msg).await?; + tx.send(msg).await?; } Err(e) => { println!("Brain disconnected from brain_messaages: {e}"); @@ -89,7 +94,7 @@ pub async fn send_messages( rx: Receiver, tx: Sender, ) -> Result<()> { - let pubkey = "node_pubkey".to_owned(); + let pubkey = "0xd0837609aedd53854651210327db90f5c2626188a00e940bbc9eea2c7e6838b7".to_owned(); let rx_stream = ReceiverStream::new(rx); tx.send(DaemonMessage { @@ -97,6 +102,6 @@ pub async fn send_messages( }) .await?; client.daemon_messages(rx_stream).await?; - log::debug!("send_newvm_resp is about to exit"); + log::debug!("daemon_messages is about to exit"); Ok(()) } diff --git a/src/main.rs b/src/main.rs index 714e0f5..723d164 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,9 +6,14 @@ pub mod utils; use std::time::Duration; pub use data::DaemonState; +use detee_shared::pb::daemon::brain_message; +use detee_shared::pb::daemon::daemon_message; use detee_shared::pb::daemon::BrainMessage; use detee_shared::pb::daemon::DaemonMessage; +use detee_shared::pb::daemon::NewContainerRes; use detee_shared::pb::shared::ContainerContracts; +use detee_shared::types::shared::Container; + use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender; use tokio::time::sleep; @@ -41,14 +46,22 @@ impl ContainerHandler { } } - fn handle_contracts(&mut self, contracts: Vec) -> () { + fn handle_contracts(&mut self, contracts: Vec) { dbg!(&contracts); } - async fn run(mut self) -> () { + async fn run(mut self) { while let Some(brain_msg) = self.receiver.recv().await { match brain_msg.msg { - Some(msg) => { + Some(brain_message::Msg::NewContainerReq(msg)) => { + self.handle_new_container_req(msg.into()).await; + } + + Some(brain_message::Msg::DeleteContainer(msg)) => { + dbg!(&msg); + } + + Some(brain_message::Msg::ListContainer(msg)) => { dbg!(&msg); } None => { @@ -58,11 +71,25 @@ impl ContainerHandler { } } } + + async fn handle_new_container_req(&mut self, new_container_req: Container) { + dbg!(&new_container_req); + let res = DaemonMessage { + msg: Some(daemon_message::Msg::NewContainerResp(NewContainerRes { + uuid: new_container_req.uuid, + ..Default::default() + })), + }; + + println!("sending response {:?}", res); + let _ = self.sender.send(res).await; + } } #[tokio::main] async fn main() -> Result<(), Box> { - println!("Detee daemon"); + set_logging(); + log::info!("Detee daemon running"); loop { let (brain_msg_tx, brain_msg_rx) = tokio::sync::mpsc::channel(6); @@ -95,3 +122,18 @@ async fn main() -> Result<(), Box> { sleep(Duration::from_secs(3)).await; } } + +fn set_logging() { + let log_level = match std::env::var("LOG_LEVEL") { + Ok(val) => match val.as_str() { + "DEBUG" => log::LevelFilter::Debug, + "INFO" => log::LevelFilter::Info, + _ => log::LevelFilter::Error, + }, + _ => log::LevelFilter::Warn, + }; + env_logger::builder() + .filter_level(log_level) + .format_timestamp(None) + .init(); +}