Compare commits

...

5 Commits

Author SHA1 Message Date
7f751a5f30
fixed disk ratio 2025-06-30 20:16:39 +05:30
4e6fbfb42d
switching to sloat and credit system
updated proto
change all units to mib
calculating sloat ration while new app and sending resource
2025-06-30 20:16:36 +05:30
ee8ce47be3
switch to new staging brain 2025-06-30 20:16:33 +05:30
a31186fcb9
Improves brain connection reliability
Updates the brain connection logic to randomly select from a list of available URLs for staging and testnet environments.
2025-06-30 20:16:31 +05:30
4821547556
Register returns deleted apps
Updated proto, changes in app resource
change disk unit to GB and vcpu to vcpus
refactor contract handling while registering
logging brain url and san
2025-06-30 20:16:24 +05:30
8 changed files with 222 additions and 77 deletions

84
Cargo.lock generated

@ -1,7 +1,6 @@
# SPDX-License-Identifier: Apache-2.0
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
# SPDX-License-Identifier: Apache-2.0
version = 4
[[package]]
@ -218,6 +217,26 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]]
name = "bincode"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740"
dependencies = [
"bincode_derive",
"serde",
"unty",
]
[[package]]
name = "bincode_derive"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09"
dependencies = [
"virtue",
]
[[package]]
name = "bitflags"
version = "2.8.0"
@ -266,6 +285,8 @@ version = "1.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13208fcbb66eaeffe09b99fffbe1af420f00a7b35aa99ad683dfc1aa76145229"
dependencies = [
"jobserver",
"libc",
"shlex",
]
@ -413,15 +434,17 @@ dependencies = [
[[package]]
name = "detee-shared"
version = "0.1.0"
source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=main#3024c00b8e1c93e70902793385b92bc0a8d1f26a"
source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=credits_app#01e93d3a2e4502c0e8e72026e8a1c55810961815"
dependencies = [
"base64",
"bincode",
"prost",
"serde",
"serde_yaml",
"tar",
"thiserror",
"tonic",
"tonic-build",
"zstd",
]
[[package]]
@ -1064,6 +1087,15 @@ version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674"
[[package]]
name = "jobserver"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.77"
@ -1853,9 +1885,9 @@ dependencies = [
[[package]]
name = "tar"
version = "0.4.43"
version = "0.4.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c65998313f8e17d0d553d28f91a0df93e4dbbbf770279c7bc21ca0f09ea1a1f6"
checksum = "1d863878d212c87a19c1a610eb53bb01fe12951c0501cf5a0d65f724914a667a"
dependencies = [
"filetime",
"libc",
@ -2146,6 +2178,12 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "unty"
version = "0.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae"
[[package]]
name = "url"
version = "2.5.4"
@ -2187,6 +2225,12 @@ version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "virtue"
version = "0.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1"
[[package]]
name = "want"
version = "0.3.1"
@ -2520,3 +2564,31 @@ dependencies = [
"quote",
"syn",
]
[[package]]
name = "zstd"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "7.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d"
dependencies = [
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.15+zstd.1.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237"
dependencies = [
"cc",
"pkg-config",
]

@ -27,7 +27,7 @@ bs58 = "0.5.1"
chrono = "0.4.39"
sha2 = "0.10.8"
detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "main" }
detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "credits_app" }
# detee-shared = { path = "../detee-shared" }
[build-dependencies]

@ -13,10 +13,10 @@ pub struct HostConfig {
pub host_ip_address: String,
pub operator_wallet: String,
pub max_cores_per_app: u32,
pub max_memory_mb_per_app: u32,
pub max_memory_mib_per_app: u32,
pub max_vcpu_reservation: u32,
pub max_mem_reservation_mb: u32,
pub max_disk_reservation_mb: u32,
pub max_mem_reservation_mib: u32,
pub max_disk_reservation_mib: u32,
pub max_ports_per_app: u32,
// price per unit per minute
pub price: u64,

@ -23,14 +23,14 @@ pub fn deploy_enclave(
enclave_path, hratls_pubkey
);
let memory_mb = app_resource.memory_mb;
let vcpu = app_resource.vcpu;
let memory_mib = app_resource.memory_mib;
let vcpus = app_resource.vcpus;
// TODO: docker limit disk space
// let disk_mb = app_resource.disk_mb;
// --storage-opt size={disk_mb}m
// let disk_mib = app_resource.disk_mib;
// --storage-opt size={disk_mib}m
let docker_deploy_str = format!(
"docker run -d --restart unless-stopped --name {container_name_uuid} --memory={memory_mb}m --cpus={vcpu} \
"docker run -d --restart unless-stopped --name {container_name_uuid} --memory={memory_mib}m --cpus={vcpus} \
-v {enclave_path}:/enclave_package --device /dev/sgx/enclave --device /dev/sgx/provision \
{port_maping_string} noormohammedb/occlum-enclave:v1 {hratls_pubkey}"
);

@ -23,8 +23,8 @@ use crate::global::USED_RESOURCES_PATH;
pub struct HostResources {
pub existing_apps: HashSet<String>,
pub reserved_vcpus: u32,
pub reserved_memory_mb: u32,
pub reserved_disk_mb: u32,
pub reserved_memory_mib: u32,
pub reserved_disk_mib: u32,
pub reserved_host_ports: HashSet<u16>,
}
@ -52,9 +52,9 @@ impl HostResources {
}
pub fn reserve_resources(&mut self, app: &App) -> Result<()> {
self.reserved_memory_mb += app.app_resource.memory_mb;
self.reserved_vcpus += app.app_resource.vcpu;
self.reserved_disk_mb += app.app_resource.disk_mb;
self.reserved_memory_mib += app.app_resource.memory_mib;
self.reserved_vcpus += app.app_resource.vcpus;
self.reserved_disk_mib += app.app_resource.disk_size_mib;
for (port, _) in app.mapped_ports.iter() {
self.reserved_host_ports.insert(*port);
@ -65,9 +65,9 @@ impl HostResources {
}
pub fn un_reserve_resources(&mut self, app: &App) -> Result<()> {
self.reserved_memory_mb -= app.app_resource.memory_mb;
self.reserved_vcpus -= app.app_resource.vcpu;
self.reserved_disk_mb -= app.app_resource.disk_mb;
self.reserved_memory_mib -= app.app_resource.memory_mib;
self.reserved_vcpus -= app.app_resource.vcpus;
self.reserved_disk_mib -= app.app_resource.disk_size_mib;
for (port, _) in app.mapped_ports.iter() {
self.reserved_host_ports.remove(port);
@ -143,11 +143,11 @@ impl App {
let app_uuid = new_app_req.uuid.clone();
if host_config.max_cores_per_app < new_app_req.resource.vcpu {
if host_config.max_cores_per_app < new_app_req.resource.vcpus {
return Err(anyhow!("too many vcpus for app"));
}
if host_config.max_memory_mb_per_app < new_app_req.resource.memory_mb {
if host_config.max_memory_mib_per_app < new_app_req.resource.memory_mib {
return Err(anyhow!("too much memory for app"));
}
@ -155,18 +155,18 @@ impl App {
if host_config.max_vcpu_reservation
< host_resource
.reserved_vcpus
.saturating_add(new_app_req.resource.vcpu)
.saturating_add(new_app_req.resource.vcpus)
{
return Err(anyhow!("vcpus not available"));
}
if host_config.max_mem_reservation_mb
if host_config.max_mem_reservation_mib
< host_resource
.reserved_memory_mb
.saturating_add(new_app_req.resource.memory_mb)
.reserved_memory_mib
.saturating_add(new_app_req.resource.memory_mib)
{
return Err(anyhow!("not enough memory available"));
}
if new_app_req.resource.disk_mb < 128 {
if new_app_req.resource.disk_size_mib < 1 {
return Err(anyhow!("disk too small"));
}

@ -3,20 +3,43 @@
use anyhow::Result;
use ed25519_dalek::SigningKey;
use log::{info, warn};
use rand::Rng;
use sha2::{Digest, Sha256};
use std::fs::File;
use std::io::{Read, Write};
use std::sync::LazyLock;
pub const DETEE_ROOT_CA: &str = "/etc/detee/root_ca.pem";
pub const BRAIN_STAGING: (&str, &str) = ("https://159.65.58.38:31337", "staging-brain");
pub const BRAIN_TESTING: (&str, &str) = ("https://164.92.249.180:31337", "testnet-brain");
pub(crate) const BRAIN_STAGING_URLS: [&str; 3] = [
"https://156.146.63.216:31337",
"https://156.146.63.216:31337",
"https://156.146.63.216:31337",
];
pub(crate) const BRAIN_TESTING_URLS: [&str; 3] = [
"https://184.107.169.199:45223",
"https://149.22.95.1:44522",
"https://149.36.48.99:48638",
];
pub const PACKAGE_ARCHIVE_POSTFIX: &str = "-enclave_package.tar.gz";
pub const PACKAGE_ARCHIVE_DIR_PATH: &str = "/var/lib/detee/archives";
pub const PACKAGE_DIR_PATH: &str = "/var/lib/detee/enclaves";
pub const APP_NAME_PREFIX: &str = "dtpm";
pub static BRAIN_STAGING: LazyLock<(&str, &str)> = LazyLock::new(|| {
(
BRAIN_STAGING_URLS[rand::thread_rng().gen_range(0..BRAIN_STAGING_URLS.len())],
"staging-brain",
)
});
pub static BRAIN_TESTING: LazyLock<(&str, &str)> = LazyLock::new(|| {
(
BRAIN_TESTING_URLS[rand::thread_rng().gen_range(0..BRAIN_TESTING_URLS.len())],
"testnet-brain",
)
});
// const DETEE_DIR_ENV_NAME: &str = "DETEE_DIR";
pub static IP_INFO: LazyLock<IPInfo> =

@ -3,7 +3,7 @@
use anyhow::Result;
use detee_shared::app_proto::brain_app_daemon_client::BrainAppDaemonClient;
use detee_shared::app_proto::{
AppContract, BrainMessageApp, DaemonAuth, DaemonMessageApp, RegisterAppNodeReq,
BrainMessageApp, DaemonAuth, DaemonMessageApp, DelAppReq, RegisterAppNodeReq,
};
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
@ -14,37 +14,43 @@ use tonic::metadata::AsciiMetadataValue;
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
use tonic::Request;
use crate::global::{IP_INFO, PUBLIC_KEY, BRAIN_STAGING, BRAIN_TESTING, DETEE_ROOT_CA};
use crate::global::{BRAIN_STAGING, BRAIN_TESTING, DETEE_ROOT_CA, IP_INFO, PUBLIC_KEY};
pub struct ConnectionData {
pub network: String,
pub brain_msg_tx: Sender<BrainMessageApp>,
pub daemon_msg_rx: Receiver<DaemonMessageApp>,
pub daemon_msg_tx: Sender<DaemonMessageApp>,
pub app_contracts_uuid: Vec<String>,
pub del_apps_uuid: Vec<String>,
}
async fn client(network: &str) -> Result<BrainAppDaemonClient<Channel>> {
let (brain_url, brain_san) = match network {
"staging" => BRAIN_STAGING,
"testnet" => BRAIN_TESTING,
"staging" => *BRAIN_STAGING,
"testnet" => *BRAIN_TESTING,
_ => {
return Err(anyhow::anyhow!(
"The only networks currently supported are staging and testnet."
))
}
};
log::info!("brain_url: {brain_url}, brain_san: {brain_san}");
let pem = std::fs::read_to_string(DETEE_ROOT_CA)?;
let ca = Certificate::from_pem(pem);
let tls = ClientTlsConfig::new().ca_certificate(ca).domain_name(brain_san);
let tls = ClientTlsConfig::new()
.ca_certificate(ca)
.domain_name(brain_san);
let channel = Channel::from_shared(brain_url.to_string())?.tls_config(tls)?.connect().await?;
let channel = Channel::from_shared(brain_url.to_string())?
.tls_config(tls)?
.connect()
.await?;
Ok(BrainAppDaemonClient::new(channel))
}
pub async fn register_node(config: &crate::HostConfig) -> Result<Vec<AppContract>> {
pub async fn register_node(config: &crate::HostConfig) -> Result<Vec<DelAppReq>> {
let mut client = client(&config.network).await?;
log::debug!("registering node with brain");
@ -70,13 +76,13 @@ pub async fn register_node(config: &crate::HostConfig) -> Result<Vec<AppContract
req.metadata_mut().insert("pubkey", pubkey);
req.metadata_mut().insert("request-signature", signature);
let mut container_contracts = vec![];
let mut del_app_reqs = vec![];
let mut grpc_stream = client.register_app_node(req).await?.into_inner();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(contract) => {
container_contracts.push(contract);
Ok(del_app_req) => {
del_app_reqs.push(del_app_req);
}
Err(e) => {
println!("Brain disconnected from register_node: {e}");
@ -85,10 +91,10 @@ pub async fn register_node(config: &crate::HostConfig) -> Result<Vec<AppContract
}
log::info!(
"Brain registration succcessful, with contract count: {}",
container_contracts.len()
del_app_reqs.len()
);
Ok(container_contracts)
Ok(del_app_reqs)
}
pub async fn connect_and_run(conn_data: ConnectionData) -> Result<()> {
@ -98,12 +104,12 @@ pub async fn connect_and_run(conn_data: ConnectionData) -> Result<()> {
streaming_tasks.spawn(receive_messages(
client.clone(),
conn_data.app_contracts_uuid.clone(),
conn_data.del_apps_uuid.clone(),
conn_data.brain_msg_tx,
));
streaming_tasks.spawn(send_messages(
client.clone(),
conn_data.app_contracts_uuid.clone(),
conn_data.del_apps_uuid.clone(),
conn_data.daemon_msg_rx,
conn_data.daemon_msg_tx,
));
@ -128,12 +134,12 @@ fn sign_stream_auth(contracts: Vec<String>) -> Result<DaemonAuth> {
pub async fn receive_messages(
mut client: BrainAppDaemonClient<Channel>,
contracts: Vec<String>,
del_apps_uuid: Vec<String>,
tx: Sender<BrainMessageApp>,
) -> Result<()> {
log::debug!("starting to listen for messages from brain");
let mut grpc_stream = client
.brain_messages(sign_stream_auth(contracts)?)
.brain_messages(sign_stream_auth(del_apps_uuid)?)
.await?
.into_inner();
@ -154,14 +160,14 @@ pub async fn receive_messages(
pub async fn send_messages(
mut client: BrainAppDaemonClient<Channel>,
contracts: Vec<String>,
del_apps_uuid: Vec<String>,
rx: Receiver<DaemonMessageApp>,
tx: Sender<DaemonMessageApp>,
) -> Result<()> {
let rx_stream = ReceiverStream::new(rx);
tx.send(DaemonMessageApp {
msg: Some(detee_shared::app_proto::daemon_message_app::Msg::Auth(
sign_stream_auth(contracts)?,
sign_stream_auth(del_apps_uuid)?,
)),
})
.await?;

@ -11,14 +11,13 @@ use anyhow::anyhow;
use anyhow::Result;
use data::App;
use detee_shared::app_proto::{
brain_message_app, AppContract, AppNodeResources, BrainMessageApp, DaemonMessageApp,
MappedPort, NewAppRes,
brain_message_app, AppNodeResources, BrainMessageApp, DaemonMessageApp, NewAppRes,
};
use detee_shared::common_proto::MappedPort;
use detee_shared::sgx::types::brain::AppDeployConfig;
use global::PUBLIC_KEY;
use log::info;
use log::warn;
use std::collections::HashSet;
use std::fs::File;
use std::path::Path;
use std::time::Duration;
@ -68,22 +67,16 @@ impl AppHandler {
}
}
async fn handle_contracts(&mut self, contracts: Vec<AppContract>) {
async fn handle_contracts(&mut self, del_apps_uuid: Vec<String>) {
let apps_in_host = self.host_resource.existing_apps.clone();
let apps_in_brain: HashSet<String> = contracts
.iter()
.map(|contact| contact.uuid.clone())
.collect();
let deleted_apps: HashSet<String> =
apps_in_host.difference(&apps_in_brain).cloned().collect();
for uuid in deleted_apps {
if let Err(e) = self.handle_del_app_req(uuid.clone()).await {
for del_app_uuid in del_apps_uuid {
if apps_in_host.contains(&del_app_uuid) {
if let Err(e) = self.handle_del_app_req(del_app_uuid.clone()).await {
log::error!("Failed to delete app: {e}");
}
}
}
}
async fn run(mut self) {
sleep(Duration::from_millis(500)).await;
@ -109,6 +102,29 @@ impl AppHandler {
}
async fn handle_new_app_req(&mut self, new_app_req: AppDeployConfig) {
let (avail_memory_per_cpu, avail_disk_per_cpu) = self.slot_ratios();
let req_resource = &new_app_req.resource;
let req_memory_per_cpu = req_resource.memory_mib / req_resource.vcpus;
let req_disk_per_cpu = req_resource.disk_size_mib / req_resource.vcpus;
if !within_10_percent(avail_memory_per_cpu, req_memory_per_cpu as usize)
|| !within_10_percent(avail_disk_per_cpu, req_disk_per_cpu as usize)
{
warn!("Refusing to create app due to unbalanced resources: {new_app_req:?}");
let _ = self
.sender
.send(
NewAppRes {
uuid: new_app_req.uuid,
error: format!("Unbalanced hardware resources."),
..Default::default()
}
.into(),
)
.await;
return;
};
log::debug!("Processing new app request: {new_app_req:?}");
let uuid = new_app_req.uuid.clone();
let app_result = App::new(new_app_req, &self.host_config, &mut self.host_resource).await;
@ -119,7 +135,6 @@ impl AppHandler {
info!("Succesfully started App {uuid}");
let res = NewAppRes {
uuid,
status: "success".to_string(),
error: "".to_string(),
mapped_ports: app.mapped_ports.into_iter().map(MappedPort::from).collect(),
ip_address: self.host_config.host_ip_address.clone(),
@ -129,7 +144,6 @@ impl AppHandler {
Err(e) => {
let res = NewAppRes {
uuid,
status: "failed".to_string(),
error: e.to_string(),
..Default::default()
};
@ -155,6 +169,28 @@ impl AppHandler {
Ok(())
}
/// returns Memory per vCPU and Disk per vCPU ratio
fn slot_ratios(&self) -> (usize, usize) {
let total_storage_mib =
self.host_config
.max_disk_reservation_mib
.saturating_sub(self.host_resource.reserved_disk_mib) as usize;
let available_cpus =
self.host_config
.max_vcpu_reservation
.saturating_sub(self.host_resource.reserved_vcpus) as usize;
let available_mem =
self.host_config
.max_mem_reservation_mib
.saturating_sub(self.host_resource.reserved_memory_mib) as usize;
let memory_per_cpu = available_mem / available_cpus;
let disk_per_cpu = total_storage_mib / available_cpus;
(memory_per_cpu, disk_per_cpu)
}
async fn send_node_resources(&mut self) {
let host_config = self.host_config.clone();
let host_resource = self.host_resource.clone();
@ -164,16 +200,18 @@ impl AppHandler {
(host_config.public_port_range.len() - host_resource.reserved_host_ports.len()) as u32;
let avail_vcpus = host_config.max_vcpu_reservation - host_resource.reserved_vcpus;
let avail_memory_mb = host_config.max_mem_reservation_mb - host_resource.reserved_memory_mb;
let avail_storage_mb = host_config.max_disk_reservation_mb - host_resource.reserved_disk_mb;
let avail_memory_mib =
host_config.max_mem_reservation_mib - host_resource.reserved_memory_mib;
let avail_storage_mib =
host_config.max_disk_reservation_mib - host_resource.reserved_disk_mib;
let max_ports_per_app = host_config.max_ports_per_app;
let resource_update = AppNodeResources {
node_pubkey,
avail_no_of_port,
avail_vcpus,
avail_memory_mb,
avail_storage_mb,
avail_memory_mib,
avail_storage_mib,
max_ports_per_app,
};
@ -201,11 +239,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}
let mut contracts = vec![];
let mut del_apps_uuid = vec![];
match grpc::register_node(&app_handler.host_config).await {
Ok(app_contracts) => {
contracts.append(&mut app_contracts.iter().map(|c| c.uuid.clone()).collect());
app_handler.handle_contracts(app_contracts).await
Ok(del_app_reqs) => {
del_apps_uuid.append(&mut del_app_reqs.iter().map(|c| c.uuid.clone()).collect());
app_handler.handle_contracts(del_apps_uuid.clone()).await
}
Err(e) => log::error!("Failed to connect to brain: {e}"),
@ -221,7 +259,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
brain_msg_tx,
daemon_msg_rx,
daemon_msg_tx,
app_contracts_uuid: contracts,
del_apps_uuid,
})
.await
{
@ -273,3 +311,9 @@ async fn download_and_replace_binary(network: &str) -> Result<()> {
}
Ok(())
}
fn within_10_percent(a: usize, b: usize) -> bool {
let diff = a.abs_diff(b); // u32
let reference = a.max(b); // the larger of the two
diff * 10 <= reference
}