From db02218fa91af9a5a5e1497d5964b4d8679fc02a Mon Sep 17 00:00:00 2001 From: Noor Date: Thu, 8 May 2025 19:31:16 +0530 Subject: [PATCH 01/11] WIP on App services --- src/bin/brain.rs | 10 +++- src/grpc/app.rs | 153 +++++++++++++++++++++++++++++++++++++++++++++++ src/grpc/mod.rs | 15 ++--- 3 files changed, 169 insertions(+), 9 deletions(-) diff --git a/src/bin/brain.rs b/src/bin/brain.rs index 62e1fb8..f549f6d 100644 --- a/src/bin/brain.rs +++ b/src/bin/brain.rs @@ -1,11 +1,13 @@ -use std::sync::Arc; - +use detee_shared::app_proto::brain_app_cli_server::BrainAppCliServer; +use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemonServer; use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer; use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; use dotenv::dotenv; +use std::sync::Arc; use surreal_brain::constants::{BRAIN_GRPC_ADDR, CERT_KEY_PATH, CERT_PATH}; use surreal_brain::db; +use surreal_brain::grpc::app::{AppCliServer, AppDaemonServer}; use surreal_brain::grpc::general::GeneralCliServer; use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer}; use tonic::transport::{Identity, Server, ServerTlsConfig}; @@ -31,6 +33,8 @@ async fn main() { let snp_daemon_server = BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone())); let snp_cli_server = BrainVmCliServer::new(VmCliServer::new(db_arc.clone())); let general_service_server = BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone())); + let sgx_daemon_server = BrainAppDaemonServer::new(AppDaemonServer::new(db_arc.clone())); + let sgx_cli_server = BrainAppCliServer::new(AppCliServer::new(db_arc.clone())); let cert_path = std::env::var("CERT_PATH").unwrap_or(CERT_PATH.to_string()); let key_path = std::env::var("CERT_KEY_PATH").unwrap_or(CERT_KEY_PATH.to_string()); @@ -45,6 +49,8 @@ async fn main() { .add_service(snp_daemon_server) .add_service(snp_cli_server) .add_service(general_service_server) + .add_service(sgx_daemon_server) + .add_service(sgx_cli_server) .serve(addr) .await .unwrap(); diff --git a/src/grpc/app.rs b/src/grpc/app.rs index 8b13789..b672cef 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -1 +1,154 @@ +use crate::grpc::{check_sig_from_parts, check_sig_from_req}; +use detee_shared::app_proto::brain_app_cli_server::BrainAppCli; +use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemon; +use detee_shared::app_proto::{ + daemon_message_app, AppContract, AppNodeFilters, AppNodeListResp, BrainMessageApp, DaemonAuth, + DaemonMessageApp, DelAppReq, ListAppContractsReq, RegisterAppNodeReq, +}; +use detee_shared::common_proto::Empty; +use log::info; +use std::pin::Pin; +use std::sync::Arc; +use surrealdb::engine::remote::ws::Client; +use surrealdb::Surreal; +use tokio_stream::{Stream, StreamExt}; +use tonic::{Status, Streaming}; +pub struct AppDaemonServer { + pub db: Arc>, +} + +impl AppDaemonServer { + pub fn new(db: Arc>) -> Self { + Self { db } + } +} + +#[tonic::async_trait] +impl BrainAppDaemon for AppDaemonServer { + type RegisterAppNodeStream = Pin> + Send>>; + type BrainMessagesStream = Pin> + Send>>; + + async fn register_app_node( + &self, + req: tonic::Request, + ) -> Result::RegisterAppNodeStream>, tonic::Status> + { + let req = check_sig_from_req(req)?; + info!("Starting app_node registration process for {:?}", req); + + todo!() + } + + async fn brain_messages( + &self, + req: tonic::Request, + ) -> Result::BrainMessagesStream>, tonic::Status> { + let auth = req.into_inner(); + let pubkey = auth.pubkey.clone(); + check_sig_from_parts( + &pubkey, + &auth.timestamp, + &format!("{:?}", auth.contracts), + &auth.signature, + )?; + + info!("App Daemon {} connected to receive brain messages", pubkey); + + todo!() + } + + async fn daemon_messages( + &self, + req: tonic::Request>, + ) -> Result, tonic::Status> { + let mut req_stream = req.into_inner(); + let pubkey: String; + if let Some(Ok(msg)) = req_stream.next().await { + log::debug!("App daemon_messages received auth message: {:?}", msg); + if let Some(daemon_message_app::Msg::Auth(auth)) = msg.msg { + pubkey = auth.pubkey.clone(); + check_sig_from_parts( + &pubkey, + &auth.timestamp, + &format!("{:?}", &auth.contracts), + &auth.signature, + )?; + } else { + return Err(Status::unauthenticated( + "Could not authenticate the app daemon: could not extract auth signature", + )); + } + } else { + return Err(Status::unauthenticated("Could not authenticate the app daemon")); + } + + todo!() + } +} + +pub struct AppCliServer { + pub db: Arc>, +} + +impl AppCliServer { + pub fn new(db: Arc>) -> Self { + Self { db } + } +} + +#[tonic::async_trait] +impl BrainAppCli for AppCliServer { + type ListAppContractsStream = Pin> + Send>>; + type ListAppNodesStream = Pin> + Send>>; + + async fn deploy_app( + &self, + req: tonic::Request, + ) -> Result, tonic::Status> { + let req = check_sig_from_req(req)?; + info!("deploy_app process starting for {:?}", req); + + todo!() + } + + async fn delete_app( + &self, + req: tonic::Request, + ) -> Result, tonic::Status> { + let req = check_sig_from_req(req)?; + info!("delete_app process starting for {:?}", req); + + todo!() + } + + async fn list_app_contracts( + &self, + req: tonic::Request, + ) -> Result::ListAppContractsStream>, tonic::Status> { + let req = check_sig_from_req(req)?; + info!("list_app_contracts process starting for {:?}", req); + + todo!() + } + + async fn list_app_nodes( + &self, + req: tonic::Request, + ) -> Result::ListAppNodesStream>, tonic::Status> { + let req = check_sig_from_req(req)?; + info!("list_app_nodes process starting for {:?}", req); + + todo!() + } + + async fn get_one_app_node( + &self, + req: tonic::Request, + ) -> Result, tonic::Status> { + let req = check_sig_from_req(req)?; + info!("get_one_app_node process starting for {:?}", req); + + todo!() + } +} diff --git a/src/grpc/mod.rs b/src/grpc/mod.rs index 07bb334..1dec498 100644 --- a/src/grpc/mod.rs +++ b/src/grpc/mod.rs @@ -4,11 +4,12 @@ pub mod types; pub mod vm; use crate::constants::ADMIN_ACCOUNTS; +use detee_shared::app_proto::*; use detee_shared::common_proto::{Empty, Pubkey}; use detee_shared::general_proto::{ AirdropReq, BanUserReq, KickReq, RegOperatorReq, ReportNodeReq, SlashReq, }; -use detee_shared::vm_proto::{ListVmContractsReq, *}; +use detee_shared::vm_proto::*; use tonic::{Request, Status}; pub trait PubkeyGetter { @@ -49,12 +50,12 @@ impl_pubkey_getter!(Empty); impl_pubkey_getter!(AirdropReq); impl_pubkey_getter!(SlashReq); -// impl_pubkey_getter!(NewAppReq, admin_pubkey); -// impl_pubkey_getter!(DelAppReq, admin_pubkey); -// impl_pubkey_getter!(ListAppContractsReq, admin_pubkey); -// -// impl_pubkey_getter!(RegisterAppNodeReq); -// impl_pubkey_getter!(AppNodeFilters); +impl_pubkey_getter!(NewAppReq, admin_pubkey); +impl_pubkey_getter!(DelAppReq, admin_pubkey); +impl_pubkey_getter!(ListAppContractsReq, admin_pubkey); + +impl_pubkey_getter!(RegisterAppNodeReq); +impl_pubkey_getter!(AppNodeFilters); pub fn check_sig_from_req(req: Request) -> Result { let time = match req.metadata().get("timestamp") { -- 2.43.0 From 71720ed4c53bf1fc4cf34eb4ffda13d740a055f6 Mon Sep 17 00:00:00 2001 From: Noor Date: Fri, 9 May 2025 16:14:48 +0530 Subject: [PATCH 02/11] refactor mapped port into common_proto --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/grpc/types.rs | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2d57616..394680f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1000,7 +1000,7 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain#d0d4622c52efdf74ed6582fbac23a6159986ade3" +source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#0d4b712fad1040dd773d2076d7b4b65b18527136" dependencies = [ "bincode 2.0.1", "prost", diff --git a/Cargo.toml b/Cargo.toml index 40b329b..c63e043 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ serde_yaml = "0.9.34" surrealdb = "2.2.2" tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] } tonic = { version = "0.12", features = ["tls"] } -detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain" } +detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain_app" } ed25519-dalek = "2.1.1" bs58 = "0.5.1" tokio-stream = "0.1.17" diff --git a/src/grpc/types.rs b/src/grpc/types.rs index 54cd886..59c7af0 100644 --- a/src/grpc/types.rs +++ b/src/grpc/types.rs @@ -1,8 +1,9 @@ use crate::constants::{ACCOUNT, ID_ALPHABET, NEW_VM_REQ, VM_NODE}; use crate::db::prelude as db; use detee_shared::app_proto::AppNodeListResp; +use detee_shared::common_proto::MappedPort; use detee_shared::general_proto::{AccountBalance, ListOperatorsResp}; -use detee_shared::vm_proto::*; +use detee_shared::{app_proto::*, vm_proto::*}; use nanoid::nanoid; use surrealdb::RecordId; -- 2.43.0 From 7807e14167d0492ae109e7f6effaf6bd6ac3d38f Mon Sep 17 00:00:00 2001 From: Noor Date: Fri, 9 May 2025 17:11:34 +0530 Subject: [PATCH 03/11] WIP App: daemon register enhance logging levels --- Cargo.lock | 1 + Cargo.toml | 1 + src/bin/brain.rs | 6 +++++- src/constants.rs | 1 + src/db/app.rs | 22 +++++++++++++++++++++- src/grpc/app.rs | 38 ++++++++++++++++++++++++++++++++++++-- src/grpc/types.rs | 35 +++++++++++++++++++++++++++++++++++ 7 files changed, 100 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 394680f..dd6da18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3796,6 +3796,7 @@ dependencies = [ "ed25519-dalek", "env_logger", "futures", + "hex", "hyper-util", "itertools 0.14.0", "log", diff --git a/Cargo.toml b/Cargo.toml index c63e043..9d96c45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ env_logger = "0.11.8" thiserror = "2.0.12" nanoid = "0.4.0" dotenv = "0.15.0" +hex = "0.4.3" [profile.release] lto = true diff --git a/src/bin/brain.rs b/src/bin/brain.rs index f549f6d..4798063 100644 --- a/src/bin/brain.rs +++ b/src/bin/brain.rs @@ -17,7 +17,11 @@ async fn main() { if dotenv::from_filename("/etc/detee/brain/config.ini").is_err() { dotenv().ok(); } - env_logger::builder().filter_level(log::LevelFilter::Debug).init(); + env_logger::builder() + .filter_level(log::LevelFilter::Trace) + .filter_module("tungstenite", log::LevelFilter::Debug) + .filter_module("tokio_tungstenite", log::LevelFilter::Debug) + .init(); let db_url = std::env::var("DB_URL").expect("the environment variable DB_URL is not set"); let db_user = std::env::var("DB_USER").expect("the environment variable DB_USER is not set"); diff --git a/src/constants.rs b/src/constants.rs index bc95da8..d398407 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -32,6 +32,7 @@ pub const DELETED_VM: &str = "deleted_vm"; pub const VM_CONTRACT: &str = "vm_contract"; pub const ACTIVE_APP: &str = "active_app"; +pub const APP_NODE: &str = "app_node"; pub const ID_ALPHABET: [char; 62] = [ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', diff --git a/src/db/app.rs b/src/db/app.rs index c49bb0b..9ef5009 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -1,7 +1,8 @@ -use crate::constants::{ACCOUNT, ACTIVE_APP}; +use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE}; use crate::db::general::Report; use super::Error; +use crate::db; use crate::old_brain; use serde::{Deserialize, Serialize}; use surrealdb::engine::remote::ws::Client; @@ -25,6 +26,14 @@ pub struct AppNode { pub offline_minutes: u64, } +impl AppNode { + pub async fn register(self, db: &Surreal) -> Result { + db::Account::get_or_create(db, &self.operator.key().to_string()).await?; + let app_node: Option = db.upsert(self.id.clone()).content(self).await?; + app_node.ok_or(Error::FailedToCreateDBEntry) + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct AppNodeWithReports { pub id: RecordId, @@ -88,6 +97,17 @@ pub struct ActiveAppWithNode { } impl ActiveAppWithNode { + pub async fn list_by_node(db: &Surreal, node_pubkey: &str) -> Result, Error> { + let mut query_result = db + .query(format!( + "select * from {ACTIVE_APP} where out = {APP_NODE}:{node_pubkey} fetch out;" + )) + .await?; + + let contract: Vec = query_result.take(0)?; + Ok(contract) + } + pub async fn get_by_uuid(db: &Surreal, uuid: &str) -> Result, Error> { let contract: Option = db.query(format!("select * from {ACTIVE_APP}:{uuid} fetch out;")).await?.take(0)?; diff --git a/src/grpc/app.rs b/src/grpc/app.rs index b672cef..d8e2fac 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -1,3 +1,5 @@ +use crate::constants::{ACCOUNT, APP_NODE}; +use crate::db::prelude as db; use crate::grpc::{check_sig_from_parts, check_sig_from_req}; use detee_shared::app_proto::brain_app_cli_server::BrainAppCli; use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemon; @@ -10,9 +12,12 @@ use log::info; use std::pin::Pin; use std::sync::Arc; use surrealdb::engine::remote::ws::Client; +use surrealdb::RecordId; use surrealdb::Surreal; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; use tokio_stream::{Stream, StreamExt}; -use tonic::{Status, Streaming}; +use tonic::{Response, Status, Streaming}; pub struct AppDaemonServer { pub db: Arc>, @@ -37,7 +42,36 @@ impl BrainAppDaemon for AppDaemonServer { let req = check_sig_from_req(req)?; info!("Starting app_node registration process for {:?}", req); - todo!() + let app_node = db::AppNode { + id: RecordId::from((APP_NODE, req.node_pubkey.clone())), + operator: RecordId::from((ACCOUNT, req.operator_wallet)), + country: req.country, + region: req.region, + city: req.city, + ip: req.main_ip, + price: req.price, + + avail_mem_mb: 0, + avail_vcpus: 0, + avail_storage_gbs: 0, + avail_ports: 0, + max_ports_per_app: 0, + offline_minutes: 0, + }; + + app_node.register(&self.db).await?; + info!("Sending existing contracts to {}", req.node_pubkey); + + let contracts = db::ActiveAppWithNode::list_by_node(&self.db, &req.node_pubkey).await?; + let (tx, rx) = mpsc::channel(6); + tokio::spawn(async move { + for contract in contracts { + let _ = tx.send(Ok(contract.into())).await; + } + }); + + let resp_stream = ReceiverStream::new(rx); + Ok(Response::new(Box::pin(resp_stream))) } async fn brain_messages( diff --git a/src/grpc/types.rs b/src/grpc/types.rs index 59c7af0..ee85a69 100644 --- a/src/grpc/types.rs +++ b/src/grpc/types.rs @@ -250,3 +250,38 @@ impl From for db::VmNodeResources { } } } + +impl From for AppContract { + fn from(value: db::ActiveAppWithNode) -> Self { + let public_package_mr_enclave = + Some(hex::decode(value.mr_enclave.clone()).unwrap_or_default()); + + AppContract { + uuid: value.id.key().to_string(), + package_url: value.package_url, + admin_pubkey: value.admin.key().to_string(), + node_pubkey: value.app_node.id.key().to_string(), + public_ipv4: value.host_ipv4, + resource: Some(AppResource { + memory_mb: value.memory_mb as u32, + disk_mb: value.disk_size_gb as u32, + vcpu: value.vcpus as u32, + ports: value.mapped_ports.iter().map(|(_, g)| *g as u32).collect(), + }), + mapped_ports: value + .mapped_ports + .iter() + .map(|(h, g)| MappedPort { host_port: *h as u32, guest_port: *g as u32 }) + .collect(), + + created_at: value.created_at.to_rfc3339(), + updated_at: value.created_at.to_rfc3339(), + nano_per_minute: value.price_per_unit, + locked_nano: value.locked_nano, + collected_at: value.collected_at.to_rfc3339(), + hratls_pubkey: value.mr_enclave, + public_package_mr_enclave, + app_name: value.app_name, + } + } +} -- 2.43.0 From f831e31d8f0425bdc33f7286e883c1960848326d Mon Sep 17 00:00:00 2001 From: Noor Date: Mon, 12 May 2025 13:05:38 +0530 Subject: [PATCH 04/11] App brain message new app req and delete app message schema for new_app_req improve error handling on live select --- interim_tables.surql | 14 +++++++++++ src/constants.rs | 2 ++ src/db/app.rs | 60 ++++++++++++++++++++++++++++++++++++++++++++ src/db/mod.rs | 52 +++++++++++++++++++++++++++++++++++--- src/grpc/app.rs | 21 +++++++++++++++- src/grpc/types.rs | 44 ++++++++++++++++++++++++++++++++ 6 files changed, 188 insertions(+), 5 deletions(-) diff --git a/interim_tables.surql b/interim_tables.surql index 8c4d0ba..a0cf525 100644 --- a/interim_tables.surql +++ b/interim_tables.surql @@ -91,6 +91,20 @@ DEFINE FIELD max_ports_per_app ON TABLE app_node TYPE int; DEFINE FIELD price ON TABLE app_node TYPE int; DEFINE FIELD offline_minutes ON TABLE app_node TYPE int; +DEFINE TABLE new_app_req Type RELATION FROM account to app_node SCHEMAFULL; +DEFINE FIELD app_name ON TABLE new_app_req TYPE string; +DEFINE FIELD package_url ON TABLE new_app_req TYPE string; +DEFINE FIELD mr_enclave ON TABLE new_app_req TYPE string; +DEFINE FIELD hratls_pubkey ON TABLE new_app_req TYPE string; +DEFINE FIELD ports ON TABLE new_app_req TYPE array; +DEFINE FIELD memory_mb ON TABLE new_app_req TYPE int; +DEFINE FIELD vcpu ON TABLE new_app_req TYPE int; +DEFINE FIELD disk_mb ON TABLE new_app_req TYPE int; +DEFINE FIELD locked_nano ON TABLE new_app_req TYPE int; +DEFINE FIELD price_per_unit ON TABLE new_app_req TYPE int; +DEFINE FIELD error ON TABLE new_app_req TYPE string; +DEFINE FIELD created_at ON TABLE new_app_req TYPE datetime; + DEFINE TABLE active_app TYPE RELATION FROM account TO app_node SCHEMAFULL; DEFINE FIELD app_name ON TABLE active_app TYPE string; DEFINE FIELD mapped_ports ON TABLE active_app TYPE array<[int, int]>; diff --git a/src/constants.rs b/src/constants.rs index d398407..751f441 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -33,6 +33,8 @@ pub const VM_CONTRACT: &str = "vm_contract"; pub const ACTIVE_APP: &str = "active_app"; pub const APP_NODE: &str = "app_node"; +pub const NEW_APP_REQ: &str = "new_app_req"; +pub const DELETED_APP: &str = "deleted_app"; pub const ID_ALPHABET: [char; 62] = [ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', diff --git a/src/db/app.rs b/src/db/app.rs index 9ef5009..44f49f4 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -34,6 +34,44 @@ impl AppNode { } } +pub enum AppDaemonMsg { + Create(NewAppReq), + Delete(DeletedApp), +} + +impl From for AppDaemonMsg { + fn from(value: NewAppReq) -> Self { + Self::Create(value) + } +} + +impl From for AppDaemonMsg { + fn from(value: DeletedApp) -> Self { + Self::Delete(value) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct NewAppReq { + pub id: RecordId, + #[serde(rename = "in")] + pub admin: RecordId, + #[serde(rename = "out")] + pub app_node: RecordId, + pub app_name: String, + pub package_url: String, + pub mr_enclave: String, + pub hratls_pubkey: String, + pub ports: Vec, + pub memory_mb: u32, + pub vcpu: u32, + pub disk_mb: u32, + pub locked_nano: u64, + pub price_per_unit: u64, + pub error: String, + pub created_at: Datetime, +} + #[derive(Debug, Serialize, Deserialize)] pub struct AppNodeWithReports { pub id: RecordId, @@ -138,3 +176,25 @@ impl From<&old_brain::BrainData> for Vec { nodes } } + +#[derive(Debug, Serialize, Deserialize)] +pub struct DeletedApp { + pub id: RecordId, + #[serde(rename = "in")] + pub admin: RecordId, + #[serde(rename = "out")] + pub app_node: RecordId, + pub app_name: String, + pub mapped_ports: Vec<(u64, u64)>, + pub host_ipv4: String, + pub vcpus: u64, + pub memory_mb: u64, + pub disk_size_gb: u64, + pub created_at: Datetime, + pub price_per_unit: u64, + pub locked_nano: u64, + pub collected_at: Datetime, + pub mr_enclave: String, + pub package_url: String, + pub hratls_pubkey: String, +} diff --git a/src/db/mod.rs b/src/db/mod.rs index 6b3f6c4..725d27a 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -2,7 +2,7 @@ pub mod app; pub mod general; pub mod vm; -use crate::constants::{DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ}; +use crate::constants::{APP_NODE, DELETED_APP, DELETED_VM, NEW_APP_REQ, NEW_VM_REQ, UPDATE_VM_REQ}; use crate::old_brain; use prelude::*; use serde::{Deserialize, Serialize}; @@ -24,6 +24,10 @@ pub enum Error { TimeOut(#[from] tokio::time::error::Elapsed), #[error("Failed to create account")] FailedToCreateDBEntry, + #[error("Unknown Table: {0}")] + UnknownTable(String), + #[error("Daemon channel got closed: {0}")] + AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError), } pub mod prelude { @@ -95,9 +99,9 @@ pub async fn live_vmnode_msgs< t if t == std::any::type_name::() => NEW_VM_REQ.to_string(), t if t == std::any::type_name::() => UPDATE_VM_REQ.to_string(), t if t == std::any::type_name::() => DELETED_VM.to_string(), - wat => { - log::error!("listen_for_node: T has type {wat}"); - String::from("wat") + t => { + log::error!("live_vmnode_msgs type {t} not supported",); + return Err(Error::UnknownTable(t.to_string())); } }; let mut resp = @@ -119,3 +123,43 @@ pub async fn live_vmnode_msgs< } Ok(()) } + +pub async fn live_appnode_msgs< + T: std::fmt::Debug + Into + std::marker::Unpin + for<'de> Deserialize<'de>, +>( + db: &Surreal, + node_pubkey: &str, + tx: Sender, +) -> Result<(), Error> { + let table_name = match std::any::type_name::() { + t if t == std::any::type_name::() => NEW_APP_REQ.to_string(), + t if t == std::any::type_name::() => DELETED_APP.to_string(), + t => { + log::error!("live_appnode_msgs type {t} not supported",); + return Err(Error::UnknownTable(t.to_string())); + } + }; + let mut query_resp = db + .query(format!("live select * from {table_name} where out = {APP_NODE}:{node_pubkey};")) + .await?; + + let mut live_stream = query_resp.stream::>(0)?; + while let Some(result) = live_stream.next().await { + match result { + Ok(notification) => { + log::debug!("Got notification for node {node_pubkey}: {notification:?}"); + if notification.action == surrealdb::Action::Create { + tx.send(notification.data.into()).await? + } + } + Err(e) => { + log::error!( + "live_vmnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}" + ); + return Err(Error::from(e)); + } + } + } + + Ok(()) +} diff --git a/src/grpc/app.rs b/src/grpc/app.rs index d8e2fac..3fbdafa 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -89,7 +89,26 @@ impl BrainAppDaemon for AppDaemonServer { info!("App Daemon {} connected to receive brain messages", pubkey); - todo!() + let (tx, rx) = mpsc::channel(6); + { + let db = self.db.clone(); + let pubkey = pubkey.clone(); + let tx = tx.clone(); + tokio::spawn(async move { + let _ = db::live_appnode_msgs::(&db, &pubkey, tx).await; + }); + } + { + let db = self.db.clone(); + let pubkey = pubkey.clone(); + let tx = tx.clone(); + tokio::spawn(async move { + let _ = db::live_appnode_msgs::(&db, &pubkey, tx).await; + }); + } + + let resp_stream = ReceiverStream::new(rx).map(|msg| Ok(msg.into())); + Ok(Response::new(Box::pin(resp_stream))) } async fn daemon_messages( diff --git a/src/grpc/types.rs b/src/grpc/types.rs index ee85a69..58ee2b5 100644 --- a/src/grpc/types.rs +++ b/src/grpc/types.rs @@ -285,3 +285,47 @@ impl From for AppContract { } } } + +impl From for NewAppReq { + fn from(value: db::NewAppReq) -> Self { + let resource = AppResource { + vcpu: value.vcpu, + memory_mb: value.memory_mb, + disk_mb: value.disk_mb, + ports: value.ports, + }; + let mr_enclave = Some(hex::decode(value.mr_enclave).unwrap_or_default()); + + Self { + package_url: value.package_url, + node_pubkey: value.app_node.key().to_string(), + resource: Some(resource), + uuid: value.id.key().to_string(), + admin_pubkey: value.admin.key().to_string(), + price_per_unit: value.price_per_unit, + locked_nano: value.locked_nano, + hratls_pubkey: value.hratls_pubkey, + public_package_mr_enclave: mr_enclave, + app_name: value.app_name, + } + } +} + +impl From for DelAppReq { + fn from(value: db::DeletedApp) -> Self { + Self { uuid: value.id.key().to_string(), admin_pubkey: value.admin.key().to_string() } + } +} + +impl From for BrainMessageApp { + fn from(value: db::AppDaemonMsg) -> Self { + match value { + db::AppDaemonMsg::Create(new_app_req) => { + BrainMessageApp { msg: Some(brain_message_app::Msg::NewAppReq(new_app_req.into())) } + } + db::AppDaemonMsg::Delete(del_app_req) => BrainMessageApp { + msg: Some(brain_message_app::Msg::DeleteAppReq(del_app_req.into())), + }, + } + } +} -- 2.43.0 From 6f40a9c770f21c7b6e6d0a075c741ae5f5bfe582 Mon Sep 17 00:00:00 2001 From: Noor Date: Mon, 12 May 2025 18:09:30 +0530 Subject: [PATCH 05/11] App daemon message activate new app in db update node resource in db --- src/db/app.rs | 76 ++++++++++++++++++++++++++++++++++++++++++++++- src/grpc/app.rs | 30 ++++++++++++++++++- src/grpc/types.rs | 12 ++++++++ 3 files changed, 116 insertions(+), 2 deletions(-) diff --git a/src/db/app.rs b/src/db/app.rs index 44f49f4..3080380 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -1,4 +1,4 @@ -use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE}; +use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, NEW_APP_REQ}; use crate::db::general::Report; use super::Error; @@ -72,6 +72,28 @@ pub struct NewAppReq { pub created_at: Datetime, } +impl NewAppReq { + pub async fn get(db: &Surreal, id: &str) -> Result, Error> { + let new_app_req: Option = db.select((NEW_APP_REQ, id)).await?; + Ok(new_app_req) + } + pub async fn submit_error( + db: &Surreal, + id: &str, + error: String, + ) -> Result, Error> { + #[derive(Serialize)] + struct NewAppError { + error: String, + } + + let record: Option = + db.update((NEW_APP_REQ, id)).merge(NewAppError { error }).await?; + + Ok(record) + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct AppNodeWithReports { pub id: RecordId, @@ -112,6 +134,38 @@ pub struct ActiveApp { hratls_pubkey: String, } +impl ActiveApp { + pub async fn activate(db: &Surreal, id: &str) -> Result<(), Error> { + let new_app_req = match NewAppReq::get(db, id).await? { + Some(r) => r, + None => return Ok(()), + }; + + let active_app = Self { + id: RecordId::from((ACTIVE_APP, id)), + admin: new_app_req.admin, + app_node: new_app_req.app_node, + app_name: new_app_req.app_name, + mapped_ports: vec![], + host_ipv4: String::new(), + vcpus: new_app_req.vcpu as u64, + memory_mb: new_app_req.memory_mb as u64, + disk_size_gb: new_app_req.disk_mb as u64, + created_at: new_app_req.created_at.clone(), + price_per_unit: new_app_req.price_per_unit, + locked_nano: new_app_req.locked_nano, + collected_at: new_app_req.created_at, + mr_enclave: new_app_req.mr_enclave.clone(), + package_url: new_app_req.package_url.clone(), + hratls_pubkey: new_app_req.hratls_pubkey.clone(), + }; + + let _: Vec = db.insert(()).relation(active_app).await?; + + Ok(()) + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct ActiveAppWithNode { pub id: RecordId, @@ -153,6 +207,26 @@ impl ActiveAppWithNode { } } +#[derive(Debug, Serialize)] +pub struct AppNodeResources { + pub avail_no_of_port: u32, + pub avail_vcpus: u32, + pub avail_memory_mb: u32, + pub avail_storage_mb: u32, + pub max_ports_per_app: u32, +} + +impl AppNodeResources { + pub async fn merge( + self, + db: &Surreal, + node_pubkey: &str, + ) -> Result, Error> { + let app_node: Option = db.update((APP_NODE, node_pubkey)).merge(self).await?; + Ok(app_node) + } +} + impl From<&old_brain::BrainData> for Vec { fn from(old_data: &old_brain::BrainData) -> Self { let mut nodes = Vec::new(); diff --git a/src/grpc/app.rs b/src/grpc/app.rs index 3fbdafa..91cdac4 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -136,7 +136,35 @@ impl BrainAppDaemon for AppDaemonServer { return Err(Status::unauthenticated("Could not authenticate the app daemon")); } - todo!() + while let Some(daemon_message) = req_stream.next().await { + match daemon_message { + Ok(msg) => match msg.msg { + Some(daemon_message_app::Msg::NewAppRes(new_app_resp)) => { + if !new_app_resp.error.is_empty() { + db::NewAppReq::submit_error( + &self.db, + &new_app_resp.uuid, + new_app_resp.error, + ) + .await?; + } else { + db::ActiveApp::activate(&self.db, &new_app_resp.uuid).await?; + } + } + Some(daemon_message_app::Msg::AppNodeResources(app_node_resources)) => { + let node_resource: db::AppNodeResources = app_node_resources.into(); + node_resource.merge(&self.db, &pubkey).await?; + } + _ => {} + }, + + Err(e) => { + log::warn!("App Daemon Disconnected: {e:?}") + } + } + } + + Ok(Response::new(Empty {})) } } diff --git a/src/grpc/types.rs b/src/grpc/types.rs index 58ee2b5..c6804a7 100644 --- a/src/grpc/types.rs +++ b/src/grpc/types.rs @@ -329,3 +329,15 @@ impl From for BrainMessageApp { } } } + +impl From for db::AppNodeResources { + fn from(value: AppNodeResources) -> Self { + Self { + avail_no_of_port: value.avail_no_of_port, + avail_vcpus: value.avail_vcpus, + avail_memory_mb: value.avail_memory_mb, + avail_storage_mb: value.avail_storage_mb, + max_ports_per_app: value.max_ports_per_app, + } + } +} -- 2.43.0 From 0ccaa4840cf23783fdac17eeba8ce99d2ae69ea8 Mon Sep 17 00:00:00 2001 From: Noor Date: Tue, 13 May 2025 13:10:42 +0530 Subject: [PATCH 06/11] App nodes list modified AppNode filtering proto improve logging in signature checking --- Cargo.lock | 2 +- src/db/app.rs | 45 ++++++++++++++++++++++++++++++++++++++++++--- src/grpc/app.rs | 11 +++++++++-- src/grpc/mod.rs | 4 ++++ 4 files changed, 56 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dd6da18..81b5bd8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1000,7 +1000,7 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#0d4b712fad1040dd773d2076d7b4b65b18527136" +source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#637610196f708475230802fa67f1b3ee4d8d0679" dependencies = [ "bincode 2.0.1", "prost", diff --git a/src/db/app.rs b/src/db/app.rs index 3080380..e610eea 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -1,9 +1,9 @@ -use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, NEW_APP_REQ}; -use crate::db::general::Report; - use super::Error; +use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, NEW_APP_REQ}; use crate::db; +use crate::db::general::Report; use crate::old_brain; +use detee_shared::app_proto; use serde::{Deserialize, Serialize}; use surrealdb::engine::remote::ws::Client; use surrealdb::sql::Datetime; @@ -112,6 +112,45 @@ pub struct AppNodeWithReports { pub reports: Vec, } +impl AppNodeWithReports { + pub async fn find_by_filters( + db: &Surreal, + filters: app_proto::AppNodeFilters, + ) -> Result, Error> { + let mut filter_query = format!( + "select *, <-report.* from {APP_NODE} where + avail_ports >= {} && + max_ports_per_app >= {} && + avail_vcpus >= {} && + avail_mem_mb >= {} && + avail_storage_gbs >= {}\n", + filters.free_ports, + filters.free_ports, + filters.vcpus, + filters.memory_mb, + filters.storage_mb + ); + + if !filters.city.is_empty() { + filter_query += &format!("&& city = '{}' ", filters.city); + } + if !filters.region.is_empty() { + filter_query += &format!("&& region = '{}' ", filters.region); + } + if !filters.country.is_empty() { + filter_query += &format!("&& country = '{}' ", filters.country); + } + if !filters.ip.is_empty() { + filter_query += &format!("&& ip = '{}' ", filters.ip); + } + filter_query += ";"; + dbg!(&filter_query); + let mut query_resp = db.query(filter_query).await?; + let app_nodes: Vec = query_resp.take(0)?; + Ok(app_nodes) + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct ActiveApp { id: RecordId, diff --git a/src/grpc/app.rs b/src/grpc/app.rs index 91cdac4..26f9ea4 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -219,8 +219,15 @@ impl BrainAppCli for AppCliServer { ) -> Result::ListAppNodesStream>, tonic::Status> { let req = check_sig_from_req(req)?; info!("list_app_nodes process starting for {:?}", req); - - todo!() + let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req).await?; + let (tx, rx) = mpsc::channel(6); + tokio::spawn(async move { + for app_node in app_nodes { + let _ = tx.send(Ok(app_node.into())).await; + } + }); + let resp_stream = ReceiverStream::new(rx); + Ok(Response::new(Box::pin(resp_stream))) } async fn get_one_app_node( diff --git a/src/grpc/mod.rs b/src/grpc/mod.rs index 1dec498..4fbf0d4 100644 --- a/src/grpc/mod.rs +++ b/src/grpc/mod.rs @@ -58,6 +58,7 @@ impl_pubkey_getter!(RegisterAppNodeReq); impl_pubkey_getter!(AppNodeFilters); pub fn check_sig_from_req(req: Request) -> Result { + log::trace!("Checking signature from request: {:?}", req); let time = match req.metadata().get("timestamp") { Some(t) => t.clone(), None => return Err(Status::unauthenticated("Timestamp not found in metadata.")), @@ -121,6 +122,9 @@ pub fn check_sig_from_req(req: Request) -> } pub fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Result<(), Status> { + log::trace!( + "Checking signature from parts: pubkey: {pubkey}, time: {time}, msg: {msg}, sig: {sig}" + ); let now = chrono::Utc::now(); let parsed_time = chrono::DateTime::parse_from_rfc3339(time) .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; -- 2.43.0 From 1e8014a5d64634e03567800ec752833a26bf53ee Mon Sep 17 00:00:00 2001 From: Noor Date: Tue, 13 May 2025 16:07:57 +0530 Subject: [PATCH 07/11] App node get one app node --- src/db/app.rs | 11 ++++++++--- src/db/general.rs | 2 +- src/grpc/app.rs | 9 +++++++-- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/db/app.rs b/src/db/app.rs index e610eea..017b693 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -94,7 +94,7 @@ impl NewAppReq { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct AppNodeWithReports { pub id: RecordId, pub operator: RecordId, @@ -116,6 +116,7 @@ impl AppNodeWithReports { pub async fn find_by_filters( db: &Surreal, filters: app_proto::AppNodeFilters, + limit_one: bool, ) -> Result, Error> { let mut filter_query = format!( "select *, <-report.* from {APP_NODE} where @@ -123,7 +124,7 @@ impl AppNodeWithReports { max_ports_per_app >= {} && avail_vcpus >= {} && avail_mem_mb >= {} && - avail_storage_gbs >= {}\n", + avail_storage_gbs >= {} ", filters.free_ports, filters.free_ports, filters.vcpus, @@ -143,8 +144,12 @@ impl AppNodeWithReports { if !filters.ip.is_empty() { filter_query += &format!("&& ip = '{}' ", filters.ip); } + + if limit_one { + filter_query += "limit 1"; + } + filter_query += ";"; - dbg!(&filter_query); let mut query_resp = db.query(filter_query).await?; let app_nodes: Vec = query_resp.take(0)?; Ok(app_nodes) diff --git a/src/db/general.rs b/src/db/general.rs index ae01c57..5f6548d 100644 --- a/src/db/general.rs +++ b/src/db/general.rs @@ -113,7 +113,7 @@ pub struct Kick { contract: RecordId, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct Report { #[serde(rename = "in")] from_account: RecordId, diff --git a/src/grpc/app.rs b/src/grpc/app.rs index 26f9ea4..5000721 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -219,7 +219,7 @@ impl BrainAppCli for AppCliServer { ) -> Result::ListAppNodesStream>, tonic::Status> { let req = check_sig_from_req(req)?; info!("list_app_nodes process starting for {:?}", req); - let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req).await?; + let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req, false).await?; let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for app_node in app_nodes { @@ -236,7 +236,12 @@ impl BrainAppCli for AppCliServer { ) -> Result, tonic::Status> { let req = check_sig_from_req(req)?; info!("get_one_app_node process starting for {:?}", req); + let app_node = db::AppNodeWithReports::find_by_filters(&self.db, req, true) + .await? + .first() + .ok_or(Status::not_found("No app node found"))? + .clone(); - todo!() + Ok(Response::new(app_node.into())) } } -- 2.43.0 From 38a73fd0034d7ee613716f20797eadf13581f1d8 Mon Sep 17 00:00:00 2001 From: Noor Date: Wed, 14 May 2025 00:09:36 +0530 Subject: [PATCH 08/11] App contract list Implement retrieve app contracts by UUID, admin, and operator --- src/db/app.rs | 42 ++++++++++++++++++++++++++++++++++++++---- src/grpc/app.rs | 31 ++++++++++++++++++++++++++++++- 2 files changed, 68 insertions(+), 5 deletions(-) diff --git a/src/db/app.rs b/src/db/app.rs index 017b693..6c30cf8 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -233,6 +233,12 @@ pub struct ActiveAppWithNode { } impl ActiveAppWithNode { + pub async fn get_by_uuid(db: &Surreal, uuid: &str) -> Result, Error> { + let contract: Option = + db.query(format!("select * from {ACTIVE_APP}:{uuid} fetch out;")).await?.take(0)?; + Ok(contract) + } + pub async fn list_by_node(db: &Surreal, node_pubkey: &str) -> Result, Error> { let mut query_result = db .query(format!( @@ -244,10 +250,38 @@ impl ActiveAppWithNode { Ok(contract) } - pub async fn get_by_uuid(db: &Surreal, uuid: &str) -> Result, Error> { - let contract: Option = - db.query(format!("select * from {ACTIVE_APP}:{uuid} fetch out;")).await?.take(0)?; - Ok(contract) + pub async fn list_by_admin(db: &Surreal, admin: &str) -> Result, Error> { + let mut query_result = db + .query(format!("select * from {ACTIVE_APP} where in = {ACCOUNT}:{admin} fetch out;")) + .await?; + + let app_contracts: Vec = query_result.take(0)?; + + Ok(app_contracts) + } + + pub async fn list_by_operator( + db: &Surreal, + operator: &str, + ) -> Result, Error> { + let mut query_result = db + .query(format!( + "select + (select * from ->operator->app_node<-{ACTIVE_APP} fetch out) + as app_contracts + from {ACCOUNT}:{operator}" + )) + .await?; + #[derive(Deserialize)] + struct Wrapper { + app_contracts: Vec, + } + + let c: Option = query_result.take(0)?; + match c { + Some(contracts_wrapper) => Ok(contracts_wrapper.app_contracts), + None => Ok(vec![]), + } } } diff --git a/src/grpc/app.rs b/src/grpc/app.rs index 5000721..10c69d0 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -210,7 +210,36 @@ impl BrainAppCli for AppCliServer { let req = check_sig_from_req(req)?; info!("list_app_contracts process starting for {:?}", req); - todo!() + let mut app_contracts = Vec::new(); + + if !req.uuid.is_empty() { + if let Some(app_contract) = + db::ActiveAppWithNode::get_by_uuid(&self.db, &req.uuid).await? + { + if app_contract.admin.key().to_string() == req.admin_pubkey { + app_contracts.push(app_contract); + } + // TODO: allow operator to inspect contracts + } + } else if req.as_operator { + app_contracts.append( + &mut db::ActiveAppWithNode::list_by_operator(&self.db, &req.admin_pubkey).await?, + ); + } else { + app_contracts.append( + &mut db::ActiveAppWithNode::list_by_admin(&self.db, &req.admin_pubkey).await?, + ); + } + + let (tx, rx) = mpsc::channel(6); + tokio::spawn(async move { + for app_contract in app_contracts { + let _ = tx.send(Ok(app_contract.into())).await; + } + }); + + let resp_stream = ReceiverStream::new(rx); + Ok(Response::new(Box::pin(resp_stream))) } async fn list_app_nodes( -- 2.43.0 From 92f4e15412ea56d6d31d2e66be0c25f1f3aea957 Mon Sep 17 00:00:00 2001 From: Noor Date: Wed, 14 May 2025 13:31:12 +0530 Subject: [PATCH 09/11] App contract delete --- src/db/app.rs | 68 ++++++++++++++++++++++++++++++++++++------------- src/grpc/app.rs | 24 +++++++++-------- 2 files changed, 64 insertions(+), 28 deletions(-) diff --git a/src/db/app.rs b/src/db/app.rs index 6c30cf8..d7f9601 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -1,5 +1,5 @@ use super::Error; -use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, NEW_APP_REQ}; +use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, DELETED_APP, NEW_APP_REQ}; use crate::db; use crate::db::general::Report; use crate::old_brain; @@ -158,24 +158,47 @@ impl AppNodeWithReports { #[derive(Debug, Serialize, Deserialize)] pub struct ActiveApp { - id: RecordId, + pub id: RecordId, #[serde(rename = "in")] - admin: RecordId, + pub admin: RecordId, #[serde(rename = "out")] - app_node: RecordId, - app_name: String, - mapped_ports: Vec<(u64, u64)>, - host_ipv4: String, - vcpus: u64, - memory_mb: u64, - disk_size_gb: u64, - created_at: Datetime, - price_per_unit: u64, - locked_nano: u64, - collected_at: Datetime, - mr_enclave: String, - package_url: String, - hratls_pubkey: String, + pub app_node: RecordId, + pub app_name: String, + pub mapped_ports: Vec<(u64, u64)>, + pub host_ipv4: String, + pub vcpus: u64, + pub memory_mb: u64, + pub disk_size_gb: u64, + pub created_at: Datetime, + pub price_per_unit: u64, + pub locked_nano: u64, + pub collected_at: Datetime, + pub mr_enclave: String, + pub package_url: String, + pub hratls_pubkey: String, +} + +impl From for DeletedApp { + fn from(value: ActiveApp) -> Self { + Self { + id: value.id, + admin: value.admin, + app_node: value.app_node, + app_name: value.app_name, + mapped_ports: value.mapped_ports, + host_ipv4: value.host_ipv4, + vcpus: value.vcpus, + memory_mb: value.memory_mb, + disk_size_gb: value.disk_size_gb, + created_at: value.created_at, + price_per_unit: value.price_per_unit, + locked_nano: value.locked_nano, + collected_at: value.collected_at, + mr_enclave: value.mr_enclave, + package_url: value.package_url, + hratls_pubkey: value.hratls_pubkey, + } + } } impl ActiveApp { @@ -208,6 +231,17 @@ impl ActiveApp { Ok(()) } + + pub async fn delete(db: &Surreal, id: &str) -> Result { + let deleted_app: Option = db.delete((ACTIVE_APP, id)).await?; + if let Some(deleted_app) = deleted_app { + let deleted_app: DeletedApp = deleted_app.into(); + let _: Vec = db.insert(DELETED_APP).relation(deleted_app).await?; + Ok(true) + } else { + Ok(false) + } + } } #[derive(Debug, Serialize, Deserialize)] diff --git a/src/grpc/app.rs b/src/grpc/app.rs index 10c69d0..d22851c 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -1,4 +1,5 @@ use crate::constants::{ACCOUNT, APP_NODE}; +use crate::db::app::ActiveApp; use crate::db::prelude as db; use crate::grpc::{check_sig_from_parts, check_sig_from_req}; use detee_shared::app_proto::brain_app_cli_server::BrainAppCli; @@ -37,8 +38,7 @@ impl BrainAppDaemon for AppDaemonServer { async fn register_app_node( &self, req: tonic::Request, - ) -> Result::RegisterAppNodeStream>, tonic::Status> - { + ) -> Result::RegisterAppNodeStream>, Status> { let req = check_sig_from_req(req)?; info!("Starting app_node registration process for {:?}", req); @@ -77,7 +77,7 @@ impl BrainAppDaemon for AppDaemonServer { async fn brain_messages( &self, req: tonic::Request, - ) -> Result::BrainMessagesStream>, tonic::Status> { + ) -> Result::BrainMessagesStream>, Status> { let auth = req.into_inner(); let pubkey = auth.pubkey.clone(); check_sig_from_parts( @@ -114,7 +114,7 @@ impl BrainAppDaemon for AppDaemonServer { async fn daemon_messages( &self, req: tonic::Request>, - ) -> Result, tonic::Status> { + ) -> Result, Status> { let mut req_stream = req.into_inner(); let pubkey: String; if let Some(Ok(msg)) = req_stream.next().await { @@ -186,7 +186,7 @@ impl BrainAppCli for AppCliServer { async fn deploy_app( &self, req: tonic::Request, - ) -> Result, tonic::Status> { + ) -> Result, Status> { let req = check_sig_from_req(req)?; info!("deploy_app process starting for {:?}", req); @@ -196,17 +196,19 @@ impl BrainAppCli for AppCliServer { async fn delete_app( &self, req: tonic::Request, - ) -> Result, tonic::Status> { + ) -> Result, Status> { let req = check_sig_from_req(req)?; info!("delete_app process starting for {:?}", req); - - todo!() + match ActiveApp::delete(&self.db, &req.uuid).await? { + true => Ok(Response::new(Empty {})), + false => Err(Status::not_found(format!("Could not find App contract {}", &req.uuid))), + } } async fn list_app_contracts( &self, req: tonic::Request, - ) -> Result::ListAppContractsStream>, tonic::Status> { + ) -> Result::ListAppContractsStream>, Status> { let req = check_sig_from_req(req)?; info!("list_app_contracts process starting for {:?}", req); @@ -245,7 +247,7 @@ impl BrainAppCli for AppCliServer { async fn list_app_nodes( &self, req: tonic::Request, - ) -> Result::ListAppNodesStream>, tonic::Status> { + ) -> Result::ListAppNodesStream>, Status> { let req = check_sig_from_req(req)?; info!("list_app_nodes process starting for {:?}", req); let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req, false).await?; @@ -262,7 +264,7 @@ impl BrainAppCli for AppCliServer { async fn get_one_app_node( &self, req: tonic::Request, - ) -> Result, tonic::Status> { + ) -> Result, Status> { let req = check_sig_from_req(req)?; info!("get_one_app_node process starting for {:?}", req); let app_node = db::AppNodeWithReports::find_by_filters(&self.db, req, true) -- 2.43.0 From 8caa55e3251ba83f5650b70ae939adc9c4c0b567 Mon Sep 17 00:00:00 2001 From: Noor Date: Wed, 14 May 2025 20:55:13 +0530 Subject: [PATCH 10/11] App deployment rename rpc method app deploy to new_app improved is_banned_by_node checking app nodes also moved common ErrorFromTable type to db module --- Cargo.lock | 18 ++++++++--------- src/db/app.rs | 51 ++++++++++++++++++++++++++++++++++++++++++++++- src/db/general.rs | 17 ++++++++++++---- src/db/mod.rs | 7 +++++++ src/db/vm.rs | 8 ++------ src/grpc/app.rs | 35 ++++++++++++++++++++++++++------ src/grpc/types.rs | 48 +++++++++++++++++++++++++++++++++++++++++++- 7 files changed, 157 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 81b5bd8..0ff1d97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1000,7 +1000,7 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#637610196f708475230802fa67f1b3ee4d8d0679" +source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#da0f3269a31e0ebfb7328e2115e212aabe4d984a" dependencies = [ "bincode 2.0.1", "prost", @@ -2291,9 +2291,9 @@ dependencies = [ [[package]] name = "multimap" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" [[package]] name = "nanoid" @@ -2756,7 +2756,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck 0.5.0", - "itertools 0.13.0", + "itertools 0.11.0", "log", "multimap", "once_cell", @@ -2776,7 +2776,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.100", @@ -3315,9 +3315,9 @@ dependencies = [ [[package]] name = "rustix" -version = "1.0.5" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d97817398dd4bb2e6da002002db259209759911da105da92bec29ccb12cf58bf" +checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" dependencies = [ "bitflags", "errno", @@ -4016,9 +4016,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.19.1" +version = "3.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf" +checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1" dependencies = [ "fastrand", "getrandom 0.3.2", diff --git a/src/db/app.rs b/src/db/app.rs index d7f9601..478d87d 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use super::Error; use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, DELETED_APP, NEW_APP_REQ}; use crate::db; @@ -7,7 +9,8 @@ use detee_shared::app_proto; use serde::{Deserialize, Serialize}; use surrealdb::engine::remote::ws::Client; use surrealdb::sql::Datetime; -use surrealdb::{RecordId, Surreal}; +use surrealdb::{Notification, RecordId, Surreal}; +use tokio_stream::StreamExt; #[derive(Debug, Serialize, Deserialize)] pub struct AppNode { @@ -92,6 +95,11 @@ impl NewAppReq { Ok(record) } + + pub async fn submit(self, db: &Surreal) -> Result, Error> { + let new_app_req: Vec = db.insert(NEW_APP_REQ).relation(self).await?; + Ok(new_app_req) + } } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -242,6 +250,47 @@ impl ActiveApp { Ok(false) } } + pub async fn listen(db: &Surreal, app_id: &str) -> Result { + let mut query_response = db + .query(format!( + "live select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};" + )) + .query(format!("live select * from {ACTIVE_APP} where id = {ACTIVE_APP}:{app_id};")) + .await?; + + let mut error_stream = query_response.stream::>(0)?; + let mut active_app_stream = query_response.stream::>(1)?; + + tokio::time::timeout(Duration::from_secs(30), async { + loop { + tokio::select! { + Some(err_notif) = error_stream.next() =>{ + match err_notif{ + Ok(err_notif) =>{ + if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty(){ + return Err(Error::NewAppDaemonResp(err_notif.data.error)) + } + } + Err(e) => return Err(e.into()) + } + + } + Some(active_app_notif) = active_app_stream.next() => { + match active_app_notif { + Ok(active_app_notif) =>{ + if active_app_notif.action == surrealdb::Action::Create { + let _: Option = db.delete((NEW_APP_REQ, app_id)).await?; + return Ok(active_app_notif.data); + } + } + Err(e) => return Err(e.into()) + } + } + } + } + }) + .await? + } } #[derive(Debug, Serialize, Deserialize)] diff --git a/src/db/general.rs b/src/db/general.rs index 5f6548d..c61808d 100644 --- a/src/db/general.rs +++ b/src/db/general.rs @@ -57,16 +57,25 @@ impl Account { user: &str, node: &str, ) -> Result { - let ban: Option = db + let mut query_response = db .query(format!( "(select operator->ban[0] as ban from vm_node:{node} where operator->ban->account contains account:{user} ).ban;" )) - .await? - .take(0)?; - Ok(ban.is_some()) + .query(format!( + "(select operator->ban[0] as ban + from app_node:{node} + where operator->ban->account contains account:{user} + ).ban;" + )) + .await?; + + let vm_node_ban: Option = query_response.take(0)?; + let app_node_ban: Option = query_response.take(1)?; + + Ok(vm_node_ban.is_some() || app_node_ban.is_some()) } } diff --git a/src/db/mod.rs b/src/db/mod.rs index 725d27a..e22e6d1 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -28,6 +28,8 @@ pub enum Error { UnknownTable(String), #[error("Daemon channel got closed: {0}")] AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError), + #[error("AppDaemon Error {0}")] + NewAppDaemonResp(String), } pub mod prelude { @@ -163,3 +165,8 @@ pub async fn live_appnode_msgs< Ok(()) } + +#[derive(Deserialize)] +pub struct ErrorFromTable { + pub error: String, +} diff --git a/src/db/vm.rs b/src/db/vm.rs index 5641dea..516c517 100644 --- a/src/db/vm.rs +++ b/src/db/vm.rs @@ -5,7 +5,7 @@ use super::Error; use crate::constants::{ ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT, }; -use crate::db::{Account, Report}; +use crate::db::{Account, ErrorFromTable, Report}; use crate::old_brain; use detee_shared::vm_proto; use serde::{Deserialize, Serialize}; @@ -214,17 +214,13 @@ impl WrappedMeasurement { UPDATE_VM_REQ => UPDATE_VM_REQ, _ => NEW_VM_REQ, }; - #[derive(Deserialize)] - struct ErrorMessage { - error: String, - } let mut resp = db .query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};")) .query(format!( "live select * from measurement_args where id = measurement_args:{vm_id};" )) .await?; - let mut error_stream = resp.stream::>(0)?; + let mut error_stream = resp.stream::>(0)?; let mut args_stream = resp.stream::>(1)?; let args: Option = diff --git a/src/grpc/app.rs b/src/grpc/app.rs index d22851c..775c15a 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -4,10 +4,7 @@ use crate::db::prelude as db; use crate::grpc::{check_sig_from_parts, check_sig_from_req}; use detee_shared::app_proto::brain_app_cli_server::BrainAppCli; use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemon; -use detee_shared::app_proto::{ - daemon_message_app, AppContract, AppNodeFilters, AppNodeListResp, BrainMessageApp, DaemonAuth, - DaemonMessageApp, DelAppReq, ListAppContractsReq, RegisterAppNodeReq, -}; +use detee_shared::app_proto::*; use detee_shared::common_proto::Empty; use log::info; use std::pin::Pin; @@ -183,14 +180,40 @@ impl BrainAppCli for AppCliServer { type ListAppContractsStream = Pin> + Send>>; type ListAppNodesStream = Pin> + Send>>; - async fn deploy_app( + async fn new_app( &self, req: tonic::Request, ) -> Result, Status> { let req = check_sig_from_req(req)?; info!("deploy_app process starting for {:?}", req); - todo!() + if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { + return Err(Status::permission_denied("This operator banned you. What did you do?")); + } + let new_app_req: db::NewAppReq = req.into(); + let id = new_app_req.id.to_string(); + + let (tx, rx) = tokio::sync::oneshot::channel(); + let db = self.db.clone(); + tokio::spawn(async move { + let _ = tx.send(db::ActiveApp::listen(&db, &id).await); + }); + + new_app_req.submit(&self.db).await?; + + match rx.await { + Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded( + "Network timeout. Please try again later or contact the DeTEE devs team.", + )), + Ok(Err(db::Error::NewAppDaemonResp(err))) => Err(Status::internal(err)), + Ok(new_app_resp) => Ok(Response::new(new_app_resp?.into())), + Err(e) => { + log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}"); + Err(Status::unknown( + "Unknown error. Please try again or contact the DeTEE devs team.", + )) + } + } } async fn delete_app( diff --git a/src/grpc/types.rs b/src/grpc/types.rs index c6804a7..dfde396 100644 --- a/src/grpc/types.rs +++ b/src/grpc/types.rs @@ -1,4 +1,4 @@ -use crate::constants::{ACCOUNT, ID_ALPHABET, NEW_VM_REQ, VM_NODE}; +use crate::constants::{ACCOUNT, APP_NODE, ID_ALPHABET, NEW_APP_REQ, NEW_VM_REQ, VM_NODE}; use crate::db::prelude as db; use detee_shared::app_proto::AppNodeListResp; use detee_shared::common_proto::MappedPort; @@ -286,6 +286,36 @@ impl From for AppContract { } } +impl From for db::NewAppReq { + fn from(val: NewAppReq) -> Self { + let resource = val.resource.unwrap_or_default(); + + let mr_enclave = val + .public_package_mr_enclave + .unwrap_or_default() + .iter() + .fold(String::new(), |acc, x| acc + &format!("{:02x?}", x)); + + Self { + id: RecordId::from((NEW_APP_REQ, nanoid!(40, &ID_ALPHABET))), + admin: RecordId::from((ACCOUNT, val.admin_pubkey)), + app_node: RecordId::from((APP_NODE, val.node_pubkey)), + app_name: val.app_name, + package_url: val.package_url, + mr_enclave, + hratls_pubkey: val.hratls_pubkey, + ports: resource.ports, + memory_mb: resource.memory_mb, + vcpu: resource.vcpu, + disk_mb: resource.disk_mb, + locked_nano: val.locked_nano, + price_per_unit: val.price_per_unit, + error: String::new(), + created_at: surrealdb::sql::Datetime::default(), + } + } +} + impl From for NewAppReq { fn from(value: db::NewAppReq) -> Self { let resource = AppResource { @@ -341,3 +371,19 @@ impl From for db::AppNodeResources { } } } + +impl From for NewAppRes { + fn from(val: db::ActiveApp) -> Self { + let mapped_ports = val + .mapped_ports + .iter() + .map(|(h, g)| MappedPort { host_port: *h as u32, guest_port: *g as u32 }) + .collect(); + Self { + uuid: val.id.key().to_string(), + ip_address: val.host_ipv4, + mapped_ports, + error: String::new(), + } + } +} -- 2.43.0 From 5c74962ac61b39b0fc0cb0765032e74dc3e5f1ce Mon Sep 17 00:00:00 2001 From: Noor Date: Wed, 14 May 2025 20:58:19 +0530 Subject: [PATCH 11/11] Enhance contract inspect Allow operators to access app and VM contracts to inspect --- src/grpc/app.rs | 5 +++-- src/grpc/vm.rs | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/grpc/app.rs b/src/grpc/app.rs index 775c15a..1cf31ee 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -241,10 +241,11 @@ impl BrainAppCli for AppCliServer { if let Some(app_contract) = db::ActiveAppWithNode::get_by_uuid(&self.db, &req.uuid).await? { - if app_contract.admin.key().to_string() == req.admin_pubkey { + if app_contract.admin.key().to_string() == req.admin_pubkey + || app_contract.app_node.operator.key().to_string() == req.admin_pubkey + { app_contracts.push(app_contract); } - // TODO: allow operator to inspect contracts } } else if req.as_operator { app_contracts.append( diff --git a/src/grpc/vm.rs b/src/grpc/vm.rs index ebbd218..d7f6430 100644 --- a/src/grpc/vm.rs +++ b/src/grpc/vm.rs @@ -343,10 +343,11 @@ impl BrainVmCli for VmCliServer { if let Some(specific_contract) = db::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await? { - if specific_contract.admin.key().to_string() == req.wallet { + if specific_contract.admin.key().to_string() == req.wallet + || specific_contract.vm_node.operator.key().to_string() == req.wallet + { contracts.push(specific_contract); } - // TODO: allow operator to inspect contracts } } else if req.as_operator { contracts -- 2.43.0