diff --git a/Cargo.lock b/Cargo.lock index 80681db..720cc4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -216,6 +216,26 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bincode" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740" +dependencies = [ + "bincode_derive", + "serde", + "unty", +] + +[[package]] +name = "bincode_derive" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09" +dependencies = [ + "virtue", +] + [[package]] name = "bitflags" version = "2.8.0" @@ -264,6 +284,8 @@ version = "1.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13208fcbb66eaeffe09b99fffbe1af420f00a7b35aa99ad683dfc1aa76145229" dependencies = [ + "jobserver", + "libc", "shlex", ] @@ -411,15 +433,17 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=main#3024c00b8e1c93e70902793385b92bc0a8d1f26a" +source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#0b195b4589e4ec689af7ddca27dc051716ecee78" dependencies = [ - "base64", + "bincode", "prost", "serde", "serde_yaml", + "tar", "thiserror", "tonic", "tonic-build", + "zstd", ] [[package]] @@ -1062,6 +1086,15 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" +[[package]] +name = "jobserver" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.77" @@ -1851,9 +1884,9 @@ dependencies = [ [[package]] name = "tar" -version = "0.4.43" +version = "0.4.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c65998313f8e17d0d553d28f91a0df93e4dbbbf770279c7bc21ca0f09ea1a1f6" +checksum = "1d863878d212c87a19c1a610eb53bb01fe12951c0501cf5a0d65f724914a667a" dependencies = [ "filetime", "libc", @@ -2144,6 +2177,12 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "unty" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" + [[package]] name = "url" version = "2.5.4" @@ -2185,6 +2224,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "virtue" +version = "0.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" + [[package]] name = "want" version = "0.3.1" @@ -2518,3 +2563,31 @@ dependencies = [ "quote", "syn", ] + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.15+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index 12ecb9a..b9b790d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ bs58 = "0.5.1" chrono = "0.4.39" sha2 = "0.10.8" -detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "main" } +detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain_app" } # detee-shared = { path = "../detee-shared" } [build-dependencies] diff --git a/src/config.rs b/src/config.rs index 93604c2..2ec9dfe 100644 --- a/src/config.rs +++ b/src/config.rs @@ -14,7 +14,7 @@ pub struct HostConfig { pub max_memory_mb_per_app: u32, pub max_vcpu_reservation: u32, pub max_mem_reservation_mb: u32, - pub max_disk_reservation_mb: u32, + pub max_disk_reservation_gb: u32, pub max_ports_per_app: u32, // price per unit per minute pub price: u64, diff --git a/src/container.rs b/src/container.rs index f8b943f..93d6eb0 100644 --- a/src/container.rs +++ b/src/container.rs @@ -22,13 +22,13 @@ pub fn deploy_enclave( ); let memory_mb = app_resource.memory_mb; - let vcpu = app_resource.vcpu; + let vcpus = app_resource.vcpus; // TODO: docker limit disk space // let disk_mb = app_resource.disk_mb; // --storage-opt size={disk_mb}m let docker_deploy_str = format!( - "docker run -d --restart unless-stopped --name {container_name_uuid} --memory={memory_mb}m --cpus={vcpu} \ + "docker run -d --restart unless-stopped --name {container_name_uuid} --memory={memory_mb}m --cpus={vcpus} \ -v {enclave_path}:/enclave_package --device /dev/sgx/enclave --device /dev/sgx/provision \ {port_maping_string} noormohammedb/occlum-enclave:v1 {hratls_pubkey}" ); diff --git a/src/data.rs b/src/data.rs index 5414919..0c53172 100644 --- a/src/data.rs +++ b/src/data.rs @@ -22,7 +22,7 @@ pub struct HostResources { pub existing_apps: HashSet, pub reserved_vcpus: u32, pub reserved_memory_mb: u32, - pub reserved_disk_mb: u32, + pub reserved_disk_gb: u32, pub reserved_host_ports: HashSet, } @@ -51,8 +51,8 @@ impl HostResources { pub fn reserve_resources(&mut self, app: &App) -> Result<()> { self.reserved_memory_mb += app.app_resource.memory_mb; - self.reserved_vcpus += app.app_resource.vcpu; - self.reserved_disk_mb += app.app_resource.disk_mb; + self.reserved_vcpus += app.app_resource.vcpus; + self.reserved_disk_gb += app.app_resource.disk_size_gb; for (port, _) in app.mapped_ports.iter() { self.reserved_host_ports.insert(*port); @@ -64,8 +64,8 @@ impl HostResources { pub fn un_reserve_resources(&mut self, app: &App) -> Result<()> { self.reserved_memory_mb -= app.app_resource.memory_mb; - self.reserved_vcpus -= app.app_resource.vcpu; - self.reserved_disk_mb -= app.app_resource.disk_mb; + self.reserved_vcpus -= app.app_resource.vcpus; + self.reserved_disk_gb -= app.app_resource.disk_size_gb; for (port, _) in app.mapped_ports.iter() { self.reserved_host_ports.remove(port); @@ -141,7 +141,7 @@ impl App { let app_uuid = new_app_req.uuid.clone(); - if host_config.max_cores_per_app < new_app_req.resource.vcpu { + if host_config.max_cores_per_app < new_app_req.resource.vcpus { return Err(anyhow!("too many vcpus for app")); } @@ -153,7 +153,7 @@ impl App { if host_config.max_vcpu_reservation < host_resource .reserved_vcpus - .saturating_add(new_app_req.resource.vcpu) + .saturating_add(new_app_req.resource.vcpus) { return Err(anyhow!("vcpus not available")); } @@ -164,7 +164,7 @@ impl App { { return Err(anyhow!("not enough memory available")); } - if new_app_req.resource.disk_mb < 128 { + if new_app_req.resource.disk_size_gb < 1 { return Err(anyhow!("disk too small")); } diff --git a/src/grpc.rs b/src/grpc.rs index f105be1..bd05fa3 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -1,7 +1,7 @@ use anyhow::Result; use detee_shared::app_proto::brain_app_daemon_client::BrainAppDaemonClient; use detee_shared::app_proto::{ - AppContract, BrainMessageApp, DaemonAuth, DaemonMessageApp, RegisterAppNodeReq, + BrainMessageApp, DaemonAuth, DaemonMessageApp, DelAppReq, RegisterAppNodeReq, }; use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender; @@ -12,14 +12,14 @@ use tonic::metadata::AsciiMetadataValue; use tonic::transport::{Certificate, Channel, ClientTlsConfig}; use tonic::Request; -use crate::global::{IP_INFO, PUBLIC_KEY, BRAIN_STAGING, BRAIN_TESTING, DETEE_ROOT_CA}; +use crate::global::{BRAIN_STAGING, BRAIN_TESTING, DETEE_ROOT_CA, IP_INFO, PUBLIC_KEY}; pub struct ConnectionData { pub network: String, pub brain_msg_tx: Sender, pub daemon_msg_rx: Receiver, pub daemon_msg_tx: Sender, - pub app_contracts_uuid: Vec, + pub del_apps_uuid: Vec, } async fn client(network: &str) -> Result> { @@ -32,17 +32,23 @@ async fn client(network: &str) -> Result> { )) } }; + log::info!("brain_url: {brain_url}, brain_san: {brain_san}"); let pem = std::fs::read_to_string(DETEE_ROOT_CA)?; let ca = Certificate::from_pem(pem); - let tls = ClientTlsConfig::new().ca_certificate(ca).domain_name(brain_san); + let tls = ClientTlsConfig::new() + .ca_certificate(ca) + .domain_name(brain_san); - let channel = Channel::from_shared(brain_url.to_string())?.tls_config(tls)?.connect().await?; + let channel = Channel::from_shared(brain_url.to_string())? + .tls_config(tls)? + .connect() + .await?; Ok(BrainAppDaemonClient::new(channel)) } -pub async fn register_node(config: &crate::HostConfig) -> Result> { +pub async fn register_node(config: &crate::HostConfig) -> Result> { let mut client = client(&config.network).await?; log::debug!("registering node with brain"); @@ -68,13 +74,13 @@ pub async fn register_node(config: &crate::HostConfig) -> Result { - container_contracts.push(contract); + Ok(del_app_req) => { + del_app_reqs.push(del_app_req); } Err(e) => { println!("Brain disconnected from register_node: {e}"); @@ -83,10 +89,10 @@ pub async fn register_node(config: &crate::HostConfig) -> Result Result<()> { @@ -96,12 +102,12 @@ pub async fn connect_and_run(conn_data: ConnectionData) -> Result<()> { streaming_tasks.spawn(receive_messages( client.clone(), - conn_data.app_contracts_uuid.clone(), + conn_data.del_apps_uuid.clone(), conn_data.brain_msg_tx, )); streaming_tasks.spawn(send_messages( client.clone(), - conn_data.app_contracts_uuid.clone(), + conn_data.del_apps_uuid.clone(), conn_data.daemon_msg_rx, conn_data.daemon_msg_tx, )); @@ -126,12 +132,12 @@ fn sign_stream_auth(contracts: Vec) -> Result { pub async fn receive_messages( mut client: BrainAppDaemonClient, - contracts: Vec, + del_apps_uuid: Vec, tx: Sender, ) -> Result<()> { log::debug!("starting to listen for messages from brain"); let mut grpc_stream = client - .brain_messages(sign_stream_auth(contracts)?) + .brain_messages(sign_stream_auth(del_apps_uuid)?) .await? .into_inner(); @@ -152,14 +158,14 @@ pub async fn receive_messages( pub async fn send_messages( mut client: BrainAppDaemonClient, - contracts: Vec, + del_apps_uuid: Vec, rx: Receiver, tx: Sender, ) -> Result<()> { let rx_stream = ReceiverStream::new(rx); tx.send(DaemonMessageApp { msg: Some(detee_shared::app_proto::daemon_message_app::Msg::Auth( - sign_stream_auth(contracts)?, + sign_stream_auth(del_apps_uuid)?, )), }) .await?; diff --git a/src/main.rs b/src/main.rs index 1245378..5e7b9e0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,14 +9,13 @@ use anyhow::anyhow; use anyhow::Result; use data::App; use detee_shared::app_proto::{ - brain_message_app, AppContract, AppNodeResources, BrainMessageApp, DaemonMessageApp, - MappedPort, NewAppRes, + brain_message_app, AppNodeResources, BrainMessageApp, DaemonMessageApp, NewAppRes, }; +use detee_shared::common_proto::MappedPort; use detee_shared::sgx::types::brain::AppDeployConfig; use global::PUBLIC_KEY; use log::info; use log::warn; -use std::collections::HashSet; use std::fs::File; use std::path::Path; use std::time::Duration; @@ -66,19 +65,13 @@ impl AppHandler { } } - async fn handle_contracts(&mut self, contracts: Vec) { + async fn handle_contracts(&mut self, del_apps_uuid: Vec) { let apps_in_host = self.host_resource.existing_apps.clone(); - let apps_in_brain: HashSet = contracts - .iter() - .map(|contact| contact.uuid.clone()) - .collect(); - - let deleted_apps: HashSet = - apps_in_host.difference(&apps_in_brain).cloned().collect(); - - for uuid in deleted_apps { - if let Err(e) = self.handle_del_app_req(uuid.clone()).await { - log::error!("Failed to delete app: {e}"); + for del_app_uuid in del_apps_uuid { + if apps_in_host.contains(&del_app_uuid) { + if let Err(e) = self.handle_del_app_req(del_app_uuid.clone()).await { + log::error!("Failed to delete app: {e}"); + } } } } @@ -117,7 +110,6 @@ impl AppHandler { info!("Succesfully started App {uuid}"); let res = NewAppRes { uuid, - status: "success".to_string(), error: "".to_string(), mapped_ports: app.mapped_ports.into_iter().map(MappedPort::from).collect(), ip_address: self.host_config.host_ip_address.clone(), @@ -127,7 +119,6 @@ impl AppHandler { Err(e) => { let res = NewAppRes { uuid, - status: "failed".to_string(), error: e.to_string(), ..Default::default() }; @@ -163,7 +154,7 @@ impl AppHandler { let avail_vcpus = host_config.max_vcpu_reservation - host_resource.reserved_vcpus; let avail_memory_mb = host_config.max_mem_reservation_mb - host_resource.reserved_memory_mb; - let avail_storage_mb = host_config.max_disk_reservation_mb - host_resource.reserved_disk_mb; + let avail_storage_gb = host_config.max_disk_reservation_gb - host_resource.reserved_disk_gb; let max_ports_per_app = host_config.max_ports_per_app; let resource_update = AppNodeResources { @@ -171,7 +162,7 @@ impl AppHandler { avail_no_of_port, avail_vcpus, avail_memory_mb, - avail_storage_mb, + avail_storage_gb, max_ports_per_app, }; @@ -199,11 +190,11 @@ async fn main() -> Result<(), Box> { } } - let mut contracts = vec![]; + let mut del_apps_uuid = vec![]; match grpc::register_node(&app_handler.host_config).await { - Ok(app_contracts) => { - contracts.append(&mut app_contracts.iter().map(|c| c.uuid.clone()).collect()); - app_handler.handle_contracts(app_contracts).await + Ok(del_app_reqs) => { + del_apps_uuid.append(&mut del_app_reqs.iter().map(|c| c.uuid.clone()).collect()); + app_handler.handle_contracts(del_apps_uuid.clone()).await } Err(e) => log::error!("Failed to connect to brain: {e}"), @@ -219,7 +210,7 @@ async fn main() -> Result<(), Box> { brain_msg_tx, daemon_msg_rx, daemon_msg_tx, - app_contracts_uuid: contracts, + del_apps_uuid, }) .await {