From 6f40a9c770f21c7b6e6d0a075c741ae5f5bfe582 Mon Sep 17 00:00:00 2001 From: Noor Date: Mon, 12 May 2025 18:09:30 +0530 Subject: [PATCH] App daemon message activate new app in db update node resource in db --- src/db/app.rs | 76 ++++++++++++++++++++++++++++++++++++++++++++++- src/grpc/app.rs | 30 ++++++++++++++++++- src/grpc/types.rs | 12 ++++++++ 3 files changed, 116 insertions(+), 2 deletions(-) diff --git a/src/db/app.rs b/src/db/app.rs index 44f49f4..3080380 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -1,4 +1,4 @@ -use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE}; +use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, NEW_APP_REQ}; use crate::db::general::Report; use super::Error; @@ -72,6 +72,28 @@ pub struct NewAppReq { pub created_at: Datetime, } +impl NewAppReq { + pub async fn get(db: &Surreal, id: &str) -> Result, Error> { + let new_app_req: Option = db.select((NEW_APP_REQ, id)).await?; + Ok(new_app_req) + } + pub async fn submit_error( + db: &Surreal, + id: &str, + error: String, + ) -> Result, Error> { + #[derive(Serialize)] + struct NewAppError { + error: String, + } + + let record: Option = + db.update((NEW_APP_REQ, id)).merge(NewAppError { error }).await?; + + Ok(record) + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct AppNodeWithReports { pub id: RecordId, @@ -112,6 +134,38 @@ pub struct ActiveApp { hratls_pubkey: String, } +impl ActiveApp { + pub async fn activate(db: &Surreal, id: &str) -> Result<(), Error> { + let new_app_req = match NewAppReq::get(db, id).await? { + Some(r) => r, + None => return Ok(()), + }; + + let active_app = Self { + id: RecordId::from((ACTIVE_APP, id)), + admin: new_app_req.admin, + app_node: new_app_req.app_node, + app_name: new_app_req.app_name, + mapped_ports: vec![], + host_ipv4: String::new(), + vcpus: new_app_req.vcpu as u64, + memory_mb: new_app_req.memory_mb as u64, + disk_size_gb: new_app_req.disk_mb as u64, + created_at: new_app_req.created_at.clone(), + price_per_unit: new_app_req.price_per_unit, + locked_nano: new_app_req.locked_nano, + collected_at: new_app_req.created_at, + mr_enclave: new_app_req.mr_enclave.clone(), + package_url: new_app_req.package_url.clone(), + hratls_pubkey: new_app_req.hratls_pubkey.clone(), + }; + + let _: Vec = db.insert(()).relation(active_app).await?; + + Ok(()) + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct ActiveAppWithNode { pub id: RecordId, @@ -153,6 +207,26 @@ impl ActiveAppWithNode { } } +#[derive(Debug, Serialize)] +pub struct AppNodeResources { + pub avail_no_of_port: u32, + pub avail_vcpus: u32, + pub avail_memory_mb: u32, + pub avail_storage_mb: u32, + pub max_ports_per_app: u32, +} + +impl AppNodeResources { + pub async fn merge( + self, + db: &Surreal, + node_pubkey: &str, + ) -> Result, Error> { + let app_node: Option = db.update((APP_NODE, node_pubkey)).merge(self).await?; + Ok(app_node) + } +} + impl From<&old_brain::BrainData> for Vec { fn from(old_data: &old_brain::BrainData) -> Self { let mut nodes = Vec::new(); diff --git a/src/grpc/app.rs b/src/grpc/app.rs index 3fbdafa..91cdac4 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -136,7 +136,35 @@ impl BrainAppDaemon for AppDaemonServer { return Err(Status::unauthenticated("Could not authenticate the app daemon")); } - todo!() + while let Some(daemon_message) = req_stream.next().await { + match daemon_message { + Ok(msg) => match msg.msg { + Some(daemon_message_app::Msg::NewAppRes(new_app_resp)) => { + if !new_app_resp.error.is_empty() { + db::NewAppReq::submit_error( + &self.db, + &new_app_resp.uuid, + new_app_resp.error, + ) + .await?; + } else { + db::ActiveApp::activate(&self.db, &new_app_resp.uuid).await?; + } + } + Some(daemon_message_app::Msg::AppNodeResources(app_node_resources)) => { + let node_resource: db::AppNodeResources = app_node_resources.into(); + node_resource.merge(&self.db, &pubkey).await?; + } + _ => {} + }, + + Err(e) => { + log::warn!("App Daemon Disconnected: {e:?}") + } + } + } + + Ok(Response::new(Empty {})) } } diff --git a/src/grpc/types.rs b/src/grpc/types.rs index 58ee2b5..c6804a7 100644 --- a/src/grpc/types.rs +++ b/src/grpc/types.rs @@ -329,3 +329,15 @@ impl From for BrainMessageApp { } } } + +impl From for db::AppNodeResources { + fn from(value: AppNodeResources) -> Self { + Self { + avail_no_of_port: value.avail_no_of_port, + avail_vcpus: value.avail_vcpus, + avail_memory_mb: value.avail_memory_mb, + avail_storage_mb: value.avail_storage_mb, + max_ports_per_app: value.max_ports_per_app, + } + } +}