sgx-daemon/src/main.rs
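//! DeTee SGX daemon entry point: registers this node with the brain over gRPC,
//! reconciles locally deployed apps against the brain's contract list, and
//! handles app deploy/delete requests while reporting available node resources.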

pub mod config;
pub mod container;
pub mod data;
pub mod global;
pub mod grpc;
pub mod utils;
use anyhow::{anyhow, Result};
use data::App;
use detee_shared::app_proto::{
    brain_message_app, AppContract, AppNodeResources, BrainMessageApp, DaemonMessageApp,
    MappedPort, NewAppRes,
};
use detee_shared::sgx::types::brain::AppDeployConfig;
use global::{DAEMON_CONFIG_PATH, DEPLOYED_APPS_CONFIG_DIR, PUBLIC_KEY};
use log::{info, warn};
use std::collections::HashSet;
use std::fs::File;
use std::path::Path;
use std::time::Duration;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::sleep;
use utils::cleanup_enclave_disk_and_package;

pub use crate::config::HostConfig;
pub use crate::data::HostResources;
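/// Handles app-related traffic between the brain and this node: consumes
/// `BrainMessageApp` requests, replies with `DaemonMessageApp` responses, and
/// tracks the host configuration and resource reservations while doing so.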
#[derive(Debug)]
pub struct AppHandler {
pub receiver: Receiver<BrainMessageApp>,
pub sender: Sender<DaemonMessageApp>,
pub host_config: HostConfig,
pub host_resource: HostResources,
}
impl AppHandler {
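    /// Loads the host configuration and resource state from disk. A missing
    /// config is fatal; missing resource state falls back to a fresh
    /// `HostResources` calculator.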
pub fn new(receiver: Receiver<BrainMessageApp>, sender: Sender<DaemonMessageApp>) -> Self {
        // TODO: load config and resources from file; if not found, use
        // defaults and save the host resources to file
let host_config = match HostConfig::load_from_disk(&DAEMON_CONFIG_PATH) {
Ok(config) => config,
Err(e) => panic!("Could not load config: {e:?}"),
};
let host_resource = match HostResources::load_from_disk() {
Ok(res) => res,
Err(e) => {
warn!("Could not load resources from disk: {e:?}");
info!("Creating new resource calculator.");
HostResources::new()
}
};
Self {
receiver,
sender,
host_config,
host_resource,
}
}
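    /// Reconciles the apps running on this host against the contracts reported
    /// by the brain: any app present locally but missing from the brain's list
    /// is deleted.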
async fn handle_contracts(&mut self, contracts: Vec<AppContract>) {
let apps_in_host = self.host_resource.existing_apps.clone();
let apps_in_brain: HashSet<String> = contracts
.iter()
            .map(|contract| contract.uuid.clone())
.collect();
let deleted_apps: HashSet<String> =
apps_in_host.difference(&apps_in_brain).cloned().collect();
for uuid in deleted_apps {
if let Err(e) = self.handle_del_app_req(uuid.clone()).await {
log::error!("Failed to delete app: {e}");
}
}
}
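    /// Main message loop: after a short startup delay, reports node resources,
    /// then serves deploy and delete requests from the brain until the channel
    /// closes.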
async fn run(mut self) {
sleep(Duration::from_millis(500)).await;
self.send_node_resources().await;
while let Some(brain_msg) = self.receiver.recv().await {
match brain_msg.msg {
Some(brain_message_app::Msg::NewAppReq(msg)) => {
self.handle_new_app_req(msg.into()).await;
}
Some(brain_message_app::Msg::DeleteAppReq(msg)) => {
if let Err(er) = self.handle_del_app_req(msg.uuid).await {
log::error!("Failed to delete app: {er}");
}
self.send_node_resources().await;
}
None => {
log::error!("Brain disconnected");
break;
}
}
}
}
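    /// Deploys a new app and reports the outcome to the brain: mapped ports and
    /// the host IP on success, or the error string on failure.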
async fn handle_new_app_req(&mut self, new_app_req: AppDeployConfig) {
let uuid = new_app_req.uuid.clone();
let app_result = App::new(new_app_req, &self.host_config, &mut self.host_resource).await;
match app_result {
Ok(app) => {
self.send_node_resources().await;
info!("Succesfully started App {uuid}");
let res = NewAppRes {
uuid,
status: "success".to_string(),
error: "".to_string(),
mapped_ports: app.mapped_ports.into_iter().map(MappedPort::from).collect(),
ip_address: self.host_config.host_ip_address.clone(),
};
let _ = self.sender.send(res.into()).await;
}
Err(e) => {
let res = NewAppRes {
uuid,
status: "failed".to_string(),
error: e.to_string(),
..Default::default()
};
log::error!("sending response {:?}", res);
let _ = self.sender.send(res.into()).await;
}
};
}
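    /// Deletes a deployed app: loads its handle from the per-app YAML file,
    /// tears the app down, removes the file, and cleans up the enclave disk
    /// and package.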
async fn handle_del_app_req(&mut self, uuid: String) -> Result<()> {
let app_handle_file_name = format!("{}/{}.yaml", *DEPLOYED_APPS_CONFIG_DIR, &uuid);
let content = std::fs::read_to_string(&app_handle_file_name)?;
let app_instance: App = serde_yml::from_str(&content)?;
app_instance.delete_app(&mut self.host_resource).await?;
std::fs::remove_file(&app_handle_file_name)?;
        if let Err(e) = cleanup_enclave_disk_and_package(uuid).await {
            log::error!("Failed to clean up enclave disk and package: {e}");
        }
Ok(())
}
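    /// Reports the ports, vCPUs, memory, and disk still available on this node
    /// to the brain. Note that the arithmetic assumes reservations never exceed
    /// the configured maxima.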
async fn send_node_resources(&mut self) {
let host_config = self.host_config.clone();
let host_resource = self.host_resource.clone();
let node_pubkey = PUBLIC_KEY.to_string();
let avail_no_of_port =
(host_config.public_port_range.len() - host_resource.reserved_host_ports.len()) as u32;
let avail_vcpus = host_config.max_vcpu_reservation - host_resource.reserved_vcpus;
let avail_memory_mb = host_config.max_mem_reservation_mb - host_resource.reserved_memory_mb;
let avail_storage_mb = host_config.max_disk_reservation_mb - host_resource.reserved_disk_mb;
let max_ports_per_app = host_config.max_ports_per_app;
let resource_update = AppNodeResources {
node_pubkey,
avail_no_of_port,
avail_vcpus,
avail_memory_mb,
avail_storage_mb,
max_ports_per_app,
};
log::debug!("sending node resources on brain: {resource_update:?}");
let _ = self.sender.send(resource_update.into()).await;
}
}
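// Outer loop: optionally self-upgrade, register with the brain and reconcile
// contracts, then hold the gRPC connection; whenever the connection breaks,
// wait a few seconds and start over.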
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
set_logging();
log::info!("Detee daemon running");
loop {
let (brain_msg_tx, brain_msg_rx) = tokio::sync::mpsc::channel(6);
let (daemon_msg_tx, daemon_msg_rx) = tokio::sync::mpsc::channel(6);
let mut app_handler = AppHandler::new(brain_msg_rx, daemon_msg_tx.clone());
let network = app_handler.host_config.network.clone();
if std::env::var("DAEMON_AUTO_UPGRADE") != Ok("OFF".to_string()) {
// This upgrade procedure will get replaced in prod. We need this for the testnet.
if let Err(e) = download_and_replace_binary(&network).await {
log::error!("Failed to upgrade detee-sgx-daemon to newer version: {e}");
}
}
let mut contracts = vec![];
match grpc::register_node(&app_handler.host_config).await {
Ok(app_contracts) => {
contracts.append(&mut app_contracts.iter().map(|c| c.uuid.clone()).collect());
app_handler.handle_contracts(app_contracts).await
}
Err(e) => log::error!("Failed to connect to brain: {e}"),
}
tokio::spawn(async move {
app_handler.run().await;
});
log::info!("Connecting to brain...");
if let Err(e) = grpc::connect_and_run(grpc::ConnectionData {
network,
brain_msg_tx,
daemon_msg_rx,
daemon_msg_tx,
app_contracts_uuid: contracts,
})
.await
{
log::error!("The connection broke: {e}");
}
sleep(Duration::from_secs(3)).await;
}
}
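/// Initializes env_logger from the `LOG_LEVEL` environment variable (e.g.
/// `LOG_LEVEL=DEBUG detee-sgx-daemon`): `DEBUG` and `INFO` select those levels,
/// any other value selects `Error`, and an unset variable defaults to `Warn`.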
fn set_logging() {
let log_level = match std::env::var("LOG_LEVEL") {
Ok(val) => match val.as_str() {
"DEBUG" => log::LevelFilter::Debug,
"INFO" => log::LevelFilter::Info,
_ => log::LevelFilter::Error,
},
_ => log::LevelFilter::Warn,
};
env_logger::builder()
.filter_level(log_level)
.format_timestamp(None)
.init();
}
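/// Testnet/staging auto-upgrade: downloads the daemon binary for the given
/// network from the DeTee registry and, if its SHA-256 differs from the running
/// binary, swaps it in (keeping a `_BACKUP` copy) and exits so the daemon is
/// restarted on the new version, presumably by the service manager.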
async fn download_and_replace_binary(network: &str) -> Result<()> {
use reqwest::get;
use std::os::unix::fs::PermissionsExt;
const TMP_DAEMON: &str = "/usr/local/bin/detee/new-daemon";
const BINARY: &str = "/usr/local/bin/detee-sgx-daemon";
let daemon_url = if network == "testnet" {
"https://registry.detee.ltd/sgx/daemon/detee-sgx-daemon"
} else {
"https://registry.detee.ltd/sgx/daemon_staging/detee-sgx-daemon"
};
let response = get(daemon_url).await?;
if !response.status().is_success() {
return Err(anyhow!("Failed to download file: {}", response.status()));
}
let mut tmp_file = File::create(Path::new(&TMP_DAEMON))?;
std::io::copy(&mut response.bytes().await?.as_ref(), &mut tmp_file)?;
let new_hash = crate::global::compute_sha256(TMP_DAEMON)?;
let old_hash = crate::global::compute_sha256(BINARY)?;
log::debug!("Old binary hash: {old_hash}. New binary hash: {new_hash}");
if new_hash != old_hash {
std::fs::rename(BINARY, BINARY.to_string() + "_BACKUP")?;
std::fs::rename(TMP_DAEMON, BINARY)?;
std::fs::set_permissions(BINARY, std::fs::Permissions::from_mode(0o775))?;
std::process::exit(0);
}
Ok(())
}