diff --git a/Cargo.lock b/Cargo.lock index ce542e0..6d37cca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -303,12 +303,14 @@ dependencies = [ "detee-shared", "env_logger", "flate2", + "log", "prost", "prost-types", "rand", "reqwest", "tar", "tokio", + "tokio-stream", "tonic", "tonic-build", ] @@ -316,7 +318,7 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#78c84299947e887fe8d8c737656318f409c7f0b4" +source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#3e783b11bab6894b6f98bd3c6ce44e8bf5b1f78b" dependencies = [ "base64", "prost", diff --git a/Cargo.toml b/Cargo.toml index 5e71c42..a940bcf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,15 +9,17 @@ prost = "0.13.4" prost-types = "0.13.4" tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread", "fs"] } tonic = "0.12.3" - -detee-shared = { git = "ssh://git@gitea.detee.cloud/noormohammedb/detee-shared" } -# detee-shared = { path = "../detee-shared" } chrono = "0.4.39" reqwest = "0.12.12" flate2 = "1.0.35" tar = "0.4.43" anyhow = "1.0.95" rand = "0.8.5" +tokio-stream = "0.1.17" + +detee-shared = { git = "ssh://git@gitea.detee.cloud/noormohammedb/detee-shared" } +# detee-shared = { path = "../detee-shared" } +log = "0.4.25" [build-dependencies] tonic-build = "0.12.3" diff --git a/daemon.proto b/daemon.proto deleted file mode 100644 index a798244..0000000 --- a/daemon.proto +++ /dev/null @@ -1,19 +0,0 @@ -syntax = "proto3"; -package deamon; - -message Empty { -} - -message NewContainerReq { - repeated string port = 1; -} - -message NewContainerRes { - string status = 1; -} - -service DaemonService { - rpc CreateContainer (NewContainerReq) returns (NewContainerRes); - // rpc ListContainer (NodeFilters) returns (stream NodeListResp); - -} diff --git a/src/grpc.rs b/src/grpc.rs index fada4c6..25d7e5b 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -1,141 +1,102 @@ use anyhow::Result; +use detee_shared::pb::daemon::brain_sgx_daemon_client::BrainSgxDaemonClient; +use detee_shared::pb::daemon::daemon_message::Msg; +use detee_shared::pb::daemon::{BrainMessage, DaemonMessage}; +use detee_shared::pb::shared::ContainerContracts; +use detee_shared::pb::shared::{Pubkey, RegisterNodeReq}; +use tokio::sync::mpsc::Receiver; +use tokio::sync::mpsc::Sender; +use tokio::task::JoinSet; +use tokio_stream::wrappers::ReceiverStream; +use tokio_stream::StreamExt; +use tonic::transport::Channel; -use std::sync::Arc; -use std::{net::SocketAddr, str::FromStr}; -use tokio::sync::RwLock; -use tonic::transport::Server; - -use detee_shared::pb::daemon::daemon_service_server::{ - DaemonService as DaemonServicePB, DaemonServiceServer as DaemonServiceServerPB, -}; -use detee_shared::pb::daemon::{ - ContainerFilters, ContainerInspectResp, ContainerListResp, DeleteContainerRes, NewContainerRes, -}; -use detee_shared::pb::shared::Container as ContainerPB; - -use detee_shared::types::shared::Container as ContainerConfig; - -use crate::utils::handle_package; -use crate::DaemonState; - -#[derive(Debug, Clone)] -pub struct DaemonServer { - pub data: Arc>, +pub struct ConnectionData { + pub brain_url: String, + pub brain_msg_tx: Sender, + pub daemon_msg_rx: Receiver, + pub daemon_msg_tx: Sender, } -impl DaemonServer { - pub fn new(data: Arc>) -> Self { - Self { data } - } +pub async fn register_node(config: &crate::Config) -> Result> { + let mut client = BrainSgxDaemonClient::connect(config.brain_url.clone()).await?; - pub async fn start(&self) -> Result<()> { - let port: String = std::env::var("PORT").unwrap_or_else(|_| "33400".to_string()); + log::debug!("registering node with brain"); - let addr = SocketAddr::from_str(format!("0.0.0.0:{port}").as_str())?; + let req = RegisterNodeReq { + ..Default::default() + }; - let daemon_server = DaemonServiceServerPB::new(DaemonServer::new(self.data.clone())); + let mut container_contracts = vec![]; - println!("Listening on {}", addr); - - Server::builder() - .add_service(daemon_server) - .serve(addr) - .await?; - Ok(()) - } -} - -#[tonic::async_trait] -impl DaemonServicePB for DaemonServer { - async fn create_container( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - let req_data = request.into_inner(); - - if req_data.package_url.is_none() || req_data.resource.is_none() { - return Err(tonic::Status::data_loss("missing some data in request")); + let mut grpc_stream = client.register_node(req).await?.into_inner(); + while let Some(stream_update) = grpc_stream.next().await { + match stream_update { + Ok(contract) => { + container_contracts.push(contract); + } + Err(e) => { + println!("Brain disconnected from register_node: {e}"); + } } - let unarchive_dir = handle_package(req_data.package_url.clone().unwrap_or_default()) - .await - .map_err(|err| tonic::Status::internal(err.to_string()))?; - - let req_container: ContainerConfig = req_data.into(); - let container_uuid = req_container.uuid.clone().unwrap_or_default().uuid; - - let mapped_ports = self - .data - .write() - .await - .create_new_container(req_container, unarchive_dir) - .await - .map_err(|err| tonic::Status::internal(err.to_string()))?; - - let mapped_ports = mapped_ports - .into_iter() - .map(|(host, container)| detee_shared::pb::shared::MappedPort { - host_port: host.into(), - container_port: container.into(), - }) - .collect(); - - return Ok(tonic::Response::new(NewContainerRes { - container_id: Some(detee_shared::pb::shared::Uuid { - uuid: container_uuid, - }), - status: "success".to_string(), - ip_address: "".to_string(), - mapped_ports, - })); } + log::info!( + "Brain registration succcessful, with contract count: {}", + container_contracts.len() + ); - async fn delete_container( - &self, - req: tonic::Request, - ) -> Result, tonic::Status> { - let req_data = req.into_inner(); - if req_data.container_id.is_none() { - return Err(tonic::Status::data_loss("missing container id")); - } - self.data - .write() - .await - .delete_container( - req_data.admin_pubkey, - req_data.container_id.unwrap_or_default().uuid, - ) - .await - .map_err(|err| tonic::Status::internal(err.to_string()))?; - - return Ok(tonic::Response::new(DeleteContainerRes { - ..Default::default() - })); - } - - async fn inspect_container( - &self, - req: tonic::Request, - ) -> Result, tonic::Status> { - dbg!(req); - return Ok(tonic::Response::new(ContainerInspectResp { - ..Default::default() - })); - } - - // async fn container_log( - // &self, - // req: tonic::Request, - // ) -> Result, tonic::Status> { - // todo!() - // } - - async fn list_containers( - &self, - req: tonic::Request, - ) -> Result, tonic::Status> { - dbg!(req); - return Ok(tonic::Response::new(ContainerListResp { - ..Default::default() - })); - } + Ok(container_contracts) +} + +pub async fn connect_and_run(conn_data: ConnectionData) -> Result<()> { + let client = BrainSgxDaemonClient::connect(conn_data.brain_url).await?; + + let mut streaming_tasks = JoinSet::new(); + + streaming_tasks.spawn(receive_messages(client.clone(), conn_data.brain_msg_tx)); + + let task_output = streaming_tasks.join_next().await; + println!("exiting: {task_output:?}"); + Ok(()) +} + +pub async fn receive_messages( + mut client: BrainSgxDaemonClient, + tx: Sender, +) -> Result<()> { + let pubkey = "node_pubkey".to_owned(); + + log::debug!("starting to listen for messages from brain"); + let mut grpc_stream = client.brain_messages(Pubkey { pubkey }).await?.into_inner(); + + while let Some(stream_update) = grpc_stream.next().await { + match stream_update { + Ok(msg) => { + log::info!("Received message from brain: {msg:?}"); + let _ = tx.send(msg).await?; + } + Err(e) => { + println!("Brain disconnected from brain_messaages: {e}"); + } + } + } + println!("brain_messages is about to exit"); + Ok(()) +} + +pub async fn send_messages( + mut client: BrainSgxDaemonClient, + rx: Receiver, + tx: Sender, +) -> Result<()> { + let pubkey = "node_pubkey".to_owned(); + + let rx_stream = ReceiverStream::new(rx); + tx.send(DaemonMessage { + msg: Some(Msg::Pubkey(Pubkey { pubkey })), + }) + .await?; + client.daemon_messages(rx_stream).await?; + log::debug!("send_newvm_resp is about to exit"); + Ok(()) } diff --git a/src/main.rs b/src/main.rs index 3e04e8a..714e0f5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,18 +3,95 @@ pub mod data; pub mod grpc; pub mod utils; -pub use data::DaemonState; -use std::sync::Arc; +use std::time::Duration; -use grpc::DaemonServer; -use tokio::sync::RwLock; +pub use data::DaemonState; +use detee_shared::pb::daemon::BrainMessage; +use detee_shared::pb::daemon::DaemonMessage; +use detee_shared::pb::shared::ContainerContracts; +use tokio::sync::mpsc::Receiver; +use tokio::sync::mpsc::Sender; +use tokio::time::sleep; + +pub struct Config { + pub brain_url: String, +} + +impl Default for Config { + fn default() -> Self { + let brain_url = + std::env::var("BRAIN_URL").unwrap_or_else(|_| "http://127.0.0.1:31337".to_string()); + Self { brain_url } + } +} + +pub struct ContainerHandler { + receiver: Receiver, + sender: Sender, + config: Config, + // res: state::Resources, +} + +impl ContainerHandler { + pub fn new(receiver: Receiver, sender: Sender) -> Self { + Self { + receiver, + sender, + config: Config::default(), + } + } + + fn handle_contracts(&mut self, contracts: Vec) -> () { + dbg!(&contracts); + } + + async fn run(mut self) -> () { + while let Some(brain_msg) = self.receiver.recv().await { + match brain_msg.msg { + Some(msg) => { + dbg!(&msg); + } + None => { + log::error!("Brain disconnected"); + break; + } + } + } + } +} #[tokio::main] async fn main() -> Result<(), Box> { println!("Detee daemon"); - DaemonServer::new(Arc::new(RwLock::new(DaemonState::default()))) - .start() - .await?; - Ok(()) + loop { + let (brain_msg_tx, brain_msg_rx) = tokio::sync::mpsc::channel(6); + let (daemon_msg_tx, daemon_msg_rx) = tokio::sync::mpsc::channel(6); + + let mut container_handler = ContainerHandler::new(brain_msg_rx, daemon_msg_tx.clone()); + let brain_url = container_handler.config.brain_url.clone(); + + match grpc::register_node(&container_handler.config).await { + Ok(container_contracts) => container_handler.handle_contracts(container_contracts), + + Err(e) => log::error!("Failed to connect to brain: {e}"), + } + + tokio::spawn(async move { + container_handler.run().await; + }); + + log::info!("Connecting to brain..."); + if let Err(e) = grpc::connect_and_run(grpc::ConnectionData { + brain_url, + brain_msg_tx, + daemon_msg_rx, + daemon_msg_tx, + }) + .await + { + log::error!("The connection broke: {e}"); + } + sleep(Duration::from_secs(3)).await; + } }