pub mod config; pub mod container; pub mod data; pub mod global; pub mod grpc; pub mod utils; use anyhow::Result; use data::App; use detee_shared::sgx::pb::brain::brain_message_app; use detee_shared::sgx::pb::brain::AppContract; use detee_shared::sgx::pb::brain::BrainMessageApp; use detee_shared::sgx::pb::brain::DaemonMessageApp; use detee_shared::sgx::pb::brain::MappedPort; use detee_shared::sgx::pb::brain::NewAppRes; use detee_shared::sgx::types::brain::AppDeployConfig; use log::info; use log::warn; use std::collections::HashSet; use std::time::Duration; use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender; use tokio::time::sleep; use utils::cleanup_enclave_disk_and_package; pub use crate::config::HostConfig; pub use crate::data::HostResources; use global::APP_CONFIG_DIR; use global::DAEMON_CONFIG_PATH; #[derive(Debug)] pub struct AppHandler { pub receiver: Receiver, pub sender: Sender, pub host_config: HostConfig, pub host_resource: HostResources, } impl AppHandler { pub fn new(receiver: Receiver, sender: Sender) -> Self { // TODO: load from config and resources from file, // if not found use default and save host resources to file let host_config = match HostConfig::load_from_disk(&DAEMON_CONFIG_PATH) { Ok(config) => config, Err(e) => panic!("Could not load config: {e:?}"), }; let host_resource = match HostResources::load_from_disk() { Ok(res) => res, Err(e) => { warn!("Could not load resources from disk: {e:?}"); info!("Creating new resource calculator."); HostResources::new() } }; Self { receiver, sender, host_config, host_resource, } } async fn handle_contracts(&mut self, contracts: Vec) { let apps_in_host = self.host_resource.existing_apps.clone(); let apps_in_brain: HashSet = contracts .iter() .map(|contact| contact.uuid.clone()) .collect(); let deleted_apps: HashSet = apps_in_host.difference(&apps_in_brain).cloned().collect(); for uuid in deleted_apps { if let Err(e) = self.handle_del_app_req(uuid.clone()).await { log::error!("Failed to delete app: {e}"); } } } async fn run(mut self) { while let Some(brain_msg) = self.receiver.recv().await { match brain_msg.msg { Some(brain_message_app::Msg::NewAppReq(msg)) => { self.handle_new_app_req(msg.into()).await; } Some(brain_message_app::Msg::DeleteAppReq(msg)) => { if let Err(er) = self.handle_del_app_req(msg.uuid).await { log::error!("Failed to delete app: {er}"); } self.send_node_resources().await; } None => { log::error!("Brain disconnected"); break; } } } } async fn handle_new_app_req(&mut self, new_app_req: AppDeployConfig) { let uuid = new_app_req.uuid.clone(); let app_result = App::new(new_app_req, &self.host_config, &mut self.host_resource).await; match app_result { Ok(app) => { // TODO: update host resources to brain self.send_node_resources().await; info!("Succesfully started VM {uuid}"); let res = NewAppRes { uuid, status: "success".to_string(), error: "".to_string(), mapped_ports: app.mapped_ports.into_iter().map(MappedPort::from).collect(), ip_address: self.host_config.host_ip_address.clone(), }; let _ = self.sender.send(res.into()).await; } Err(e) => { let res = NewAppRes { uuid, status: "failed".to_string(), error: e.to_string(), ..Default::default() }; log::error!("sending response {:?}", res); let _ = self.sender.send(res.into()).await; } }; } async fn handle_del_app_req(&mut self, uuid: String) -> Result<()> { let app_handle_file_name = format!("{}{}.yaml", *APP_CONFIG_DIR, &uuid); let content = std::fs::read_to_string(&app_handle_file_name)?; let app_instance: App = serde_yml::from_str(&content)?; app_instance.delete_app(&mut self.host_resource).await?; std::fs::remove_file(&app_handle_file_name)?; if let Err(er) = cleanup_enclave_disk_and_package(uuid).await { log::error!("Failed to cleanup disk:\n{er}"); }; Ok(()) } async fn send_node_resources(&self) { // TODO: send host resources to brain } } #[tokio::main] async fn main() -> Result<(), Box> { set_logging(); log::info!("Detee daemon running"); loop { let (brain_msg_tx, brain_msg_rx) = tokio::sync::mpsc::channel(6); let (daemon_msg_tx, daemon_msg_rx) = tokio::sync::mpsc::channel(6); let mut app_handler = AppHandler::new(brain_msg_rx, daemon_msg_tx.clone()); let brain_url = app_handler.host_config.brain_url.clone(); let mut contracts = vec![]; match grpc::register_node(&app_handler.host_config).await { Ok(app_contracts) => { contracts.append(&mut app_contracts.iter().map(|c| c.uuid.clone()).collect()); app_handler.handle_contracts(app_contracts).await } Err(e) => log::error!("Failed to connect to brain: {e}"), } tokio::spawn(async move { app_handler.run().await; }); log::info!("Connecting to brain..."); if let Err(e) = grpc::connect_and_run(grpc::ConnectionData { brain_url, brain_msg_tx, daemon_msg_rx, daemon_msg_tx, app_contracts_uuid: contracts, }) .await { log::error!("The connection broke: {e}"); } sleep(Duration::from_secs(3)).await; } } fn set_logging() { let log_level = match std::env::var("LOG_LEVEL") { Ok(val) => match val.as_str() { "DEBUG" => log::LevelFilter::Debug, "INFO" => log::LevelFilter::Info, _ => log::LevelFilter::Error, }, _ => log::LevelFilter::Warn, }; env_logger::builder() .filter_level(log_level) .format_timestamp(None) .init(); }