From 7807e14167d0492ae109e7f6effaf6bd6ac3d38f Mon Sep 17 00:00:00 2001 From: Noor Date: Fri, 9 May 2025 17:11:34 +0530 Subject: [PATCH] WIP App: daemon register enhance logging levels --- Cargo.lock | 1 + Cargo.toml | 1 + src/bin/brain.rs | 6 +++++- src/constants.rs | 1 + src/db/app.rs | 22 +++++++++++++++++++++- src/grpc/app.rs | 38 ++++++++++++++++++++++++++++++++++++-- src/grpc/types.rs | 35 +++++++++++++++++++++++++++++++++++ 7 files changed, 100 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 394680f..dd6da18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3796,6 +3796,7 @@ dependencies = [ "ed25519-dalek", "env_logger", "futures", + "hex", "hyper-util", "itertools 0.14.0", "log", diff --git a/Cargo.toml b/Cargo.toml index c63e043..9d96c45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ env_logger = "0.11.8" thiserror = "2.0.12" nanoid = "0.4.0" dotenv = "0.15.0" +hex = "0.4.3" [profile.release] lto = true diff --git a/src/bin/brain.rs b/src/bin/brain.rs index f549f6d..4798063 100644 --- a/src/bin/brain.rs +++ b/src/bin/brain.rs @@ -17,7 +17,11 @@ async fn main() { if dotenv::from_filename("/etc/detee/brain/config.ini").is_err() { dotenv().ok(); } - env_logger::builder().filter_level(log::LevelFilter::Debug).init(); + env_logger::builder() + .filter_level(log::LevelFilter::Trace) + .filter_module("tungstenite", log::LevelFilter::Debug) + .filter_module("tokio_tungstenite", log::LevelFilter::Debug) + .init(); let db_url = std::env::var("DB_URL").expect("the environment variable DB_URL is not set"); let db_user = std::env::var("DB_USER").expect("the environment variable DB_USER is not set"); diff --git a/src/constants.rs b/src/constants.rs index bc95da8..d398407 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -32,6 +32,7 @@ pub const DELETED_VM: &str = "deleted_vm"; pub const VM_CONTRACT: &str = "vm_contract"; pub const ACTIVE_APP: &str = "active_app"; +pub const APP_NODE: &str = "app_node"; pub const ID_ALPHABET: [char; 62] = [ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', diff --git a/src/db/app.rs b/src/db/app.rs index c49bb0b..9ef5009 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -1,7 +1,8 @@ -use crate::constants::{ACCOUNT, ACTIVE_APP}; +use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE}; use crate::db::general::Report; use super::Error; +use crate::db; use crate::old_brain; use serde::{Deserialize, Serialize}; use surrealdb::engine::remote::ws::Client; @@ -25,6 +26,14 @@ pub struct AppNode { pub offline_minutes: u64, } +impl AppNode { + pub async fn register(self, db: &Surreal) -> Result { + db::Account::get_or_create(db, &self.operator.key().to_string()).await?; + let app_node: Option = db.upsert(self.id.clone()).content(self).await?; + app_node.ok_or(Error::FailedToCreateDBEntry) + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct AppNodeWithReports { pub id: RecordId, @@ -88,6 +97,17 @@ pub struct ActiveAppWithNode { } impl ActiveAppWithNode { + pub async fn list_by_node(db: &Surreal, node_pubkey: &str) -> Result, Error> { + let mut query_result = db + .query(format!( + "select * from {ACTIVE_APP} where out = {APP_NODE}:{node_pubkey} fetch out;" + )) + .await?; + + let contract: Vec = query_result.take(0)?; + Ok(contract) + } + pub async fn get_by_uuid(db: &Surreal, uuid: &str) -> Result, Error> { let contract: Option = db.query(format!("select * from {ACTIVE_APP}:{uuid} fetch out;")).await?.take(0)?; diff --git a/src/grpc/app.rs b/src/grpc/app.rs index b672cef..d8e2fac 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -1,3 +1,5 @@ +use crate::constants::{ACCOUNT, APP_NODE}; +use crate::db::prelude as db; use crate::grpc::{check_sig_from_parts, check_sig_from_req}; use detee_shared::app_proto::brain_app_cli_server::BrainAppCli; use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemon; @@ -10,9 +12,12 @@ use log::info; use std::pin::Pin; use std::sync::Arc; use surrealdb::engine::remote::ws::Client; +use surrealdb::RecordId; use surrealdb::Surreal; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; use tokio_stream::{Stream, StreamExt}; -use tonic::{Status, Streaming}; +use tonic::{Response, Status, Streaming}; pub struct AppDaemonServer { pub db: Arc>, @@ -37,7 +42,36 @@ impl BrainAppDaemon for AppDaemonServer { let req = check_sig_from_req(req)?; info!("Starting app_node registration process for {:?}", req); - todo!() + let app_node = db::AppNode { + id: RecordId::from((APP_NODE, req.node_pubkey.clone())), + operator: RecordId::from((ACCOUNT, req.operator_wallet)), + country: req.country, + region: req.region, + city: req.city, + ip: req.main_ip, + price: req.price, + + avail_mem_mb: 0, + avail_vcpus: 0, + avail_storage_gbs: 0, + avail_ports: 0, + max_ports_per_app: 0, + offline_minutes: 0, + }; + + app_node.register(&self.db).await?; + info!("Sending existing contracts to {}", req.node_pubkey); + + let contracts = db::ActiveAppWithNode::list_by_node(&self.db, &req.node_pubkey).await?; + let (tx, rx) = mpsc::channel(6); + tokio::spawn(async move { + for contract in contracts { + let _ = tx.send(Ok(contract.into())).await; + } + }); + + let resp_stream = ReceiverStream::new(rx); + Ok(Response::new(Box::pin(resp_stream))) } async fn brain_messages( diff --git a/src/grpc/types.rs b/src/grpc/types.rs index 59c7af0..ee85a69 100644 --- a/src/grpc/types.rs +++ b/src/grpc/types.rs @@ -250,3 +250,38 @@ impl From for db::VmNodeResources { } } } + +impl From for AppContract { + fn from(value: db::ActiveAppWithNode) -> Self { + let public_package_mr_enclave = + Some(hex::decode(value.mr_enclave.clone()).unwrap_or_default()); + + AppContract { + uuid: value.id.key().to_string(), + package_url: value.package_url, + admin_pubkey: value.admin.key().to_string(), + node_pubkey: value.app_node.id.key().to_string(), + public_ipv4: value.host_ipv4, + resource: Some(AppResource { + memory_mb: value.memory_mb as u32, + disk_mb: value.disk_size_gb as u32, + vcpu: value.vcpus as u32, + ports: value.mapped_ports.iter().map(|(_, g)| *g as u32).collect(), + }), + mapped_ports: value + .mapped_ports + .iter() + .map(|(h, g)| MappedPort { host_port: *h as u32, guest_port: *g as u32 }) + .collect(), + + created_at: value.created_at.to_rfc3339(), + updated_at: value.created_at.to_rfc3339(), + nano_per_minute: value.price_per_unit, + locked_nano: value.locked_nano, + collected_at: value.collected_at.to_rfc3339(), + hratls_pubkey: value.mr_enclave, + public_package_mr_enclave, + app_name: value.app_name, + } + } +}