Refactor daemon state

configure the daemon state to be persistent
change the code architecture to save node state to disk
Noor 2025-02-07 01:25:05 +05:30
parent 83fa4728c6
commit 557376dc08
Signed by: noormohammedb
GPG Key ID: D83EFB8B3B967146
9 changed files with 201 additions and 122 deletions

Cargo.lock (generated, 3 changed lines)

@@ -280,6 +280,7 @@ dependencies = [
  "reqwest",
  "serde",
  "serde_json",
+ "serde_yml",
  "tar",
  "tokio",
  "tokio-stream",
@@ -290,7 +291,7 @@ dependencies = [
 [[package]]
 name = "detee-shared"
 version = "0.1.0"
-source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared?branch=stable_01#df3a4631dd919e0cb35f6238f91f262c999c93b8"
+source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared?branch=stable_01#fce57884937a4ec02acbf2f5b370ab879b1af657"
 dependencies = [
  "base64",
  "prost",

@@ -21,6 +21,7 @@ detee-shared = { git = "ssh://git@gitea.detee.cloud/noormohammedb/detee-shared",
 log = "0.4.25"
 serde = "1.0.217"
 serde_json = "1.0.138"
+serde_yml = "0.0.12"
 
 [build-dependencies]
 tonic-build = "0.12.3"

@@ -1,12 +1,43 @@
 #[derive(Debug)]
-pub struct Config {
+pub struct HostConfig {
     pub brain_url: String,
+    pub host_ip_address: String,
+    pub owner_wallet: String,
+    pub max_cores_per_app: u32,
+    pub max_vcpu_reservation: u32,
+    pub max_mem_reservation_mb: u32,
+    pub max_ports_per_app: u16,
+    // price per unit per minute
+    pub price: u64,
+    pub delete_archive: bool,
 }
 
-impl Default for Config {
+impl Default for HostConfig {
     fn default() -> Self {
+        // TODO: load from config file
         let brain_url =
             std::env::var("BRAIN_URL").unwrap_or_else(|_| "http://127.0.0.1:31337".to_string());
-        Self { brain_url }
+        let owner_wallet = "0x".to_string();
+        let host_ip_address = "127.0.0.1".to_string();
+        let max_cores_per_app = 1;
+        let max_vcpu_reservation = 8;
+        let max_mem_reservation_mb = 8192;
+        let max_ports_per_app = 9;
+        let price = 0;
+        Self {
+            brain_url,
+            host_ip_address,
+            owner_wallet,
+            max_cores_per_app,
+            max_ports_per_app,
+            max_vcpu_reservation,
+            max_mem_reservation_mb,
+            price,
+            delete_archive: true,
+        }
     }
 }
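
The new HostConfig fields are still hard-coded in Default::default(), and the diff leaves a `// TODO: load from config file` marker. Since the commit already pulls in serde_yml, the loader might end up looking like the sketch below; the file path, the Deserialize/Default derives, and the `#[serde(default)]` fallback are assumptions and not part of this commit (the real struct keeps its hand-written Default).

```rust
use serde::Deserialize;

// Sketch only: the derives and serde attributes below are assumed; this commit
// does not add them to HostConfig.
#[derive(Debug, Default, Deserialize)]
#[serde(default)]
pub struct HostConfig {
    pub brain_url: String,
    pub host_ip_address: String,
    pub owner_wallet: String,
    pub max_cores_per_app: u32,
    pub max_vcpu_reservation: u32,
    pub max_mem_reservation_mb: u32,
    pub max_ports_per_app: u16,
    pub price: u64,
    pub delete_archive: bool,
}

impl HostConfig {
    /// Read the config from a YAML file, falling back to defaults when the
    /// file is missing or cannot be parsed.
    pub fn load_or_default(path: &str) -> Self {
        std::fs::read_to_string(path)
            .ok()
            .and_then(|s| serde_yml::from_str(&s).ok())
            .unwrap_or_default()
    }
}

fn main() {
    // Hypothetical path; the daemon would pick its own location.
    let config = HostConfig::load_or_default("host_config.yaml");
    println!("{config:?}");
}
```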

@@ -1,16 +1,11 @@
-use anyhow::Result;
+use anyhow::{anyhow, Result};
 use std::process::Command;
 
-use crate::utils::prepare_port_map;
-
 pub async fn deploy_enclave(
     enclave_path: &str,
     container_name_uuid: String,
-    publishing_ports: Vec<u32>,
-    // ...
-) -> Result<Vec<(u16, u16)>> {
-    let port_map = prepare_port_map(publishing_ports).await;
-
+    port_map: Vec<(u16, u16)>,
+) -> Result<i32> {
     let port_maping_string = port_map
         .iter()
         .map(|(host, container)| format!("-p {host}:{container}"))
@@ -23,12 +18,17 @@ pub async fn deploy_enclave(
         --device /dev/sgx/enclave --device /dev/sgx/provision {port_maping_string} noormohammedb/occlum-enclave:v1"#
     );
 
-    let _child = Command::new("sh")
+    let mut child = Command::new("sh")
         .arg("-c")
         .arg(docker_deploy_str)
         .spawn()?;
 
-    Ok(port_map)
+    let exit = child.wait()?;
+    let exit_code = exit
+        .code()
+        .ok_or(anyhow!("No exit code, process terminated by a signal"))?;
+
+    Ok(exit_code)
 }
 
 pub fn delete_enclave(container_name_uuid: String) -> Result<()> {
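
The deploy path now waits on the spawned shell and turns a missing exit code (the process was killed by a signal) into an error. The same spawn-then-wait pair can be collapsed into std::process::Command::status(); the sketch below shows that shape with a generic command string rather than the real docker invocation from the diff. Since deploy_enclave is async, a fully non-blocking variant would reach for tokio::process::Command instead, but that is a separate change.

```rust
use anyhow::{anyhow, Result};
use std::process::Command;

// Sketch: run a shell command and return its exit code, failing when the
// process was terminated by a signal (same contract as the new deploy_enclave).
fn run_shell(cmd: &str) -> Result<i32> {
    let status = Command::new("sh").arg("-c").arg(cmd).status()?; // spawn + wait in one call
    status
        .code()
        .ok_or(anyhow!("no exit code, process terminated by a signal"))
}

fn main() -> Result<()> {
    let code = run_shell("echo hello")?;
    println!("exit code: {code}");
    Ok(())
}
```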

@@ -1,56 +1,115 @@
 use anyhow::{anyhow, Result};
 use detee_shared::types::brain::AppDeployConfig;
 use detee_shared::types::brain::Resource as ResourceConfig;
+use std::collections::HashSet;
+use std::fs::File;
+use std::io::Write;
+
+use serde::{Deserialize, Serialize};
 
-use crate::container::delete_enclave;
 use crate::container::deploy_enclave;
+use crate::global::APP_CONFIG_DIR;
+use crate::global::APP_NAME_PREFIX;
+use crate::utils::handle_package;
+use crate::utils::prepare_port_map;
+use crate::HostConfig;
 
 #[derive(Debug, Default)]
-pub struct DaemonState {
-    pub containers: Vec<Container>,
+pub struct HostResources {
+    pub existing_apps: HashSet<String>,
+    pub reserved_vcpus: u32,
+    pub reserved_memory: u32,
+    pub reserved_host_ports: HashSet<u16>,
 }
 
-#[derive(Debug, Default)]
-pub struct Container {
+impl HostResources {
+    // TODO: implement load and save
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct App {
     pub uuid: String,
     pub name: String,
     pub package_path: String,
     pub status: String,
     pub admin: String,
-    pub container_resource: ResourceConfig,
+    pub app_resource: ResourceConfig,
     pub mapped_ports: Vec<(u16, u16)>,
 }
 
-impl DaemonState {
-    pub async fn create_new_container(
-        &mut self,
-        req_data: AppDeployConfig,
-        unarchive_dir: String,
-    ) -> Result<Vec<(u16, u16)>> {
-        let publishing_ports = req_data.resource.clone().port;
-        let uuid = req_data.uuid;
-        let container_name = format!("dtpm-{uuid}");
-        let mapped_ports =
-            deploy_enclave(&unarchive_dir, container_name.clone(), publishing_ports).await?;
+impl App {
+    pub async fn new(
+        new_app_req: AppDeployConfig,
+        host_config: &HostConfig,
+        host_resource: &mut HostResources,
+    ) -> Result<Self> {
+        let app_uuid = new_app_req.uuid.clone();
 
-        let container = Container {
-            uuid,
-            name: container_name,
+        if host_config.max_cores_per_app < new_app_req.resource.vcpu {
+            return Err(anyhow!("too many vcpus for app"));
+        }
+        if host_config.max_vcpu_reservation
+            < host_resource
+                .reserved_vcpus
+                .saturating_add(new_app_req.resource.vcpu)
+        {
+            return Err(anyhow!("vcpus not available"));
+        }
+        if host_config.max_mem_reservation_mb
+            < host_resource
+                .reserved_memory
+                .saturating_add(new_app_req.resource.memory_mb)
+        {
+            return Err(anyhow!("not enough memory available"));
+        }
+        if new_app_req.resource.disk_mb < 128 {
+            return Err(anyhow!("disk too small"));
+        }
+
+        let package_url = new_app_req.package_url.clone();
+        let mapped_ports = prepare_port_map(new_app_req.resource.port.clone()).await;
+        let app_name = format!("{APP_NAME_PREFIX}-{app_uuid}");
+        let unarchive_dir =
+            handle_package(package_url, app_uuid.clone(), host_config.delete_archive).await?;
+
+        let exit_code =
+            deploy_enclave(&unarchive_dir, app_name.clone(), mapped_ports.clone()).await?;
+        if exit_code != 0 {
+            // TODO: cleanup unarchive_dir
+            return Err(anyhow!("Failed to deploy enclave"));
+        }
+
+        let app_instance = Self {
+            uuid: app_uuid,
+            name: app_name,
             package_path: unarchive_dir,
             status: "running".to_string(),
-            admin: req_data.admin_pubkey,
-            container_resource: req_data.resource,
-            mapped_ports: mapped_ports.clone(),
+            admin: new_app_req.admin_pubkey,
+            app_resource: new_app_req.resource,
+            mapped_ports,
         };
+        app_instance.write_config()?;
+        host_resource
+            .existing_apps
+            .insert(app_instance.uuid.clone());
 
-        self.containers.push(container);
-
-        Ok(mapped_ports)
+        Ok(app_instance)
+    }
+
+    fn write_config(&self) -> Result<()> {
+        let mut file = File::create(APP_CONFIG_DIR.to_string() + &self.uuid + ".yaml")?;
+        file.write_all(serde_yml::to_string(self)?.as_bytes())?;
+        Ok(())
     }
 
     pub async fn delete_container(&mut self, container_uuid: String) -> Result<()> {
+        let _ = container_uuid;
+        // TODO: implement delete
+        /*
         let Some(container_position) = self
-            .containers
+            .existing_apps
             .iter()
             .position(|c| c.uuid == container_uuid)
         else {
@@ -60,10 +119,11 @@ impl DaemonState {
         let container = &self.containers[container_position];
-        let container_name = format!("dtpm-{}", container.uuid);
+        let container_name = format!("{APP_NAME_PREFIX}-{}", container.uuid);
 
         delete_enclave(container_name)?;
         self.containers.remove(container_position);
+        */
 
         Ok(())
     }
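
App::write_config now serializes each app to `<uuid>.yaml` under APP_CONFIG_DIR, while HostResources still carries a `// TODO: implement load and save`. The read side is not in this commit; a sketch of what it could look like, assuming the same one-file-per-app layout:

```rust
use anyhow::Result;
use serde::de::DeserializeOwned;
use std::fs;
use std::path::Path;

// Sketch: read every `<uuid>.yaml` in the config directory back into memory.
// In the daemon `T` would be `App`; the directory is assumed to exist already.
fn load_all_configs<T: DeserializeOwned>(config_dir: &str) -> Result<Vec<T>> {
    let mut loaded = Vec::new();
    for entry in fs::read_dir(Path::new(config_dir))? {
        let path = entry?.path();
        if path.extension().and_then(|ext| ext.to_str()) == Some("yaml") {
            let contents = fs::read_to_string(&path)?;
            loaded.push(serde_yml::from_str(&contents)?);
        }
    }
    Ok(loaded)
}
```

On startup the daemon could then rebuild HostResources (reserved vCPUs, memory and host ports) by folding over the loaded apps' app_resource and mapped_ports fields.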

@@ -4,3 +4,7 @@ pub const ADMIN_PUBKEY: &str = "0x28a3a71197250b0fa4dd0f86288e07ec9cc78ce3338e21
 pub const PACKAGE_ARCHIVE_POSTFIX: &str = "-enclave_packager.tar.gz";
 pub const PACKAGE_ARCHIVE_DIR_PATH: &str = "./enclave_archives";
 pub const PACKAGE_DIR_PATH: &str = "./enclaves";
+pub const APP_NAME_PREFIX: &str = "dtpm";
+
+// TODO: handle this home directory properly
+pub const APP_CONFIG_DIR: &str = "~/.dtpm/app_daemon/";
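
APP_CONFIG_DIR keeps a literal `~`, which std::fs::File::create will not expand; the TODO in the diff acknowledges this. One way to resolve it at startup without an extra dependency is to substitute $HOME by hand, as sketched below (the `dirs` crate's home_dir() would be the more portable option). The directory also has to be created with create_dir_all before write_config can succeed.

```rust
// Sketch: expand a leading "~/" using the HOME environment variable,
// falling back to the original string when HOME is unset.
fn expand_home(path: &str) -> String {
    match (path.strip_prefix("~/"), std::env::var("HOME")) {
        (Some(rest), Ok(home)) => format!("{home}/{rest}"),
        _ => path.to_string(),
    }
}

fn main() {
    // e.g. "/home/user/.dtpm/app_daemon/" on a typical Linux host
    println!("{}", expand_home("~/.dtpm/app_daemon/"));
}
```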

@@ -1,6 +1,5 @@
 use anyhow::Result;
 use detee_shared::pb::brain::brain_app_daemon_client::BrainAppDaemonClient;
-use detee_shared::pb::brain::daemon_message_app;
 use detee_shared::pb::brain::{
     AppContract, BrainMessageApp, DaemonMessageApp, Pubkey, RegisterAppNodeReq,
 };
@@ -21,7 +20,7 @@ pub struct ConnectionData {
     pub daemon_msg_tx: Sender<DaemonMessageApp>,
 }
 
-pub async fn register_node(config: &crate::Config) -> Result<Vec<AppContract>> {
+pub async fn register_node(config: &crate::HostConfig) -> Result<Vec<AppContract>> {
     let mut client = BrainAppDaemonClient::connect(config.brain_url.clone()).await?;
 
     log::debug!("registering node with brain");
@@ -105,10 +104,8 @@ pub async fn send_messages(
     let pubkey = NODE_PUBKEY.to_string();
     let rx_stream = ReceiverStream::new(rx);
 
-    tx.send(DaemonMessageApp {
-        msg: Some(daemon_message_app::Msg::Pubkey(pubkey)),
-    })
-    .await?;
+    tx.send(pubkey.into()).await?;
+
     client.daemon_messages(rx_stream).await?;
     log::debug!("daemon_messages is about to exit");
     Ok(())
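
`tx.send(pubkey.into()).await?` (and the `res.into()` calls in the handler changes further down) lean on Into<DaemonMessageApp> conversions that presumably come from the updated detee-shared revision pinned in Cargo.lock; they are not shown in this commit. Judging by the removed lines, the String conversion is probably equivalent to the sketch below, written against the prost-generated types in detee_shared::pb::brain:

```rust
use detee_shared::pb::brain::{daemon_message_app, DaemonMessageApp};

// Presumed shape of the upstream conversion (it would have to live in
// detee-shared, since both the trait and the type are foreign here): wrap the
// pubkey string in the Pubkey variant of the message oneof, which is exactly
// what the removed lines used to do by hand.
impl From<String> for DaemonMessageApp {
    fn from(pubkey: String) -> Self {
        DaemonMessageApp {
            msg: Some(daemon_message_app::Msg::Pubkey(pubkey)),
        }
    }
}
```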

@@ -5,40 +5,43 @@ pub mod global;
 pub mod grpc;
 pub mod utils;
 
+use data::App;
 use detee_shared::pb::brain::brain_message_app;
-use detee_shared::pb::brain::daemon_message_app;
 use detee_shared::pb::brain::AppContract;
 use detee_shared::pb::brain::BrainMessageApp;
 use detee_shared::pb::brain::DaemonMessageApp;
 use detee_shared::pb::brain::MappedPort;
 use detee_shared::pb::brain::NewAppRes;
 use detee_shared::types::brain::AppDeployConfig;
+use log::info;
 use std::time::Duration;
 use tokio::sync::mpsc::Receiver;
 use tokio::sync::mpsc::Sender;
 use tokio::time::sleep;
 use utils::cleanup_enclave_disk_and_package;
-use utils::handle_package;
 
-pub use crate::config::Config;
-pub use crate::data::DaemonState;
+pub use crate::config::HostConfig;
+pub use crate::data::HostResources;
 
 #[derive(Debug)]
 pub struct AppHandler {
     pub receiver: Receiver<BrainMessageApp>,
     pub sender: Sender<DaemonMessageApp>,
-    pub config: Config,
-    pub data: DaemonState,
+    pub host_config: HostConfig,
+    pub host_resource: HostResources,
 }
 
 impl AppHandler {
     pub fn new(receiver: Receiver<BrainMessageApp>, sender: Sender<DaemonMessageApp>) -> Self {
+        // TODO: load from config and resources from file,
+        // if not found use default and save host resources to file
         Self {
             receiver,
             sender,
-            config: Config::default(),
-            data: DaemonState::default(),
+            host_config: HostConfig::default(),
+            host_resource: HostResources::default(),
         }
     }
@@ -50,11 +53,11 @@ impl AppHandler {
         while let Some(brain_msg) = self.receiver.recv().await {
             match brain_msg.msg {
                 Some(brain_message_app::Msg::NewAppReq(msg)) => {
-                    self.handle_new_container_req(msg.into()).await;
+                    self.handle_new_app_req(msg.into()).await;
                 }
                 Some(brain_message_app::Msg::DeleteAppReq(msg)) => {
                     let app_id = msg.uuid;
-                    self.handle_del_container_req(app_id).await;
+                    self.handle_del_app_req(app_id).await;
                 }
                 None => {
                     log::error!("Brain disconnected");
@@ -64,76 +67,50 @@ impl AppHandler {
         }
     }
 
-    async fn handle_new_container_req(&mut self, new_container_req: AppDeployConfig) {
-        let container_uuid = new_container_req.uuid.clone();
-
-        let unarchive_dir = match handle_package(
-            new_container_req.package_url.clone(),
-            container_uuid.clone(),
-        )
-        .await
-        {
-            Ok(unarchive_dir) => unarchive_dir,
+    async fn handle_new_app_req(&mut self, new_app_req: AppDeployConfig) {
+        let uuid = new_app_req.uuid.clone();
+        let app_result = App::new(new_app_req, &self.host_config, &mut self.host_resource).await;
+
+        match app_result {
+            Ok(app) => {
+                // TODO: update host resources to brain
+                self.send_node_resources().await;
+
+                info!("Succesfully started VM {uuid}");
+                let res = NewAppRes {
+                    uuid,
+                    status: "success".to_string(),
+                    error: "".to_string(),
+                    mapped_ports: app.mapped_ports.into_iter().map(MappedPort::from).collect(),
+                    ip_address: self.host_config.host_ip_address.clone(),
+                };
+                let _ = self.sender.send(res.into()).await;
+            }
             Err(e) => {
-                let res = DaemonMessageApp {
-                    msg: Some(daemon_message_app::Msg::NewAppRes(NewAppRes {
-                        uuid: new_container_req.uuid,
-                        status: "failed".to_string(),
-                        error: e.to_string(),
-                        ..Default::default()
-                    })),
+                let res = NewAppRes {
+                    uuid,
+                    status: "failed".to_string(),
+                    error: e.to_string(),
+                    ..Default::default()
                 };
-                println!("sending response {:?}", res);
-                let _ = self.sender.send(res).await;
-                return;
+                log::error!("sending response {:?}", res);
+                let _ = self.sender.send(res.into()).await;
             }
         };
-
-        let mapped_ports = match self
-            .data
-            .create_new_container(new_container_req, unarchive_dir)
-            .await
-        {
-            Ok(mapped_ports) => mapped_ports.into_iter().map(MappedPort::from).collect(),
-            Err(e) => {
-                let res = DaemonMessageApp {
-                    msg: Some(daemon_message_app::Msg::NewAppRes(NewAppRes {
-                        uuid: container_uuid,
-                        status: "failed".to_string(),
-                        error: e.to_string(),
-                        ..Default::default()
-                    })),
-                };
-                println!("sending response {:?}", res);
-                let _ = self.sender.send(res).await;
-                return;
-            }
-        };
-
-        let res = DaemonMessageApp {
-            msg: Some(daemon_message_app::Msg::NewAppRes(NewAppRes {
-                uuid: container_uuid,
-                status: "Success".to_string(),
-                error: "".to_string(),
-                ip_address: "".to_string(),
-                mapped_ports,
-            })),
-        };
-        println!("sending response {:?}", res);
-        let _ = self.sender.send(res).await;
     }
 
-    async fn handle_del_container_req(&mut self, container_uuid: String) {
-        if let Err(e) = self.data.delete_container(container_uuid.clone()).await {
-            log::error!("Failed to delete container:\n{e}");
-        }
+    async fn handle_del_app_req(&mut self, container_uuid: String) {
+        // if let Err(e) = self.data.delete_container(container_uuid.clone()).await { log::error!("Failed to delete container:\n{e}"); }
         if let Err(er) = cleanup_enclave_disk_and_package(container_uuid).await {
            log::error!("Failed to cleanup disk:\n{er}");
         };
     }
+
+    async fn send_node_resources(&self) {
+        // TODO: send host resources to brain
+    }
 }
 
 #[tokio::main]
@@ -145,17 +122,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let (brain_msg_tx, brain_msg_rx) = tokio::sync::mpsc::channel(6);
     let (daemon_msg_tx, daemon_msg_rx) = tokio::sync::mpsc::channel(6);
 
-    let mut container_handler = AppHandler::new(brain_msg_rx, daemon_msg_tx.clone());
-    let brain_url = container_handler.config.brain_url.clone();
+    let mut app_handler = AppHandler::new(brain_msg_rx, daemon_msg_tx.clone());
+    let brain_url = app_handler.host_config.brain_url.clone();
 
-    match grpc::register_node(&container_handler.config).await {
-        Ok(container_contracts) => container_handler.handle_contracts(container_contracts),
+    match grpc::register_node(&app_handler.host_config).await {
+        Ok(app_contracts) => app_handler.handle_contracts(app_contracts),
         Err(e) => log::error!("Failed to connect to brain: {e}"),
     }
 
     tokio::spawn(async move {
-        container_handler.run().await;
+        app_handler.run().await;
     });
 
     log::info!("Connecting to brain...");
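
AppHandler::new gains a TODO to load the config and host resources from disk, falling back to defaults. Wiring that up could reuse the hypothetical load_or_default and load_all_configs helpers sketched earlier; the fragment below is only an outline against the crate's own types and is not compilable on its own.

```rust
// Outline of the TODO in AppHandler::new (assumed helpers, assumed config path).
pub fn new(receiver: Receiver<BrainMessageApp>, sender: Sender<DaemonMessageApp>) -> Self {
    let host_config = HostConfig::load_or_default("host_config.yaml");

    // Rebuild the in-memory view of what is already running on this host.
    let mut host_resource = HostResources::default();
    if let Ok(apps) = load_all_configs::<App>(APP_CONFIG_DIR) {
        for app in apps {
            host_resource.existing_apps.insert(app.uuid);
            // reserved_vcpus / reserved_memory / reserved_host_ports would be
            // accumulated from app.app_resource and app.mapped_ports here
        }
    }

    Self { receiver, sender, host_config, host_resource }
}
```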

@@ -14,7 +14,11 @@ use crate::global::{PACKAGE_ARCHIVE_DIR_PATH, PACKAGE_ARCHIVE_POSTFIX, PACKAGE_D
 pub static IP_INFO: LazyLock<IPInfo> = LazyLock::new(|| get_ip_info().unwrap());
 
-pub async fn handle_package(package_url: String, container_uuid: String) -> Result<String> {
+pub async fn handle_package(
+    package_url: String,
+    container_uuid: String,
+    delete_archive: bool,
+) -> Result<String> {
     let dir_path = Path::new(PACKAGE_ARCHIVE_DIR_PATH);
     fs::create_dir_all(dir_path).await?;
@@ -25,7 +29,7 @@ pub async fn handle_package(package_url: String, container_uuid: String) -> Resu
         return Err(anyhow!("Error downloading file"));
     }
 
-    let downloaded_file = std::fs::File::open(file_path)?;
+    let downloaded_file = std::fs::File::open(&file_path)?;
     let mut reader = BufReader::new(downloaded_file);
     let mut archive = Archive::new(GzDecoder::new(&mut reader));
@@ -38,6 +42,10 @@ pub async fn handle_package(package_url: String, container_uuid: String) -> Resu
     fs::create_dir_all(Path::new(&unarchive_dir)).await?;
     archive.unpack(&unarchive_dir)?;
 
+    if delete_archive {
+        let _ = fs::remove_file(file_path).await;
+    }
+
     Ok(unarchive_dir)
 }