From 8caa55e3251ba83f5650b70ae939adc9c4c0b567 Mon Sep 17 00:00:00 2001 From: Noor Date: Wed, 14 May 2025 20:55:13 +0530 Subject: [PATCH] App deployment rename rpc method app deploy to new_app improved is_banned_by_node checking app nodes also moved common ErrorFromTable type to db module --- Cargo.lock | 18 ++++++++--------- src/db/app.rs | 51 ++++++++++++++++++++++++++++++++++++++++++++++- src/db/general.rs | 17 ++++++++++++---- src/db/mod.rs | 7 +++++++ src/db/vm.rs | 8 ++------ src/grpc/app.rs | 35 ++++++++++++++++++++++++++------ src/grpc/types.rs | 48 +++++++++++++++++++++++++++++++++++++++++++- 7 files changed, 157 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 81b5bd8..0ff1d97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1000,7 +1000,7 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#637610196f708475230802fa67f1b3ee4d8d0679" +source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#da0f3269a31e0ebfb7328e2115e212aabe4d984a" dependencies = [ "bincode 2.0.1", "prost", @@ -2291,9 +2291,9 @@ dependencies = [ [[package]] name = "multimap" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" [[package]] name = "nanoid" @@ -2756,7 +2756,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck 0.5.0", - "itertools 0.13.0", + "itertools 0.11.0", "log", "multimap", "once_cell", @@ -2776,7 +2776,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.100", @@ -3315,9 +3315,9 @@ dependencies = [ [[package]] name = "rustix" -version = "1.0.5" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d97817398dd4bb2e6da002002db259209759911da105da92bec29ccb12cf58bf" +checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" dependencies = [ "bitflags", "errno", @@ -4016,9 +4016,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.19.1" +version = "3.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf" +checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1" dependencies = [ "fastrand", "getrandom 0.3.2", diff --git a/src/db/app.rs b/src/db/app.rs index d7f9601..478d87d 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use super::Error; use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, DELETED_APP, NEW_APP_REQ}; use crate::db; @@ -7,7 +9,8 @@ use detee_shared::app_proto; use serde::{Deserialize, Serialize}; use surrealdb::engine::remote::ws::Client; use surrealdb::sql::Datetime; -use surrealdb::{RecordId, Surreal}; +use surrealdb::{Notification, RecordId, Surreal}; +use tokio_stream::StreamExt; #[derive(Debug, Serialize, Deserialize)] pub struct AppNode { @@ -92,6 +95,11 @@ impl NewAppReq { Ok(record) } + + pub async fn submit(self, db: &Surreal) -> Result, Error> { + let new_app_req: Vec = db.insert(NEW_APP_REQ).relation(self).await?; + Ok(new_app_req) + } } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -242,6 +250,47 @@ impl ActiveApp { Ok(false) } } + pub async fn listen(db: &Surreal, app_id: &str) -> Result { + let mut query_response = db + .query(format!( + "live select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};" + )) + .query(format!("live select * from {ACTIVE_APP} where id = {ACTIVE_APP}:{app_id};")) + .await?; + + let mut error_stream = query_response.stream::>(0)?; + let mut active_app_stream = query_response.stream::>(1)?; + + tokio::time::timeout(Duration::from_secs(30), async { + loop { + tokio::select! { + Some(err_notif) = error_stream.next() =>{ + match err_notif{ + Ok(err_notif) =>{ + if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty(){ + return Err(Error::NewAppDaemonResp(err_notif.data.error)) + } + } + Err(e) => return Err(e.into()) + } + + } + Some(active_app_notif) = active_app_stream.next() => { + match active_app_notif { + Ok(active_app_notif) =>{ + if active_app_notif.action == surrealdb::Action::Create { + let _: Option = db.delete((NEW_APP_REQ, app_id)).await?; + return Ok(active_app_notif.data); + } + } + Err(e) => return Err(e.into()) + } + } + } + } + }) + .await? + } } #[derive(Debug, Serialize, Deserialize)] diff --git a/src/db/general.rs b/src/db/general.rs index 5f6548d..c61808d 100644 --- a/src/db/general.rs +++ b/src/db/general.rs @@ -57,16 +57,25 @@ impl Account { user: &str, node: &str, ) -> Result { - let ban: Option = db + let mut query_response = db .query(format!( "(select operator->ban[0] as ban from vm_node:{node} where operator->ban->account contains account:{user} ).ban;" )) - .await? - .take(0)?; - Ok(ban.is_some()) + .query(format!( + "(select operator->ban[0] as ban + from app_node:{node} + where operator->ban->account contains account:{user} + ).ban;" + )) + .await?; + + let vm_node_ban: Option = query_response.take(0)?; + let app_node_ban: Option = query_response.take(1)?; + + Ok(vm_node_ban.is_some() || app_node_ban.is_some()) } } diff --git a/src/db/mod.rs b/src/db/mod.rs index 725d27a..e22e6d1 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -28,6 +28,8 @@ pub enum Error { UnknownTable(String), #[error("Daemon channel got closed: {0}")] AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError), + #[error("AppDaemon Error {0}")] + NewAppDaemonResp(String), } pub mod prelude { @@ -163,3 +165,8 @@ pub async fn live_appnode_msgs< Ok(()) } + +#[derive(Deserialize)] +pub struct ErrorFromTable { + pub error: String, +} diff --git a/src/db/vm.rs b/src/db/vm.rs index 5641dea..516c517 100644 --- a/src/db/vm.rs +++ b/src/db/vm.rs @@ -5,7 +5,7 @@ use super::Error; use crate::constants::{ ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT, }; -use crate::db::{Account, Report}; +use crate::db::{Account, ErrorFromTable, Report}; use crate::old_brain; use detee_shared::vm_proto; use serde::{Deserialize, Serialize}; @@ -214,17 +214,13 @@ impl WrappedMeasurement { UPDATE_VM_REQ => UPDATE_VM_REQ, _ => NEW_VM_REQ, }; - #[derive(Deserialize)] - struct ErrorMessage { - error: String, - } let mut resp = db .query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};")) .query(format!( "live select * from measurement_args where id = measurement_args:{vm_id};" )) .await?; - let mut error_stream = resp.stream::>(0)?; + let mut error_stream = resp.stream::>(0)?; let mut args_stream = resp.stream::>(1)?; let args: Option = diff --git a/src/grpc/app.rs b/src/grpc/app.rs index d22851c..775c15a 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -4,10 +4,7 @@ use crate::db::prelude as db; use crate::grpc::{check_sig_from_parts, check_sig_from_req}; use detee_shared::app_proto::brain_app_cli_server::BrainAppCli; use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemon; -use detee_shared::app_proto::{ - daemon_message_app, AppContract, AppNodeFilters, AppNodeListResp, BrainMessageApp, DaemonAuth, - DaemonMessageApp, DelAppReq, ListAppContractsReq, RegisterAppNodeReq, -}; +use detee_shared::app_proto::*; use detee_shared::common_proto::Empty; use log::info; use std::pin::Pin; @@ -183,14 +180,40 @@ impl BrainAppCli for AppCliServer { type ListAppContractsStream = Pin> + Send>>; type ListAppNodesStream = Pin> + Send>>; - async fn deploy_app( + async fn new_app( &self, req: tonic::Request, ) -> Result, Status> { let req = check_sig_from_req(req)?; info!("deploy_app process starting for {:?}", req); - todo!() + if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { + return Err(Status::permission_denied("This operator banned you. What did you do?")); + } + let new_app_req: db::NewAppReq = req.into(); + let id = new_app_req.id.to_string(); + + let (tx, rx) = tokio::sync::oneshot::channel(); + let db = self.db.clone(); + tokio::spawn(async move { + let _ = tx.send(db::ActiveApp::listen(&db, &id).await); + }); + + new_app_req.submit(&self.db).await?; + + match rx.await { + Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded( + "Network timeout. Please try again later or contact the DeTEE devs team.", + )), + Ok(Err(db::Error::NewAppDaemonResp(err))) => Err(Status::internal(err)), + Ok(new_app_resp) => Ok(Response::new(new_app_resp?.into())), + Err(e) => { + log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}"); + Err(Status::unknown( + "Unknown error. Please try again or contact the DeTEE devs team.", + )) + } + } } async fn delete_app( diff --git a/src/grpc/types.rs b/src/grpc/types.rs index c6804a7..dfde396 100644 --- a/src/grpc/types.rs +++ b/src/grpc/types.rs @@ -1,4 +1,4 @@ -use crate::constants::{ACCOUNT, ID_ALPHABET, NEW_VM_REQ, VM_NODE}; +use crate::constants::{ACCOUNT, APP_NODE, ID_ALPHABET, NEW_APP_REQ, NEW_VM_REQ, VM_NODE}; use crate::db::prelude as db; use detee_shared::app_proto::AppNodeListResp; use detee_shared::common_proto::MappedPort; @@ -286,6 +286,36 @@ impl From for AppContract { } } +impl From for db::NewAppReq { + fn from(val: NewAppReq) -> Self { + let resource = val.resource.unwrap_or_default(); + + let mr_enclave = val + .public_package_mr_enclave + .unwrap_or_default() + .iter() + .fold(String::new(), |acc, x| acc + &format!("{:02x?}", x)); + + Self { + id: RecordId::from((NEW_APP_REQ, nanoid!(40, &ID_ALPHABET))), + admin: RecordId::from((ACCOUNT, val.admin_pubkey)), + app_node: RecordId::from((APP_NODE, val.node_pubkey)), + app_name: val.app_name, + package_url: val.package_url, + mr_enclave, + hratls_pubkey: val.hratls_pubkey, + ports: resource.ports, + memory_mb: resource.memory_mb, + vcpu: resource.vcpu, + disk_mb: resource.disk_mb, + locked_nano: val.locked_nano, + price_per_unit: val.price_per_unit, + error: String::new(), + created_at: surrealdb::sql::Datetime::default(), + } + } +} + impl From for NewAppReq { fn from(value: db::NewAppReq) -> Self { let resource = AppResource { @@ -341,3 +371,19 @@ impl From for db::AppNodeResources { } } } + +impl From for NewAppRes { + fn from(val: db::ActiveApp) -> Self { + let mapped_ports = val + .mapped_ports + .iter() + .map(|(h, g)| MappedPort { host_port: *h as u32, guest_port: *g as u32 }) + .collect(); + Self { + uuid: val.id.key().to_string(), + ip_address: val.host_ipv4, + mapped_ports, + error: String::new(), + } + } +}