diff --git a/Cargo.lock b/Cargo.lock index 7509f23..deb88e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -329,7 +329,6 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#7c9f66a7394c06ad8af0934e34b113f9c965bc98" dependencies = [ "base64", "prost", diff --git a/src/data.rs b/src/data.rs index cf095ad..7632bb9 100644 --- a/src/data.rs +++ b/src/data.rs @@ -2,14 +2,18 @@ use crate::grpc::snp_proto::{self as grpc}; use chrono::Utc; use dashmap::DashMap; +use detee_shared::pb::brain::DelAppReq; use log::{debug, info, warn}; use std::str::FromStr; use std::sync::RwLock; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot::Sender as OneshotSender; -use detee_shared::pb::daemon as daemonPb; -use detee_shared::pb::shared as sharedPb; +use detee_shared::pb::brain::AppContract as AppContractPB; +use detee_shared::pb::brain::BrainMessageApp; +use detee_shared::pb::brain::MappedPort; +use detee_shared::pb::brain::NewAppReq; +use detee_shared::pb::brain::NewAppRes; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -157,7 +161,7 @@ pub struct AppContract { pub collected_at: chrono::DateTime, } -impl From for sharedPb::ContainerContracts { +impl From for AppContractPB { fn from(value: AppContract) -> Self { Self { uuid: value.uuid, @@ -167,7 +171,7 @@ impl From for sharedPb::ContainerContracts { exposed_ports: value .mapped_ports .into_iter() - .map(sharedPb::MappedPort::from) + .map(MappedPort::from) .collect(), ..Default::default() } @@ -202,14 +206,8 @@ pub struct BrainData { daemon_tx: DashMap>, app_nodes: RwLock>, - app_daemon_tx: DashMap>, - tmp_new_container_reqs: DashMap< - String, - ( - sharedPb::Container, - OneshotSender, - ), - >, + app_daemon_tx: DashMap>, + tmp_new_container_reqs: DashMap)>, app_contracts: RwLock>, } @@ -744,7 +742,7 @@ impl BrainData { } impl BrainData { - pub fn add_app_daemon_tx(&self, node_pubkey: &str, tx: Sender) { + pub fn add_app_daemon_tx(&self, node_pubkey: &str, tx: Sender) { self.app_daemon_tx.insert(node_pubkey.to_string(), tx); } @@ -767,7 +765,7 @@ impl BrainData { pub fn find_app_contract_by_uuid(&self, uuid: &str) -> Option { let contracts = self.app_contracts.read().unwrap(); - contracts.iter().cloned().find(|c| c.uuid == uuid) + contracts.iter().find(|c| c.uuid == uuid).cloned() } pub fn find_app_contracts_by_admin_pubkey(&self, admin_pubkey: &str) -> Vec { @@ -793,11 +791,7 @@ impl BrainData { .collect() } - pub async fn send_new_container_req( - &self, - mut req: sharedPb::Container, - tx: OneshotSender, - ) { + pub async fn send_new_container_req(&self, mut req: NewAppReq, tx: OneshotSender) { req.uuid = uuid::Uuid::new_v4().to_string(); info!("Inserting new container request in memory: {req:?}"); @@ -809,10 +803,10 @@ impl BrainData { "Found daemon TX for {}. Sending newVMReq {}", req.node_pubkey, req.uuid ); - let msg = daemonPb::BrainMessage { - msg: Some( - detee_shared::pb::daemon::brain_message::Msg::NewContainerReq(req.clone()), - ), + let msg = BrainMessageApp { + msg: Some(detee_shared::pb::brain::brain_message_app::Msg::NewAppReq( + req.clone(), + )), }; if let Err(e) = app_daemon_tx.send(msg).await { warn!( @@ -821,7 +815,7 @@ impl BrainData { ); info!("Deleting daemon TX for {}", req.node_pubkey); self.del_app_daemon_tx(&req.node_pubkey); - self.send_new_container_resp(daemonPb::NewContainerRes { + self.send_new_container_resp(NewAppRes { uuid: req.uuid, status: "Failed".to_string(), error: "Daemon is offline.".to_string(), @@ -834,19 +828,18 @@ impl BrainData { pub async fn send_del_container_req( &self, - req: daemonPb::ContainerFilters, + req: DelAppReq, ) -> Result<(), Box> { - if let Some(app_contract) = self.find_app_contract_by_uuid(req.uuid()) { - info!("Found app contract {}. Deleting...", req.uuid()); + if let Some(app_contract) = self.find_app_contract_by_uuid(&req.uuid) { + info!("Found app contract {}. Deleting...", &req.uuid); if let Some(app_daemon_tx) = self.app_daemon_tx.get(&app_contract.node_pubkey) { debug!( "TX for daemon {} found. Informing daemon about deletion of {}.", - app_contract.node_pubkey, - req.uuid() + app_contract.node_pubkey, &req.uuid ); - let msg = daemonPb::BrainMessage { + let msg = BrainMessageApp { msg: Some( - detee_shared::pb::daemon::brain_message::Msg::DeleteContainer(req.clone()), + detee_shared::pb::brain::brain_message_app::Msg::DeleteAppReq(req.clone()), ), }; @@ -861,15 +854,15 @@ impl BrainData { } let mut app_contracts = self.app_contracts.write().unwrap(); - app_contracts.retain(|c| c.uuid != req.uuid()); + app_contracts.retain(|c| c.uuid != req.uuid); - return Ok(()); + Ok(()) } else { Err("Contract not found".into()) } } - pub async fn send_new_container_resp(&self, new_container_resp: daemonPb::NewContainerRes) { + pub async fn send_new_container_resp(&self, new_container_resp: NewAppRes) { let new_container_req = match self.tmp_new_container_reqs.remove(&new_container_resp.uuid) { Some((_, r)) => r, None => { diff --git a/src/grpc.rs b/src/grpc.rs index b62e9d4..e56b9c4 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -15,9 +15,10 @@ use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; -use detee_shared::pb::daemon::{ - brain_sgx_cli_server::BrainSgxCli, brain_sgx_daemon_server::BrainSgxDaemon, - daemon_message as sgx_daemon_message, ContainerFilters, ContainerListResp, NewContainerRes, +use detee_shared::pb::brain::brain_app_cli_server::BrainAppCli; +use detee_shared::pb::brain::brain_app_daemon_server::BrainAppDaemon; +use detee_shared::pb::brain::{ + AppContract, BrainMessageApp, NewAppReq, NewAppRes, RegisterAppNodeReq, }; pub struct BrainDaemonMock { @@ -40,21 +41,21 @@ impl BrainCliMock { } } -pub struct BrainSgxCliMock { +pub struct BrainAppCliMock { data: Arc, } -impl BrainSgxCliMock { +impl BrainAppCliMock { pub fn new(data: Arc) -> Self { Self { data } } } -pub struct BrainSgxDaemonMock { +pub struct BrainAppDaemonMock { data: Arc, } -impl BrainSgxDaemonMock { +impl BrainAppDaemonMock { pub fn new(data: Arc) -> Self { Self { data } } @@ -273,11 +274,11 @@ impl BrainCli for BrainCliMock { } #[tonic::async_trait] -impl BrainSgxCli for BrainSgxCliMock { - async fn create_container( +impl BrainAppCli for BrainAppCliMock { + async fn create_app( &self, - req: tonic::Request, - ) -> Result, Status> { + req: tonic::Request, + ) -> Result, Status> { let req = req.into_inner(); log::info!("Creating new container: {req:?}"); let admin_pubkey = req.admin_pubkey.clone(); @@ -298,6 +299,8 @@ impl BrainSgxCli for BrainSgxCliMock { } } + /* + async fn delete_container( &self, req: tonic::Request, @@ -319,12 +322,17 @@ impl BrainSgxCli for BrainSgxCliMock { &self, req: tonic::Request, ) -> Result, Status> { - dbg!(req); + let req_data = req.into_inner(); + dbg!(&req_data); + + // let containers = self.data.find_app_contracts_by_admin_pubkey(&req_data.admin_pubkey).into(); Ok(Response::new(ContainerListResp { ..Default::default() })) } + */ + // async fn inspect_container( // &self, // req: tonic::Request, @@ -337,16 +345,13 @@ impl BrainSgxCli for BrainSgxCliMock { } #[tonic::async_trait] -impl BrainSgxDaemon for BrainSgxDaemonMock { - type RegisterNodeStream = Pin< - Box> + Send>, - >; - type BrainMessagesStream = - Pin> + Send>>; +impl BrainAppDaemon for BrainAppDaemonMock { + type RegisterNodeStream = Pin> + Send>>; + type BrainMessagesStream = Pin> + Send>>; async fn register_node( &self, - req: tonic::Request, + req: tonic::Request, ) -> Result, Status> { let req_data = req.into_inner(); log::info!( @@ -378,24 +383,25 @@ impl BrainSgxDaemon for BrainSgxDaemonMock { let _ = tx.send(contract.into()).await; } }); - let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg)); + let output_stream = ReceiverStream::new(rx).map(Ok); Ok(Response::new(Box::pin(output_stream))) } async fn brain_messages( &self, - req: tonic::Request, + req: tonic::Request, ) -> Result, Status> { let req = req.into_inner(); info!("Daemon {} connected to receive brain messages", req.pubkey); let (tx, rx) = mpsc::channel(6); self.data.add_app_daemon_tx(&req.pubkey, tx); - let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg)); + let output_stream = ReceiverStream::new(rx).map(Ok); Ok(Response::new( Box::pin(output_stream) as Self::BrainMessagesStream )) } + /* async fn daemon_messages( &self, req: tonic::Request>, @@ -426,4 +432,5 @@ impl BrainSgxDaemon for BrainSgxDaemonMock { Ok(Response::new(detee_shared::pb::shared::Empty {})) } + */ } diff --git a/src/main.rs b/src/main.rs index 7305cc0..18a4f00 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,18 +2,17 @@ mod data; mod grpc; use data::BrainData; -use detee_shared::pb::daemon::brain_sgx_daemon_server::BrainSgxDaemonServer; +use detee_shared::pb::brain::brain_app_cli_server::BrainAppCliServer; +use detee_shared::pb::brain::brain_app_daemon_server::BrainAppDaemonServer; use grpc::snp_proto::brain_cli_server::BrainCliServer; use grpc::snp_proto::brain_daemon_server::BrainDaemonServer; +use grpc::BrainAppCliMock; +use grpc::BrainAppDaemonMock; use grpc::BrainCliMock; use grpc::BrainDaemonMock; -use grpc::BrainSgxCliMock; -use grpc::BrainSgxDaemonMock; use std::sync::Arc; use tonic::transport::Server; -use detee_shared::pb::daemon::brain_sgx_cli_server::BrainSgxCliServer; - #[tokio::main] async fn main() { env_logger::builder() @@ -32,8 +31,8 @@ async fn main() { let daemon_server = BrainDaemonServer::new(BrainDaemonMock::new(data.clone())); let cli_server = BrainCliServer::new(BrainCliMock::new(data.clone())); - let sgx_cli_server = BrainSgxCliServer::new(BrainSgxCliMock::new(data.clone())); - let sgx_daemon_server = BrainSgxDaemonServer::new(BrainSgxDaemonMock::new(data.clone())); + let sgx_cli_server = BrainAppCliServer::new(BrainAppCliMock::new(data.clone())); + let sgx_daemon_server = BrainAppDaemonServer::new(BrainAppDaemonMock::new(data.clone())); Server::builder() .add_service(daemon_server)