pub mod config; pub mod container; pub mod data; pub mod global; pub mod grpc; pub mod utils; use anyhow::anyhow; use anyhow::Result; use data::App; use detee_shared::app_proto::{ brain_message_app, AppContract, AppNodeResources, BrainMessageApp, DaemonMessageApp, MappedPort, NewAppRes, }; use detee_shared::sgx::types::brain::AppDeployConfig; use global::PUBLIC_KEY; use log::info; use log::warn; use std::collections::HashSet; use std::fs::File; use std::path::Path; use std::time::Duration; use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender; use tokio::time::sleep; use utils::cleanup_enclave_disk_and_package; pub use crate::config::HostConfig; pub use crate::data::HostResources; use global::DAEMON_CONFIG_PATH; use global::DEPLOYED_APPS_CONFIG_DIR; #[derive(Debug)] pub struct AppHandler { pub receiver: Receiver, pub sender: Sender, pub host_config: HostConfig, pub host_resource: HostResources, } impl AppHandler { pub fn new(receiver: Receiver, sender: Sender) -> Self { // TODO: load from config and resources from file, // if not found use default and save host resources to file let host_config = match HostConfig::load_from_disk(&DAEMON_CONFIG_PATH) { Ok(config) => config, Err(e) => panic!("Could not load config: {e:?}"), }; let host_resource = match HostResources::load_from_disk() { Ok(res) => res, Err(e) => { warn!("Could not load resources from disk: {e:?}"); info!("Creating new resource calculator."); HostResources::new() } }; Self { receiver, sender, host_config, host_resource, } } async fn handle_contracts(&mut self, contracts: Vec) { let apps_in_host = self.host_resource.existing_apps.clone(); let apps_in_brain: HashSet = contracts .iter() .map(|contact| contact.uuid.clone()) .collect(); let deleted_apps: HashSet = apps_in_host.difference(&apps_in_brain).cloned().collect(); for uuid in deleted_apps { if let Err(e) = self.handle_del_app_req(uuid.clone()).await { log::error!("Failed to delete app: {e}"); } } } async fn run(mut self) { sleep(Duration::from_millis(500)).await; self.send_node_resources().await; while let Some(brain_msg) = self.receiver.recv().await { match brain_msg.msg { Some(brain_message_app::Msg::NewAppReq(msg)) => { self.handle_new_app_req(msg.into()).await; } Some(brain_message_app::Msg::DeleteAppReq(msg)) => { if let Err(er) = self.handle_del_app_req(msg.uuid).await { log::error!("Failed to delete app: {er}"); } self.send_node_resources().await; } None => { log::error!("Brain disconnected"); break; } } } } async fn handle_new_app_req(&mut self, new_app_req: AppDeployConfig) { let uuid = new_app_req.uuid.clone(); let app_result = App::new(new_app_req, &self.host_config, &mut self.host_resource).await; match app_result { Ok(app) => { self.send_node_resources().await; info!("Succesfully started App {uuid}"); let res = NewAppRes { uuid, status: "success".to_string(), error: "".to_string(), mapped_ports: app.mapped_ports.into_iter().map(MappedPort::from).collect(), ip_address: self.host_config.host_ip_address.clone(), }; let _ = self.sender.send(res.into()).await; } Err(e) => { let res = NewAppRes { uuid, status: "failed".to_string(), error: e.to_string(), ..Default::default() }; log::error!("sending response {:?}", res); let _ = self.sender.send(res.into()).await; } }; } async fn handle_del_app_req(&mut self, uuid: String) -> Result<()> { let app_handle_file_name = format!("{}/{}.yaml", *DEPLOYED_APPS_CONFIG_DIR, &uuid); let content = std::fs::read_to_string(&app_handle_file_name)?; let app_instance: App = serde_yml::from_str(&content)?; app_instance.delete_app(&mut self.host_resource).await?; std::fs::remove_file(&app_handle_file_name)?; if let Err(er) = cleanup_enclave_disk_and_package(uuid).await { log::error!("Failed to cleanup disk:\n{er}"); }; Ok(()) } async fn send_node_resources(&mut self) { let host_config = self.host_config.clone(); let host_resource = self.host_resource.clone(); let node_pubkey = PUBLIC_KEY.to_string(); let avail_no_of_port = (host_config.public_port_range.len() - host_resource.reserved_host_ports.len()) as u32; let avail_vcpus = host_config.max_vcpu_reservation - host_resource.reserved_vcpus; let avail_memory_mb = host_config.max_mem_reservation_mb - host_resource.reserved_memory_mb; let avail_storage_mb = host_config.max_disk_reservation_mb - host_resource.reserved_disk_mb; let max_ports_per_app = host_config.max_ports_per_app; let resource_update = AppNodeResources { node_pubkey, avail_no_of_port, avail_vcpus, avail_memory_mb, avail_storage_mb, max_ports_per_app, }; log::debug!("sending node resources on brain: {resource_update:?}"); let _ = self.sender.send(resource_update.into()).await; } } #[tokio::main] async fn main() -> Result<(), Box> { set_logging(); log::info!("Detee daemon running"); loop { if std::env::var("DAEMON_AUTO_UPGRADE") != Ok("OFF".to_string()) { // This upgrade procedure will get replaced in prod. We need this for the testnet. if let Err(e) = download_and_replace_binary().await { log::error!("Failed to upgrade detee-sgx-daemon to newer version: {e}"); } } let (brain_msg_tx, brain_msg_rx) = tokio::sync::mpsc::channel(6); let (daemon_msg_tx, daemon_msg_rx) = tokio::sync::mpsc::channel(6); let mut app_handler = AppHandler::new(brain_msg_rx, daemon_msg_tx.clone()); let network = app_handler.host_config.network.clone(); let mut contracts = vec![]; match grpc::register_node(&app_handler.host_config).await { Ok(app_contracts) => { contracts.append(&mut app_contracts.iter().map(|c| c.uuid.clone()).collect()); app_handler.handle_contracts(app_contracts).await } Err(e) => log::error!("Failed to connect to brain: {e}"), } tokio::spawn(async move { app_handler.run().await; }); log::info!("Connecting to brain..."); if let Err(e) = grpc::connect_and_run(grpc::ConnectionData { network, brain_msg_tx, daemon_msg_rx, daemon_msg_tx, app_contracts_uuid: contracts, }) .await { log::error!("The connection broke: {e}"); } sleep(Duration::from_secs(3)).await; } } fn set_logging() { let log_level = match std::env::var("LOG_LEVEL") { Ok(val) => match val.as_str() { "DEBUG" => log::LevelFilter::Debug, "INFO" => log::LevelFilter::Info, _ => log::LevelFilter::Error, }, _ => log::LevelFilter::Warn, }; env_logger::builder() .filter_level(log_level) .format_timestamp(None) .init(); } async fn download_and_replace_binary() -> Result<()> { use reqwest::get; use std::os::unix::fs::PermissionsExt; const TMP_DAEMON: &str = "/usr/local/bin/detee/new-daemon"; const BINARY: &str = "/usr/local/bin/detee-sgx-daemon"; let response = get("https://registry.detee.ltd/sgx/daemon/detee-sgx-daemon").await?; if !response.status().is_success() { return Err(anyhow!("Failed to download file: {}", response.status())); } let mut tmp_file = File::create(Path::new(&TMP_DAEMON))?; std::io::copy(&mut response.bytes().await?.as_ref(), &mut tmp_file)?; let new_hash = crate::global::compute_sha256(TMP_DAEMON)?; let old_hash = crate::global::compute_sha256(BINARY)?; log::debug!("Old binary hash: {old_hash}. New binary hash: {new_hash}"); if new_hash != old_hash { std::fs::rename(BINARY, BINARY.to_string() + "_BACKUP")?; std::fs::rename(TMP_DAEMON, BINARY)?; std::fs::set_permissions(BINARY, std::fs::Permissions::from_mode(0o775))?; std::process::exit(0); } Ok(()) }