sgx-daemon/src/main.rs

190 lines
5.9 KiB
Rust

pub mod config;
pub mod container;
pub mod data;
pub mod global;
pub mod grpc;
pub mod utils;
use detee_shared::pb::brain::brain_message_app;
use detee_shared::pb::brain::daemon_message_app;
use detee_shared::pb::brain::AppContract;
use detee_shared::pb::brain::BrainMessageApp;
use detee_shared::pb::brain::DaemonMessageApp;
use detee_shared::pb::brain::MappedPort;
use detee_shared::pb::brain::NewAppRes;
use detee_shared::types::brain::AppDeployConfig;
use std::time::Duration;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::time::sleep;
use utils::cleanup_enclave_disk_and_package;
use utils::handle_package;
pub use crate::config::Config;
pub use crate::data::DaemonState;
/// Processes app requests coming from the brain and reports results back.
///
/// One handler owns both channel endpoints plus the daemon configuration and
/// state; `main` creates a new handler for every connection attempt.
#[derive(Debug)]
pub struct AppHandler {
/// Incoming brain messages; `run` loops over this until it yields `None`.
pub receiver: Receiver<BrainMessageApp>,
/// Outgoing daemon messages; used to send `NewAppRes` replies.
pub sender: Sender<DaemonMessageApp>,
/// Daemon configuration, initialised with `Config::default()` by `new`.
/// `main` reads `brain_url` from it and passes it to `grpc::register_node`.
pub config: Config,
/// Daemon state, initialised with `DaemonState::default()` by `new`.
/// Containers are created and deleted through it.
pub data: DaemonState,
}
impl AppHandler {
pub fn new(receiver: Receiver<BrainMessageApp>, sender: Sender<DaemonMessageApp>) -> Self {
Self {
receiver,
sender,
config: Config::default(),
data: DaemonState::default(),
}
}
fn handle_contracts(&mut self, contracts: Vec<AppContract>) {
dbg!(&contracts);
}
async fn run(mut self) {
while let Some(brain_msg) = self.receiver.recv().await {
match brain_msg.msg {
Some(brain_message_app::Msg::NewAppReq(msg)) => {
self.handle_new_container_req(msg.into()).await;
}
Some(brain_message_app::Msg::DeleteAppReq(msg)) => {
let app_id = msg.uuid;
self.handle_del_container_req(app_id).await;
}
None => {
log::error!("Brain disconnected");
break;
}
}
}
}
async fn handle_new_container_req(&mut self, new_container_req: AppDeployConfig) {
let container_uuid = new_container_req.uuid.clone();
let unarchive_dir = match handle_package(
new_container_req.package_url.clone(),
container_uuid.clone(),
)
.await
{
Ok(unarchive_dir) => unarchive_dir,
Err(e) => {
let res = DaemonMessageApp {
msg: Some(daemon_message_app::Msg::NewAppRes(NewAppRes {
uuid: new_container_req.uuid,
status: "failed".to_string(),
error: e.to_string(),
..Default::default()
})),
};
println!("sending response {:?}", res);
let _ = self.sender.send(res).await;
return;
}
};
let mapped_ports = match self
.data
.create_new_container(new_container_req, unarchive_dir)
.await
{
Ok(mapped_ports) => mapped_ports.into_iter().map(MappedPort::from).collect(),
Err(e) => {
let res = DaemonMessageApp {
msg: Some(daemon_message_app::Msg::NewAppRes(NewAppRes {
uuid: container_uuid,
status: "failed".to_string(),
error: e.to_string(),
..Default::default()
})),
};
println!("sending response {:?}", res);
let _ = self.sender.send(res).await;
return;
}
};
let res = DaemonMessageApp {
msg: Some(daemon_message_app::Msg::NewAppRes(NewAppRes {
uuid: container_uuid,
status: "Success".to_string(),
error: "".to_string(),
ip_address: "".to_string(),
mapped_ports,
})),
};
println!("sending response {:?}", res);
let _ = self.sender.send(res).await;
}
async fn handle_del_container_req(&mut self, container_uuid: String) {
if let Err(e) = self.data.delete_container(container_uuid.clone()).await {
log::error!("Failed to delete container:\n{e}");
}
if let Err(er) = cleanup_enclave_disk_and_package(container_uuid).await {
log::error!("Failed to cleanup disk:\n{er}");
};
}
}
/// Daemon entry point: sets up logging, then keeps connecting to the brain forever.
///
/// Each iteration creates fresh channels and a fresh `AppHandler`, attempts
/// node registration (a failure is logged and the iteration continues),
/// spawns the handler task, and runs the gRPC connection until it returns.
/// It then waits a few seconds before the next attempt. The loop never exits,
/// so the `Ok` return is never produced.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// Buffer size of both brain<->daemon channels.
    const CHANNEL_CAPACITY: usize = 6;
    /// Pause between two connection attempts.
    const RECONNECT_DELAY: Duration = Duration::from_secs(3);

    set_logging();
    log::info!("Detee daemon running");

    loop {
        let (brain_msg_tx, brain_msg_rx) = tokio::sync::mpsc::channel(CHANNEL_CAPACITY);
        let (daemon_msg_tx, daemon_msg_rx) = tokio::sync::mpsc::channel(CHANNEL_CAPACITY);

        let mut handler = AppHandler::new(brain_msg_rx, daemon_msg_tx.clone());
        let brain_url = handler.config.brain_url.clone();

        let registration = grpc::register_node(&handler.config).await;
        match registration {
            Err(e) => log::error!("Failed to connect to brain: {e}"),
            Ok(contracts) => handler.handle_contracts(contracts),
        }

        // The handler runs in the background for the lifetime of this connection.
        tokio::spawn(handler.run());

        log::info!("Connecting to brain...");
        let connection = grpc::ConnectionData {
            brain_url,
            brain_msg_tx,
            daemon_msg_rx,
            daemon_msg_tx,
        };
        if let Err(e) = grpc::connect_and_run(connection).await {
            log::error!("The connection broke: {e}");
        }

        sleep(RECONNECT_DELAY).await;
    }
}
fn set_logging() {
let log_level = match std::env::var("LOG_LEVEL") {
Ok(val) => match val.as_str() {
"DEBUG" => log::LevelFilter::Debug,
"INFO" => log::LevelFilter::Info,
_ => log::LevelFilter::Error,
},
_ => log::LevelFilter::Warn,
};
env_logger::builder()
.filter_level(log_level)
.format_timestamp(None)
.init();
}