diff --git a/Cargo.lock b/Cargo.lock
index 22ac164..407871b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -280,6 +280,7 @@ dependencies = [
  "reqwest",
  "serde",
  "serde_json",
+ "serde_yml",
  "tar",
  "tokio",
  "tokio-stream",
@@ -290,7 +291,7 @@ dependencies = [
 [[package]]
 name = "detee-shared"
 version = "0.1.0"
-source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared?branch=stable_01#df3a4631dd919e0cb35f6238f91f262c999c93b8"
+source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared?branch=stable_01#fce57884937a4ec02acbf2f5b370ab879b1af657"
 dependencies = [
  "base64",
  "prost",
diff --git a/Cargo.toml b/Cargo.toml
index 373e0c8..deb58a3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ detee-shared = { git = "ssh://git@gitea.detee.cloud/noormohammedb/detee-shared",
 log = "0.4.25"
 serde = "1.0.217"
 serde_json = "1.0.138"
+serde_yml = "0.0.12"
 
 [build-dependencies]
 tonic-build = "0.12.3"
diff --git a/src/config.rs b/src/config.rs
index afbbcf3..ff4fcbd 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -1,12 +1,43 @@
 #[derive(Debug)]
-pub struct Config {
+pub struct HostConfig {
     pub brain_url: String,
+    pub host_ip_address: String,
+    pub owner_wallet: String,
+    pub max_cores_per_app: u32,
+    pub max_vcpu_reservation: u32,
+    pub max_mem_reservation_mb: u32,
+    pub max_ports_per_app: u16,
+    // price per unit per minute
+    pub price: u64,
+
+    pub delete_archive: bool,
 }
 
-impl Default for Config {
+impl Default for HostConfig {
     fn default() -> Self {
+        // TODO: load from config file
         let brain_url =
             std::env::var("BRAIN_URL").unwrap_or_else(|_| "http://127.0.0.1:31337".to_string());
-        Self { brain_url }
+        let owner_wallet = "0x".to_string();
+        let host_ip_address = "127.0.0.1".to_string();
+
+        let max_cores_per_app = 1;
+        let max_vcpu_reservation = 8;
+        let max_mem_reservation_mb = 8192;
+        let max_ports_per_app = 9;
+        let price = 0;
+
+        Self {
+            brain_url,
+            host_ip_address,
+            owner_wallet,
+            max_cores_per_app,
+            max_ports_per_app,
+            max_vcpu_reservation,
+            max_mem_reservation_mb,
+            price,
+
+            delete_archive: true,
+        }
     }
 }
diff --git a/src/container.rs b/src/container.rs
index db7b753..137f542 100644
--- a/src/container.rs
+++ b/src/container.rs
@@ -1,16 +1,11 @@
-use anyhow::Result;
+use anyhow::{anyhow, Result};
 use std::process::Command;
 
-use crate::utils::prepare_port_map;
-
 pub async fn deploy_enclave(
     enclave_path: &str,
     container_name_uuid: String,
-    publishing_ports: Vec<u16>,
-    // ...
-) -> Result<Vec<(u16, u16)>> {
-    let port_map = prepare_port_map(publishing_ports).await;
-
+    port_map: Vec<(u16, u16)>,
+) -> Result<i32> {
     let port_maping_string = port_map
         .iter()
         .map(|(host, container)| format!("-p {host}:{container}"))
@@ -23,12 +18,17 @@ pub async fn deploy_enclave(
         --device /dev/sgx/enclave --device /dev/sgx/provision
         {port_maping_string} noormohammedb/occlum-enclave:v1"#
     );
-    let _child = Command::new("sh")
+    let mut child = Command::new("sh")
         .arg("-c")
         .arg(docker_deploy_str)
         .spawn()?;
 
-    Ok(port_map)
+    let exit = child.wait()?;
+    let exit_code = exit
+        .code()
+        .ok_or(anyhow!("No exit code, process terminated by a signal"))?;
+
+    Ok(exit_code)
 }
 
 pub fn delete_enclave(container_name_uuid: String) -> Result<()> {
diff --git a/src/data.rs b/src/data.rs
index 4020968..035b338 100644
--- a/src/data.rs
+++ b/src/data.rs
@@ -1,56 +1,115 @@
 use anyhow::{anyhow, Result};
 use detee_shared::types::brain::AppDeployConfig;
 use detee_shared::types::brain::Resource as ResourceConfig;
+use std::collections::HashSet;
+use std::fs::File;
+use std::io::Write;
+
+use serde::{Deserialize, Serialize};
 
-use crate::container::delete_enclave;
 use crate::container::deploy_enclave;
+use crate::global::APP_CONFIG_DIR;
+use crate::global::APP_NAME_PREFIX;
+use crate::utils::handle_package;
+use crate::utils::prepare_port_map;
+use crate::HostConfig;
 
 #[derive(Debug, Default)]
-pub struct DaemonState {
-    pub containers: Vec<Container>,
+pub struct HostResources {
+    pub existing_apps: HashSet<String>,
+    pub reserved_vcpus: u32,
+    pub reserved_memory: u32,
+    pub reserved_host_ports: HashSet<u16>,
 }
 
-#[derive(Debug, Default)]
-pub struct Container {
+impl HostResources {
+    // TODO: implement load and save
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct App {
     pub uuid: String,
     pub name: String,
     pub package_path: String,
     pub status: String,
     pub admin: String,
-    pub container_resource: ResourceConfig,
+    pub app_resource: ResourceConfig,
     pub mapped_ports: Vec<(u16, u16)>,
 }
 
-impl DaemonState {
-    pub async fn create_new_container(
-        &mut self,
-        req_data: AppDeployConfig,
-        unarchive_dir: String,
-    ) -> Result<Vec<(u16, u16)>> {
-        let publishing_ports = req_data.resource.clone().port;
-        let uuid = req_data.uuid;
-        let container_name = format!("dtpm-{uuid}");
-        let mapped_ports =
-            deploy_enclave(&unarchive_dir, container_name.clone(), publishing_ports).await?;
+impl App {
+    pub async fn new(
+        new_app_req: AppDeployConfig,
+        host_config: &HostConfig,
+        host_resource: &mut HostResources,
+    ) -> Result<Self> {
+        let app_uuid = new_app_req.uuid.clone();
 
-        let container = Container {
-            uuid,
-            name: container_name,
+        if host_config.max_cores_per_app < new_app_req.resource.vcpu {
+            return Err(anyhow!("too many vcpus for app"));
+        }
+        if host_config.max_vcpu_reservation
+            < host_resource
+                .reserved_vcpus
+                .saturating_add(new_app_req.resource.vcpu)
+        {
+            return Err(anyhow!("vcpus not available"));
+        }
+        if host_config.max_mem_reservation_mb
+            < host_resource
+                .reserved_memory
+                .saturating_add(new_app_req.resource.memory_mb)
+        {
+            return Err(anyhow!("not enough memory available"));
+        }
+        if new_app_req.resource.disk_mb < 128 {
+            return Err(anyhow!("disk too small"));
+        }
+
+        let package_url = new_app_req.package_url.clone();
+        let mapped_ports = prepare_port_map(new_app_req.resource.port.clone()).await;
+        let app_name = format!("{APP_NAME_PREFIX}-{app_uuid}");
+
+        let unarchive_dir =
+            handle_package(package_url, app_uuid.clone(), host_config.delete_archive).await?;
+
+        let exit_code =
+            deploy_enclave(&unarchive_dir, app_name.clone(), mapped_ports.clone()).await?;
+
+        if exit_code != 0 {
+            // TODO: cleanup unarchive_dir
+            return Err(anyhow!("Failed to deploy enclave"));
+        }
+
+        let app_instance = Self {
+            uuid: app_uuid,
+            name: app_name,
             package_path: unarchive_dir,
             status: "running".to_string(),
-            admin: req_data.admin_pubkey,
-            container_resource: req_data.resource,
-            mapped_ports: mapped_ports.clone(),
+            admin: new_app_req.admin_pubkey,
+            app_resource: new_app_req.resource,
+            mapped_ports,
         };
+        app_instance.write_config()?;
+        host_resource
+            .existing_apps
+            .insert(app_instance.uuid.clone());
 
-        self.containers.push(container);
+        Ok(app_instance)
+    }
 
-        Ok(mapped_ports)
+    fn write_config(&self) -> Result<()> {
+        let mut file = File::create(APP_CONFIG_DIR.to_string() + &self.uuid + ".yaml")?;
+        file.write_all(serde_yml::to_string(self)?.as_bytes())?;
+        Ok(())
     }
 
     pub async fn delete_container(&mut self, container_uuid: String) -> Result<()> {
+        let _ = container_uuid;
+        // TODO: implement delete
+        /*
         let Some(container_position) = self
-            .containers
+            .existing_apps
             .iter()
             .position(|c| c.uuid == container_uuid)
         else {
@@ -60,10 +119,11 @@ impl DaemonState {
         let container = &self.containers[container_position];
 
-        let container_name = format!("dtpm-{}", container.uuid);
+        let container_name = format!("{APP_NAME_PREFIX}-{}", container.uuid);
         delete_enclave(container_name)?;
 
         self.containers.remove(container_position);
+        */
 
         Ok(())
     }
diff --git a/src/global.rs b/src/global.rs
index 6782083..317be11 100644
--- a/src/global.rs
+++ b/src/global.rs
@@ -4,3 +4,7 @@ pub const ADMIN_PUBKEY: &str = "0x28a3a71197250b0fa4dd0f86288e07ec9cc78ce3338e21
 pub const PACKAGE_ARCHIVE_POSTFIX: &str = "-enclave_packager.tar.gz";
 pub const PACKAGE_ARCHIVE_DIR_PATH: &str = "./enclave_archives";
 pub const PACKAGE_DIR_PATH: &str = "./enclaves";
+
+pub const APP_NAME_PREFIX: &str = "dtpm";
+// TODO: handle this home directory properly
+pub const APP_CONFIG_DIR: &str = "~/.dtpm/app_daemon/";
diff --git a/src/grpc.rs b/src/grpc.rs
index 47ca0c2..42f5fad 100644
--- a/src/grpc.rs
+++ b/src/grpc.rs
@@ -1,6 +1,5 @@
 use anyhow::Result;
 use detee_shared::pb::brain::brain_app_daemon_client::BrainAppDaemonClient;
-use detee_shared::pb::brain::daemon_message_app;
 use detee_shared::pb::brain::{
     AppContract, BrainMessageApp, DaemonMessageApp, Pubkey, RegisterAppNodeReq,
 };
@@ -21,7 +20,7 @@ pub struct ConnectionData {
     pub daemon_msg_tx: Sender<DaemonMessageApp>,
 }
 
-pub async fn register_node(config: &crate::Config) -> Result<Vec<AppContract>> {
+pub async fn register_node(config: &crate::HostConfig) -> Result<Vec<AppContract>> {
     let mut client = BrainAppDaemonClient::connect(config.brain_url.clone()).await?;
 
     log::debug!("registering node with brain");
@@ -105,10 +104,8 @@ pub async fn send_messages(
     let pubkey = NODE_PUBKEY.to_string();
     let rx_stream = ReceiverStream::new(rx);
-    tx.send(DaemonMessageApp {
-        msg: Some(daemon_message_app::Msg::Pubkey(pubkey)),
-    })
-    .await?;
+
+    tx.send(pubkey.into()).await?;
     client.daemon_messages(rx_stream).await?;
     log::debug!("daemon_messages is about to exit");
     Ok(())
 }
diff --git a/src/main.rs b/src/main.rs
index 92eed10..bce6a4e 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,40 +5,43 @@ pub mod global;
 pub mod grpc;
 pub mod utils;
 
+use data::App;
 use detee_shared::pb::brain::brain_message_app;
-use detee_shared::pb::brain::daemon_message_app;
 use detee_shared::pb::brain::AppContract;
 use detee_shared::pb::brain::BrainMessageApp;
 use detee_shared::pb::brain::DaemonMessageApp;
 use detee_shared::pb::brain::MappedPort;
 use detee_shared::pb::brain::NewAppRes;
 use detee_shared::types::brain::AppDeployConfig;
+use log::info;
 use std::time::Duration;
 use tokio::sync::mpsc::Receiver;
 use tokio::sync::mpsc::Sender;
 use tokio::time::sleep;
 use utils::cleanup_enclave_disk_and_package;
-use utils::handle_package;
 
-pub use crate::config::Config;
-pub use crate::data::DaemonState;
+pub use crate::config::HostConfig;
+pub use crate::data::HostResources;
 
 #[derive(Debug)]
 pub struct AppHandler {
     pub receiver: Receiver<BrainMessageApp>,
     pub sender: Sender<DaemonMessageApp>,
-    pub config: Config,
-    pub data: DaemonState,
+    pub host_config: HostConfig,
+    pub host_resource: HostResources,
 }
 
 impl AppHandler {
     pub fn new(receiver: Receiver<BrainMessageApp>, sender: Sender<DaemonMessageApp>) -> Self {
+        // TODO: load from config and resources from file,
+        // if not found use default and save host resources to file
+
         Self {
             receiver,
             sender,
-            config: Config::default(),
-            data: DaemonState::default(),
+            host_config: HostConfig::default(),
+            host_resource: HostResources::default(),
         }
     }
 
@@ -50,11 +53,11 @@ impl AppHandler {
         while let Some(brain_msg) = self.receiver.recv().await {
             match brain_msg.msg {
                 Some(brain_message_app::Msg::NewAppReq(msg)) => {
-                    self.handle_new_container_req(msg.into()).await;
+                    self.handle_new_app_req(msg.into()).await;
                 }
                 Some(brain_message_app::Msg::DeleteAppReq(msg)) => {
                     let app_id = msg.uuid;
-                    self.handle_del_container_req(app_id).await;
+                    self.handle_del_app_req(app_id).await;
                 }
                 None => {
                     log::error!("Brain disconnected");
@@ -64,76 +67,50 @@ impl AppHandler {
         }
     }
 
-    async fn handle_new_container_req(&mut self, new_container_req: AppDeployConfig) {
-        let container_uuid = new_container_req.uuid.clone();
+    async fn handle_new_app_req(&mut self, new_app_req: AppDeployConfig) {
+        let uuid = new_app_req.uuid.clone();
+        let app_result = App::new(new_app_req, &self.host_config, &mut self.host_resource).await;
 
-        let unarchive_dir = match handle_package(
-            new_container_req.package_url.clone(),
-            container_uuid.clone(),
-        )
-        .await
-        {
-            Ok(unarchive_dir) => unarchive_dir,
+        match app_result {
+            Ok(app) => {
+                // TODO: update host resources to brain
+                self.send_node_resources().await;
+
+                info!("Successfully started app {uuid}");
+                let res = NewAppRes {
+                    uuid,
+                    status: "success".to_string(),
+                    error: "".to_string(),
+                    mapped_ports: app.mapped_ports.into_iter().map(MappedPort::from).collect(),
+                    ip_address: self.host_config.host_ip_address.clone(),
+                };
+                let _ = self.sender.send(res.into()).await;
+            }
             Err(e) => {
-                let res = DaemonMessageApp {
-                    msg: Some(daemon_message_app::Msg::NewAppRes(NewAppRes {
-                        uuid: new_container_req.uuid,
-                        status: "failed".to_string(),
-                        error: e.to_string(),
-                        ..Default::default()
-                    })),
+                let res = NewAppRes {
+                    uuid,
+                    status: "failed".to_string(),
+                    error: e.to_string(),
+                    ..Default::default()
                 };
 
-                println!("sending response {:?}", res);
-                let _ = self.sender.send(res).await;
-                return;
+                log::error!("sending response {:?}", res);
+                let _ = self.sender.send(res.into()).await;
             }
         };
-
-        let mapped_ports = match self
-            .data
-            .create_new_container(new_container_req, unarchive_dir)
-            .await
-        {
-            Ok(mapped_ports) => mapped_ports.into_iter().map(MappedPort::from).collect(),
-            Err(e) => {
-                let res = DaemonMessageApp {
-                    msg: Some(daemon_message_app::Msg::NewAppRes(NewAppRes {
-                        uuid: container_uuid,
-                        status: "failed".to_string(),
-                        error: e.to_string(),
-                        ..Default::default()
-                    })),
-                };
-
-                println!("sending response {:?}", res);
-                let _ = self.sender.send(res).await;
-                return;
-            }
-        };
-
-        let res = DaemonMessageApp {
-            msg: Some(daemon_message_app::Msg::NewAppRes(NewAppRes {
-                uuid: container_uuid,
-                status: "Success".to_string(),
-                error: "".to_string(),
-                ip_address: "".to_string(),
-                mapped_ports,
-            })),
-        };
-
-        println!("sending response {:?}", res);
-        let _ = self.sender.send(res).await;
     }
 
-    async fn handle_del_container_req(&mut self, container_uuid: String) {
-        if let Err(e) = self.data.delete_container(container_uuid.clone()).await { log::error!("Failed to delete container:\n{e}"); }
+    async fn handle_del_app_req(&mut self, container_uuid: String) {
+        // if let Err(e) = self.data.delete_container(container_uuid.clone()).await { log::error!("Failed to delete container:\n{e}"); }
+        if let Err(er) = cleanup_enclave_disk_and_package(container_uuid).await { log::error!("Failed to cleanup disk:\n{er}"); };
     }
+
+    async fn send_node_resources(&self) {
+        // TODO: send host resources to brain
+    }
 }
 
 #[tokio::main]
@@ -145,17 +122,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     let (brain_msg_tx, brain_msg_rx) = tokio::sync::mpsc::channel(6);
     let (daemon_msg_tx, daemon_msg_rx) = tokio::sync::mpsc::channel(6);
 
-    let mut container_handler = AppHandler::new(brain_msg_rx, daemon_msg_tx.clone());
-    let brain_url = container_handler.config.brain_url.clone();
+    let mut app_handler = AppHandler::new(brain_msg_rx, daemon_msg_tx.clone());
+    let brain_url = app_handler.host_config.brain_url.clone();
 
-    match grpc::register_node(&container_handler.config).await {
-        Ok(container_contracts) => container_handler.handle_contracts(container_contracts),
+    match grpc::register_node(&app_handler.host_config).await {
+        Ok(app_contracts) => app_handler.handle_contracts(app_contracts),
         Err(e) => log::error!("Failed to connect to brain: {e}"),
     }
 
     tokio::spawn(async move {
-        container_handler.run().await;
+        app_handler.run().await;
     });
 
     log::info!("Connecting to brain...");
diff --git a/src/utils.rs b/src/utils.rs
index c50e2ac..f17ff30 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -14,7 +14,11 @@ use crate::global::{PACKAGE_ARCHIVE_DIR_PATH, PACKAGE_ARCHIVE_POSTFIX, PACKAGE_D
 
 pub static IP_INFO: LazyLock = LazyLock::new(|| get_ip_info().unwrap());
 
-pub async fn handle_package(package_url: String, container_uuid: String) -> Result<String> {
+pub async fn handle_package(
+    package_url: String,
+    container_uuid: String,
+    delete_archive: bool,
+) -> Result<String> {
     let dir_path = Path::new(PACKAGE_ARCHIVE_DIR_PATH);
     fs::create_dir_all(dir_path).await?;
 
@@ -25,7 +29,7 @@ pub async fn handle_package(package_url: String, container_uuid: String) -> Resu
         return Err(anyhow!("Error downloading file"));
     }
 
-    let downloaded_file = std::fs::File::open(file_path)?;
+    let downloaded_file = std::fs::File::open(&file_path)?;
     let mut reader = BufReader::new(downloaded_file);
     let mut archive = Archive::new(GzDecoder::new(&mut reader));
 
@@ -38,6 +42,10 @@ pub async fn handle_package(package_url: String, container_uuid: String) -> Resu
     fs::create_dir_all(Path::new(&unarchive_dir)).await?;
     archive.unpack(&unarchive_dir)?;
 
+    if delete_archive {
+        let _ = fs::remove_file(file_path).await;
+    }
+
     Ok(unarchive_dir)
 }