From a9541d8de7d44f4f8c98b791d78c1c0587b4c9e4 Mon Sep 17 00:00:00 2001 From: Noor Date: Fri, 30 May 2025 18:52:32 +0530 Subject: [PATCH 1/3] fix new_vm error reverted stream query checking err on db before listening error stream refactor app grpc code --- src/db/mod.rs | 2 +- src/db/vm.rs | 25 +++++++++++++------------ src/grpc/app.rs | 46 ++++++++++++++++++++-------------------------- 3 files changed, 34 insertions(+), 39 deletions(-) diff --git a/src/db/mod.rs b/src/db/mod.rs index 8cd5ce3..7f22fdb 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -196,7 +196,7 @@ pub async fn live_appnode_msgs< Ok(()) } -#[derive(Deserialize)] +#[derive(Deserialize, Debug, Clone)] pub struct ErrorFromTable { pub error: String, } diff --git a/src/db/vm.rs b/src/db/vm.rs index 9841aba..b216206 100644 --- a/src/db/vm.rs +++ b/src/db/vm.rs @@ -329,18 +329,22 @@ impl WrappedMeasurement { UPDATE_VM_REQ => UPDATE_VM_REQ, _ => NEW_VM_REQ, }; - - let mut args_stream = db + let mut resp = db + .query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};")) .query(format!( "live select * from measurement_args where id = measurement_args:{vm_id};" )) - .await? - .stream::>(0)?; + .await?; + let mut error_stream = resp.stream::>(0)?; + let mut args_stream = resp.stream::>(1)?; - let mut error_stream = db - .query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};")) - .await? - .stream::>(0)?; + let mut error = + db.query(format!("select error from {table} where id = {NEW_VM_REQ}:{vm_id}")).await?; + if let Some(error_on_newvm_req) = error.take::>(0)? { + if !error_on_newvm_req.error.is_empty() { + return Ok(Self::Error(vm_id.to_string(), error_on_newvm_req.error)); + } + } let args: Option = db.delete(("measurement_args", vm_id)).await?; @@ -358,10 +362,7 @@ impl WrappedMeasurement { Ok(err_notif) => { if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty() { - return Ok::( - Self::Error(vm_id.to_string(), - err_notif.data.error) - ); + return Ok(Self::Error(vm_id.to_string(), err_notif.data.error)); }; }, Err(e) => return Err(e.into()), diff --git a/src/grpc/app.rs b/src/grpc/app.rs index 78be5af..ec742d7 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -15,7 +15,7 @@ use surrealdb::Surreal; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::{Stream, StreamExt}; -use tonic::{Response, Status, Streaming}; +use tonic::{Request, Response, Status, Streaming}; pub struct AppDaemonServer { pub db: Arc>, @@ -34,8 +34,8 @@ impl BrainAppDaemon for AppDaemonServer { async fn register_app_node( &self, - req: tonic::Request, - ) -> Result::RegisterAppNodeStream>, Status> { + req: Request, + ) -> Result::RegisterAppNodeStream>, Status> { let req = check_sig_from_req(req)?; info!("Starting app_node registration process for {:?}", req); @@ -73,8 +73,8 @@ impl BrainAppDaemon for AppDaemonServer { async fn brain_messages( &self, - req: tonic::Request, - ) -> Result::BrainMessagesStream>, Status> { + req: Request, + ) -> Result::BrainMessagesStream>, Status> { let auth = req.into_inner(); let pubkey = auth.pubkey.clone(); check_sig_from_parts( @@ -110,8 +110,8 @@ impl BrainAppDaemon for AppDaemonServer { async fn daemon_messages( &self, - req: tonic::Request>, - ) -> Result, Status> { + req: Request>, + ) -> Result, Status> { let mut req_stream = req.into_inner(); let pubkey: String; if let Some(Ok(msg)) = req_stream.next().await { @@ -180,18 +180,15 @@ impl BrainAppCli for AppCliServer { type ListAppContractsStream = Pin> + Send>>; type ListAppNodesStream = Pin> + Send>>; - async fn new_app( - &self, - req: tonic::Request, - ) -> Result, Status> { + async fn new_app(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; - info!("deploy_app process starting for {:?}", req); - + info!("new_app process starting for {:?}", req); if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { return Err(Status::permission_denied("This operator banned you. What did you do?")); } - let new_app_req: db::NewAppReq = req.into(); - let id = new_app_req.id.to_string(); + + let db_req: db::NewAppReq = req.into(); + let id = db_req.id.to_string(); let (tx, rx) = tokio::sync::oneshot::channel(); let db = self.db.clone(); @@ -199,7 +196,7 @@ impl BrainAppCli for AppCliServer { let _ = tx.send(db::ActiveApp::listen(&db, &id).await); }); - new_app_req.submit(&self.db).await?; + db_req.submit(&self.db).await?; match rx.await { Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded( @@ -216,10 +213,7 @@ impl BrainAppCli for AppCliServer { } } - async fn delete_app( - &self, - req: tonic::Request, - ) -> Result, Status> { + async fn delete_app(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; info!("delete_app process starting for {:?}", req); match ActiveApp::delete(&self.db, &req.uuid).await? { @@ -230,8 +224,8 @@ impl BrainAppCli for AppCliServer { async fn list_app_contracts( &self, - req: tonic::Request, - ) -> Result::ListAppContractsStream>, Status> { + req: Request, + ) -> Result::ListAppContractsStream>, Status> { let req = check_sig_from_req(req)?; info!("list_app_contracts process starting for {:?}", req); @@ -270,8 +264,8 @@ impl BrainAppCli for AppCliServer { async fn list_app_nodes( &self, - req: tonic::Request, - ) -> Result::ListAppNodesStream>, Status> { + req: Request, + ) -> Result::ListAppNodesStream>, Status> { let req = check_sig_from_req(req)?; info!("list_app_nodes process starting for {:?}", req); let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req, false).await?; @@ -287,8 +281,8 @@ impl BrainAppCli for AppCliServer { async fn get_one_app_node( &self, - req: tonic::Request, - ) -> Result, Status> { + req: Request, + ) -> Result, Status> { let req = check_sig_from_req(req)?; info!("get_one_app_node process starting for {:?}", req); let app_node = db::AppNodeWithReports::find_by_filters(&self.db, req, true) -- 2.43.0 From 81ca9cf9e41fcf4e069b1bc64d5937046defb3c8 Mon Sep 17 00:00:00 2001 From: Noor Date: Tue, 3 Jun 2025 14:54:22 +0530 Subject: [PATCH 2/3] Implement new app contract balance locking minimum locking balance on deployment locking balance on app deployment refunding locked nano while error on daemon returning appropreate error on app deployment fixed some typos on logging new timeout constants for daemon respose minor change in schema and proto extensive tests on app deployments fixed some vm tests --- .gitignore | 1 + Cargo.lock | 2 +- Cargo.toml | 2 +- src/bin/migration0.rs | 1 + src/constants.rs | 3 + src/db/app.rs | 202 ++++++++++++++++++++++++++----- src/db/mod.rs | 4 +- src/db/vm.rs | 5 +- src/grpc/app.rs | 22 +++- src/grpc/types.rs | 12 +- src/grpc/vm.rs | 5 + surql/tables.sql | 4 +- tests/common/app_daemon_utils.rs | 144 ++++++++++++++++++++++ tests/common/mod.rs | 3 + tests/common/prepare_test_env.rs | 5 + tests/common/test_utils.rs | 14 ++- tests/common/vm_cli_utils.rs | 2 +- tests/common/vm_daemon_utils.rs | 9 +- tests/grpc_app_cli_test.rs | 91 ++++++++++++++ tests/grpc_vm_cli_test.rs | 6 +- tests/grpc_vm_daemon_test.rs | 2 +- 21 files changed, 478 insertions(+), 61 deletions(-) create mode 100644 tests/common/app_daemon_utils.rs create mode 100644 tests/grpc_app_cli_test.rs diff --git a/.gitignore b/.gitignore index ed3ad69..0a5c507 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target secrets tmp +.env diff --git a/Cargo.lock b/Cargo.lock index 3826a4e..b8a20bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1000,7 +1000,7 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain#d6ca058d2de78b5257517034bca2b2c7d5929db8" +source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#694d5811aa0edc9b2b9bdb17c6e972b04a21b96f" dependencies = [ "bincode 2.0.1", "prost", diff --git a/Cargo.toml b/Cargo.toml index 6011662..9d96c45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ serde_yaml = "0.9.34" surrealdb = "2.2.2" tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] } tonic = { version = "0.12", features = ["tls"] } -detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain" } +detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain_app" } ed25519-dalek = "2.1.1" bs58 = "0.5.1" tokio-stream = "0.1.17" diff --git a/src/bin/migration0.rs b/src/bin/migration0.rs index e622597..fd3c50f 100644 --- a/src/bin/migration0.rs +++ b/src/bin/migration0.rs @@ -17,6 +17,7 @@ async fn main() -> Result<(), Box> { let db_name = std::env::var("DB_NAME").expect("DB_NAME not set in .env"); let db = db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name).await.unwrap(); + env_logger::builder().filter_level(log::LevelFilter::Trace); db::migration0(&db, &old_brain_data).await?; diff --git a/src/constants.rs b/src/constants.rs index 49098a3..c8aa89e 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -48,3 +48,6 @@ pub const ID_ALPHABET: [char; 62] = [ pub const TOKEN_DECIMAL: u64 = 1_000_000_000; pub const MIN_ESCROW: u64 = 5000 * TOKEN_DECIMAL; + +pub const APP_DAEMON_TIMEOUT: u64 = 20; +pub const VM_DAEMON_TIMEOUT: u64 = 10; diff --git a/src/db/app.rs b/src/db/app.rs index 3abe616..fdaac6d 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -1,11 +1,13 @@ use std::time::Duration; use super::Error; -use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, DELETED_APP, NEW_APP_REQ}; +use crate::constants::{ + ACCOUNT, ACTIVE_APP, APP_DAEMON_TIMEOUT, APP_NODE, DELETED_APP, NEW_APP_REQ, +}; use crate::db; use crate::db::general::Report; use crate::old_brain; -use detee_shared::app_proto; +use detee_shared::app_proto::{self, NewAppRes}; use serde::{Deserialize, Serialize}; use surrealdb::engine::remote::ws::Client; use surrealdb::sql::Datetime; @@ -68,8 +70,8 @@ pub struct NewAppReq { pub hratls_pubkey: String, pub ports: Vec, pub memory_mb: u32, - pub vcpu: u32, - pub disk_mb: u32, + pub vcpus: u32, + pub disk_size_gb: u32, pub locked_nano: u64, pub price_per_unit: u64, pub error: String, @@ -81,26 +83,117 @@ impl NewAppReq { let new_app_req: Option = db.select((NEW_APP_REQ, id)).await?; Ok(new_app_req) } - pub async fn submit_error( - db: &Surreal, - id: &str, - error: String, - ) -> Result, Error> { - #[derive(Serialize)] - struct NewAppError { - error: String, + pub async fn submit_error(db: &Surreal, id: &str, error: String) -> Result<(), Error> { + let tx_query = String::from( + " + BEGIN TRANSACTION; + + LET $new_app_req = $new_app_req_input; + LET $error = $error_input; + LET $record = (select * from $new_app_req)[0]; + LET $admin = $record.in; + + if $record == None {{ + THROW 'app req not exist ' + $new_app_req + }}; + + UPDATE $new_app_req SET error = $error; + + UPDATE $admin SET tmp_locked -= $record.locked_nano; + UPDATE $admin SET balance += $record.locked_nano; + + COMMIT TRANSACTION;", + ); + + log::trace!("submit_new_app_err query: {tx_query}"); + + let mut query_resp = db + .query(tx_query) + .bind(("new_app_req_input", RecordId::from((NEW_APP_REQ, id)))) + .bind(("error_input", error)) + .await?; + + let query_err = query_resp.take_errors(); + if !query_err.is_empty() { + log::trace!("errors in submit_new_app_err: {query_err:?}"); + let tx_fail_err_str = + String::from("The query was not executed due to a failed transaction"); + + if query_err.contains_key(&4) && query_err[&4].to_string() != tx_fail_err_str { + log::error!("app req not exist: {}", query_err[&4]); + return Err(Error::ContractNotFound); + } else { + log::error!("Unknown error in submit_new_app_err: {query_err:?}"); + return Err(Error::Unknown("submit_new_app_req".to_string())); + } } - let record: Option = - db.update((NEW_APP_REQ, id)).merge(NewAppError { error }).await?; - - Ok(record) + Ok(()) } - pub async fn submit(self, db: &Surreal) -> Result, Error> { - // TODO: handle financial transaction - let new_app_req: Vec = db.insert(NEW_APP_REQ).relation(self).await?; - Ok(new_app_req) + pub async fn submit(self, db: &Surreal) -> Result<(), Error> { + let locked_nano = self.locked_nano; + let tx_query = format!( " + BEGIN TRANSACTION; + + LET $account = $account_input; + LET $new_app_req = $new_app_req_input; + LET $app_node = $app_node_input; + LET $package_url = $package_url_input; + LET $mr_enclave = $mr_enclave_input; + LET $hratls_pubkey = $hratls_pubkey_input; + LET $app_name = $app_name_input; + + UPDATE $account SET balance -= {locked_nano}; + IF $account.balance < 0 {{ + THROW 'Insufficient funds.' + }}; + UPDATE $account SET tmp_locked += {locked_nano}; + + + RELATE + $account + ->$new_app_req + ->$app_node + CONTENT {{ + created_at: time::now(), app_name: $app_name, package_url: $package_url, + mr_enclave: $mr_enclave, hratls_pubkey: $hratls_pubkey, ports: {:?}, memory_mb: {}, + vcpus: {}, disk_size_gb: {}, locked_nano: {locked_nano}, price_per_unit: {}, error: '', + }}; + + COMMIT TRANSACTION; + ", + self.ports, self.memory_mb, self.vcpus, self.disk_size_gb, self.price_per_unit); + + log::trace!("submit_new_app_req query: {tx_query}"); + + let mut query_resp = db + .query(tx_query) + .bind(("account_input", self.admin)) + .bind(("new_app_req_input", self.id)) + .bind(("app_node_input", self.app_node)) + .bind(("package_url_input", self.package_url)) + .bind(("mr_enclave_input", self.mr_enclave)) + .bind(("hratls_pubkey_input", self.hratls_pubkey)) + .bind(("app_name_input", self.app_name)) + .await?; + + let query_err = query_resp.take_errors(); + if !query_err.is_empty() { + log::trace!("errors in submit_new_app_req: {query_err:?}"); + let tx_fail_err_str = + String::from("The query was not executed due to a failed transaction"); + + if query_err.contains_key(&8) && query_err[&8].to_string() != tx_fail_err_str { + log::error!("Transaction error: {}", query_err[&8]); + return Err(Error::InsufficientFunds); + } else { + log::error!("Unknown error in submit_new_app_req: {query_err:?}"); + return Err(Error::Unknown("submit_new_app_req".to_string())); + } + } + + Ok(()) } } @@ -221,22 +314,31 @@ impl ActiveApp { (self.vcpus as f64 * 5f64) + (self.memory_mb as f64 / 200f64) + (self.disk_size_gb as f64) } - pub async fn activate(db: &Surreal, id: &str) -> Result<(), Error> { - let new_app_req = match NewAppReq::get(db, id).await? { + pub async fn activate( + db: &Surreal, + new_app_res: app_proto::NewAppRes, + ) -> Result<(), Error> { + let new_app_req = match NewAppReq::get(db, &new_app_res.uuid).await? { Some(r) => r, None => return Ok(()), }; + let mapped_ports = new_app_res + .mapped_ports + .into_iter() + .map(|data| (data.host_port, data.guest_port)) + .collect::>(); + let active_app = Self { - id: RecordId::from((ACTIVE_APP, id)), + id: RecordId::from((ACTIVE_APP, &new_app_res.uuid)), admin: new_app_req.admin, app_node: new_app_req.app_node, app_name: new_app_req.app_name, - mapped_ports: vec![], + mapped_ports, host_ipv4: String::new(), - vcpus: new_app_req.vcpu, + vcpus: new_app_req.vcpus, memory_mb: new_app_req.memory_mb, - disk_size_gb: new_app_req.disk_mb, + disk_size_gb: new_app_req.disk_size_gb, created_at: new_app_req.created_at.clone(), price_per_unit: new_app_req.price_per_unit, locked_nano: new_app_req.locked_nano, @@ -246,7 +348,13 @@ impl ActiveApp { hratls_pubkey: new_app_req.hratls_pubkey.clone(), }; + let admin_account = active_app.admin.key().to_string(); + let locked_nano = active_app.locked_nano; + let _: Vec = db.insert(()).relation(active_app).await?; + db.delete::>((NEW_APP_REQ, &new_app_res.uuid)).await?; + db.query(format!("UPDATE {ACCOUNT}:{admin_account} SET tmp_locked -= {locked_nano};")) + .await?; Ok(()) } @@ -261,7 +369,15 @@ impl ActiveApp { Ok(false) } } - pub async fn listen(db: &Surreal, app_id: &str) -> Result { +} + +pub enum WrappedAppResp { + NewAppRes(NewAppRes), + Error(NewAppRes), +} + +impl WrappedAppResp { + pub async fn listen(db: &Surreal, app_id: &str) -> Result { let mut query_response = db .query(format!( "live select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};" @@ -272,14 +388,40 @@ impl ActiveApp { let mut error_stream = query_response.stream::>(0)?; let mut active_app_stream = query_response.stream::>(1)?; - tokio::time::timeout(Duration::from_secs(30), async { + let mut error = db + .query(format!("select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};")) + .await?; + if let Some(error_on_newapp_req) = error.take::>(0)? { + if !error_on_newapp_req.error.is_empty() { + let app_daemon_err = NewAppRes { + uuid: app_id.to_string(), + error: error_on_newapp_req.error, + ..Default::default() + }; + + return Ok(Self::Error(app_daemon_err)); + } + } + + if let Some(active_app) = db.select::>((ACTIVE_APP, app_id)).await? { + return Ok(Self::NewAppRes(active_app.into())); + } + log::trace!("listening for table: {NEW_APP_REQ}"); + + tokio::time::timeout(Duration::from_secs(APP_DAEMON_TIMEOUT), async { loop { tokio::select! { Some(err_notif) = error_stream.next() =>{ match err_notif{ Ok(err_notif) =>{ if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty(){ - return Err(Error::NewAppDaemonResp(err_notif.data.error)) + let app_daemon_err = NewAppRes { + uuid: app_id.to_string(), + error: err_notif.data.error, + ..Default::default() + }; + + return Ok(Self::Error(app_daemon_err)); } } Err(e) => return Err(e.into()) @@ -291,7 +433,7 @@ impl ActiveApp { Ok(active_app_notif) =>{ if active_app_notif.action == surrealdb::Action::Create { let _: Option = db.delete((NEW_APP_REQ, app_id)).await?; - return Ok(active_app_notif.data); + return Ok(Self::NewAppRes(active_app_notif.data.into())); } } Err(e) => return Err(e.into()) diff --git a/src/db/mod.rs b/src/db/mod.rs index 7f22fdb..6af367b 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -31,8 +31,6 @@ pub enum Error { UnknownTable(String), #[error("Daemon channel got closed: {0}")] AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError), - #[error("AppDaemon Error {0}")] - NewAppDaemonResp(String), #[error("Minimum escrow amount is {MIN_ESCROW}")] MinimalEscrow, #[error("Insufficient funds, deposit more tokens")] @@ -186,7 +184,7 @@ pub async fn live_appnode_msgs< } Err(e) => { log::error!( - "live_vmnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}" + "live_appnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}" ); return Err(Error::from(e)); } diff --git a/src/db/vm.rs b/src/db/vm.rs index b216206..28c1fb8 100644 --- a/src/db/vm.rs +++ b/src/db/vm.rs @@ -3,7 +3,8 @@ use std::time::Duration; use super::Error; use crate::constants::{ - ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT, + ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_DAEMON_TIMEOUT, VM_NODE, + VM_UPDATE_EVENT, }; use crate::db::{Account, ErrorFromTable, Report}; use crate::old_brain; @@ -353,7 +354,7 @@ impl WrappedMeasurement { } log::trace!("listening for table: {table}"); - tokio::time::timeout(Duration::from_secs(10), async { + tokio::time::timeout(Duration::from_secs(VM_DAEMON_TIMEOUT), async { loop { tokio::select! { error_notification = error_stream.next() => { diff --git a/src/grpc/app.rs b/src/grpc/app.rs index ec742d7..6ab8565 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -145,7 +145,7 @@ impl BrainAppDaemon for AppDaemonServer { ) .await?; } else { - db::ActiveApp::activate(&self.db, &new_app_resp.uuid).await?; + db::ActiveApp::activate(&self.db, new_app_resp).await?; } } Some(daemon_message_app::Msg::AppNodeResources(app_node_resources)) => { @@ -182,28 +182,38 @@ impl BrainAppCli for AppCliServer { async fn new_app(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; + // TODO: make it atleast 1 hour + if req.locked_nano < 100 { + log::error!("locking lessthan 100 nano lps: {}", req.locked_nano); + return Err(Status::unknown("lock atleaset 100 nano lps")); + } info!("new_app process starting for {:?}", req); if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { return Err(Status::permission_denied("This operator banned you. What did you do?")); } let db_req: db::NewAppReq = req.into(); - let id = db_req.id.to_string(); + let id = db_req.id.key().to_string(); let (tx, rx) = tokio::sync::oneshot::channel(); let db = self.db.clone(); tokio::spawn(async move { - let _ = tx.send(db::ActiveApp::listen(&db, &id).await); + let _ = tx.send(db::WrappedAppResp::listen(&db, &id).await); }); - db_req.submit(&self.db).await?; match rx.await { + Ok(Ok(db::WrappedAppResp::NewAppRes(new_app_resp))) => Ok(Response::new(new_app_resp)), + Ok(Ok(db::WrappedAppResp::Error(err))) => Ok(Response::new(err)), Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded( "Network timeout. Please try again later or contact the DeTEE devs team.", )), - Ok(Err(db::Error::NewAppDaemonResp(err))) => Err(Status::internal(err)), - Ok(new_app_resp) => Ok(Response::new(new_app_resp?.into())), + Ok(Err(e)) => { + log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}"); + Err(Status::unknown( + "Unknown error. Please try again or contact the DeTEE devs team.", + )) + } Err(e) => { log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}"); Err(Status::unknown( diff --git a/src/grpc/types.rs b/src/grpc/types.rs index 900df5a..bc2b098 100644 --- a/src/grpc/types.rs +++ b/src/grpc/types.rs @@ -274,8 +274,8 @@ impl From for AppContract { public_ipv4: value.host_ipv4, resource: Some(AppResource { memory_mb: value.memory_mb, - disk_mb: value.disk_size_gb, - vcpu: value.vcpus, + disk_size_gb: value.disk_size_gb, + vcpus: value.vcpus, ports: value.mapped_ports.iter().map(|(_, g)| *g).collect(), }), mapped_ports: value @@ -316,8 +316,8 @@ impl From for db::NewAppReq { hratls_pubkey: val.hratls_pubkey, ports: resource.ports, memory_mb: resource.memory_mb, - vcpu: resource.vcpu, - disk_mb: resource.disk_mb, + vcpus: resource.vcpus, + disk_size_gb: resource.disk_size_gb, locked_nano: val.locked_nano, price_per_unit: val.price_per_unit, error: String::new(), @@ -329,9 +329,9 @@ impl From for db::NewAppReq { impl From for NewAppReq { fn from(value: db::NewAppReq) -> Self { let resource = AppResource { - vcpu: value.vcpu, + vcpus: value.vcpus, memory_mb: value.memory_mb, - disk_mb: value.disk_mb, + disk_size_gb: value.disk_size_gb, ports: value.ports, }; let mr_enclave = Some(hex::decode(value.mr_enclave).unwrap_or_default()); diff --git a/src/grpc/vm.rs b/src/grpc/vm.rs index 9b5e6aa..c3f5ea3 100644 --- a/src/grpc/vm.rs +++ b/src/grpc/vm.rs @@ -222,6 +222,11 @@ impl BrainVmCli for VmCliServer { async fn new_vm(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; + // TODO: make it atleast 1 hour + if req.locked_nano < 100 { + log::error!("locking lessthan 100 nano lps: {}", req.locked_nano); + return Err(Status::unknown("lock atleaset 100 nano lps")); + } info!("New VM requested via CLI: {req:?}"); if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { return Err(Status::permission_denied("This operator banned you. What did you do?")); diff --git a/surql/tables.sql b/surql/tables.sql index 4ab0a99..65e9381 100644 --- a/surql/tables.sql +++ b/surql/tables.sql @@ -99,8 +99,8 @@ DEFINE FIELD mr_enclave ON TABLE new_app_req TYPE string; DEFINE FIELD hratls_pubkey ON TABLE new_app_req TYPE string; DEFINE FIELD ports ON TABLE new_app_req TYPE array; DEFINE FIELD memory_mb ON TABLE new_app_req TYPE int; -DEFINE FIELD vcpu ON TABLE new_app_req TYPE int; -DEFINE FIELD disk_mb ON TABLE new_app_req TYPE int; +DEFINE FIELD vcpus ON TABLE new_app_req TYPE int; +DEFINE FIELD disk_size_gb ON TABLE new_app_req TYPE int; DEFINE FIELD locked_nano ON TABLE new_app_req TYPE int; DEFINE FIELD price_per_unit ON TABLE new_app_req TYPE int; DEFINE FIELD error ON TABLE new_app_req TYPE string; diff --git a/tests/common/app_daemon_utils.rs b/tests/common/app_daemon_utils.rs new file mode 100644 index 0000000..1d4fa52 --- /dev/null +++ b/tests/common/app_daemon_utils.rs @@ -0,0 +1,144 @@ +use anyhow::Result; +use detee_shared::app_proto::brain_app_daemon_client::BrainAppDaemonClient; +use detee_shared::app_proto::{self, NewAppRes, RegisterAppNodeReq}; +use detee_shared::common_proto::MappedPort; +use futures::StreamExt; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::transport::Channel; + +use super::test_utils::Key; + +pub async fn mock_app_daemon( + brain_channel: &Channel, + daemon_error: Option, +) -> Result { + let mut daemon_client = BrainAppDaemonClient::new(brain_channel.clone()); + let daemon_key = Key::new(); + + register_app_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?; + + let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); + + tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx)); + + let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); + tokio::spawn(daemon_msg_sender( + daemon_client.clone(), + daemon_key.clone(), + daemon_msg_tx.clone(), + rx, + )); + + tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx, daemon_error)); + + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + Ok(daemon_key.pubkey) +} + +pub async fn register_app_node( + client: &mut BrainAppDaemonClient, + key: &Key, + operator_wallet: &str, +) -> Result> { + log::info!("Registering app_node: {}", key.pubkey); + let node_pubkey = key.pubkey.clone(); + + let req = RegisterAppNodeReq { + node_pubkey, + operator_wallet: operator_wallet.to_string(), + main_ip: String::from("185.243.218.213"), + city: String::from("Oslo"), + country: String::from("Norway"), + region: String::from("EU"), + price: 1200, + }; + + let mut grpc_stream = client.register_app_node(key.sign_request(req)?).await?.into_inner(); + + let mut deleted_app_reqs = Vec::new(); + while let Some(stream_update) = grpc_stream.next().await { + match stream_update { + Ok(del_app_req) => { + deleted_app_reqs.push(del_app_req); + } + Err(e) => { + panic!("Received error instead of deleted_app_reqs: {e:?}"); + } + } + } + Ok(deleted_app_reqs) +} + +pub async fn daemon_listener( + mut client: BrainAppDaemonClient, + key: Key, + tx: mpsc::Sender, +) -> Result<()> { + log::info!("listening app_daemon"); + let mut grpc_stream = + client.brain_messages(key.sign_stream_auth_app(vec![])?).await?.into_inner(); + + while let Some(Ok(stream_update)) = grpc_stream.next().await { + log::info!("app deamon got notified: {:?}", &stream_update); + let _ = tx.send(stream_update).await; + } + + Ok(()) +} + +pub async fn daemon_msg_sender( + mut client: BrainAppDaemonClient, + key: Key, + tx: mpsc::Sender, + rx: mpsc::Receiver, +) -> Result<()> { + log::info!("sender app_daemon"); + let rx_stream = ReceiverStream::new(rx); + tx.send(app_proto::DaemonMessageApp { + msg: Some(app_proto::daemon_message_app::Msg::Auth(key.sign_stream_auth_app(vec![])?)), + }) + .await?; + client.daemon_messages(rx_stream).await?; + Ok(()) +} + +pub async fn daemon_engine( + tx: mpsc::Sender, + mut rx: mpsc::Receiver, + new_app_err: Option, +) -> Result<()> { + log::info!("daemon engine app_daemon"); + while let Some(brain_msg) = rx.recv().await { + match brain_msg.msg { + Some(app_proto::brain_message_app::Msg::NewAppReq(new_app_req)) => { + let exposed_ports = + [vec![34500], new_app_req.resource.unwrap_or_default().ports].concat(); + + let mapped_ports = exposed_ports + .into_iter() + .map(|port| MappedPort { host_port: port, guest_port: port }) + .collect::>(); + + let res_data = NewAppRes { + uuid: new_app_req.uuid, + mapped_ports, + ip_address: "127.0.0.1".to_string(), + error: new_app_err.clone().unwrap_or_default(), + }; + + let res = app_proto::DaemonMessageApp { + msg: Some(app_proto::daemon_message_app::Msg::NewAppRes(res_data)), + }; + tx.send(res).await?; + } + Some(app_proto::brain_message_app::Msg::DeleteAppReq(_del_app_req)) => { + todo!() + } + None => todo!(), + } + } + + Ok(()) +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 09e1a27..e5947ff 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -6,3 +6,6 @@ pub mod test_utils; pub mod vm_cli_utils; #[allow(dead_code)] pub mod vm_daemon_utils; + +#[allow(dead_code)] +pub mod app_daemon_utils; diff --git a/tests/common/prepare_test_env.rs b/tests/common/prepare_test_env.rs index 42e6e0c..c2aba72 100644 --- a/tests/common/prepare_test_env.rs +++ b/tests/common/prepare_test_env.rs @@ -1,4 +1,6 @@ use anyhow::Result; +use detee_shared::app_proto::brain_app_cli_server::BrainAppCliServer; +use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemonServer; use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer; use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; @@ -7,6 +9,7 @@ use hyper_util::rt::TokioIo; use std::net::SocketAddr; use std::sync::Arc; use surreal_brain::constants::DB_SCHEMA_FILES; +use surreal_brain::grpc::app::{AppCliServer, AppDaemonServer}; use surreal_brain::grpc::general::GeneralCliServer; use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer}; use surrealdb::engine::remote::ws::Client; @@ -96,6 +99,8 @@ pub async fn run_service_for_stream_server() -> DuplexStream { .add_service(BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone()))) .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) + .add_service(BrainAppCliServer::new(AppCliServer::new(db_arc.clone()))) + .add_service(BrainAppDaemonServer::new(AppDaemonServer::new(db_arc.clone()))) .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await?; diff --git a/tests/common/test_utils.rs b/tests/common/test_utils.rs index a99efb0..0c5b884 100644 --- a/tests/common/test_utils.rs +++ b/tests/common/test_utils.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use detee_shared::app_proto as sgx_proto; use detee_shared::vm_proto as snp_proto; use ed25519_dalek::{Signer, SigningKey}; use itertools::Itertools; @@ -63,11 +64,22 @@ impl Key { Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string()) } - pub fn sign_stream_auth(&self, contracts: Vec) -> Result { + pub fn sign_stream_auth_vm( + &self, + contracts: Vec, + ) -> Result { let pubkey = self.pubkey.clone(); let timestamp = chrono::Utc::now().to_rfc3339(); let signature = self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?; Ok(snp_proto::DaemonStreamAuth { timestamp, pubkey, contracts, signature }) } + + pub fn sign_stream_auth_app(&self, contracts: Vec) -> Result { + let pubkey = self.pubkey.clone(); + let timestamp = chrono::Utc::now().to_rfc3339(); + let signature = + self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?; + Ok(sgx_proto::DaemonAuth { timestamp, pubkey, contracts, signature }) + } } diff --git a/tests/common/vm_cli_utils.rs b/tests/common/vm_cli_utils.rs index 38aa876..7206d31 100644 --- a/tests/common/vm_cli_utils.rs +++ b/tests/common/vm_cli_utils.rs @@ -35,7 +35,7 @@ pub async fn create_new_vm( node_pubkey: node_pubkey.to_string(), price_per_unit: 1200, extra_ports: vec![8080, 8081], - locked_nano: 0, + locked_nano: 100, ..Default::default() }; diff --git a/tests/common/vm_daemon_utils.rs b/tests/common/vm_daemon_utils.rs index dc3c367..96abacc 100644 --- a/tests/common/vm_daemon_utils.rs +++ b/tests/common/vm_daemon_utils.rs @@ -59,8 +59,8 @@ pub async fn register_vm_node( let mut deleted_vm_reqs = Vec::new(); while let Some(stream_update) = grpc_stream.next().await { match stream_update { - Ok(del_vm_rq) => { - deleted_vm_reqs.push(del_vm_rq); + Ok(del_vm_req) => { + deleted_vm_reqs.push(del_vm_req); } Err(e) => { panic!("Received error instead of deleted_vm_reqs: {e:?}"); @@ -76,7 +76,8 @@ pub async fn daemon_listener( tx: mpsc::Sender, ) -> Result<()> { log::info!("listening vm_daemon"); - let mut grpc_stream = client.brain_messages(key.sign_stream_auth(vec![])?).await?.into_inner(); + let mut grpc_stream = + client.brain_messages(key.sign_stream_auth_vm(vec![])?).await?.into_inner(); while let Some(Ok(stream_update)) = grpc_stream.next().await { log::info!("vm deamon got notified: {:?}", &stream_update); @@ -95,7 +96,7 @@ pub async fn daemon_msg_sender( log::info!("sender vm_daemon"); let rx_stream = ReceiverStream::new(rx); tx.send(vm_proto::VmDaemonMessage { - msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![])?)), + msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth_vm(vec![])?)), }) .await?; client.daemon_messages(rx_stream).await?; diff --git a/tests/grpc_app_cli_test.rs b/tests/grpc_app_cli_test.rs new file mode 100644 index 0000000..574f3b3 --- /dev/null +++ b/tests/grpc_app_cli_test.rs @@ -0,0 +1,91 @@ +use common::app_daemon_utils::mock_app_daemon; +use common::prepare_test_env::{prepare_test_db, run_service_for_stream}; +use common::test_utils::Key; +use common::vm_cli_utils::airdrop; +use detee_shared::app_proto; +use detee_shared::app_proto::brain_app_cli_client::BrainAppCliClient; +use std::vec; +use surreal_brain::constants::{ACCOUNT, ACTIVE_APP, NEW_APP_REQ, TOKEN_DECIMAL}; +use surreal_brain::db::prelude as db; + +mod common; + +#[tokio::test] +async fn test_app_creation() { + /* + env_logger::builder() + .filter_level(log::LevelFilter::Trace) + .filter_module("tungstenite", log::LevelFilter::Debug) + .filter_module("tokio_tungstenite", log::LevelFilter::Debug) + .init(); + */ + let db = prepare_test_db().await.unwrap(); + let brain_channel = run_service_for_stream().await.unwrap(); + let daemon_key = mock_app_daemon(&brain_channel, None).await.unwrap(); + + let key = Key::new(); + + let mut new_app_req = app_proto::NewAppReq { + admin_pubkey: key.pubkey.clone(), + node_pubkey: daemon_key.clone(), + price_per_unit: 1200, + resource: Some(app_proto::AppResource { ports: vec![8080, 8081], ..Default::default() }), + locked_nano: 100, + ..Default::default() + }; + + let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone()); + let new_app_resp = client_app_cli.new_app(key.sign_request(new_app_req.clone()).unwrap()).await; + assert!(new_app_resp.is_err()); + let new_app_err = new_app_resp.err().unwrap(); + assert!(new_app_err.to_string().contains("Insufficient funds")); + + let airdrop_amount = 10; + airdrop(&brain_channel, &key.pubkey, airdrop_amount).await.unwrap(); + + let new_app_resp = client_app_cli + .new_app(key.sign_request(new_app_req.clone()).unwrap()) + .await + .unwrap() + .into_inner(); + let active_app = + db.select::>((ACTIVE_APP, new_app_resp.uuid)).await.unwrap(); + assert!(active_app.is_some()); + + let daemon_key_02 = + mock_app_daemon(&brain_channel, Some("something went wrong 01".to_string())).await.unwrap(); + + new_app_req.node_pubkey = daemon_key_02.clone(); + + let new_app_resp = client_app_cli + .new_app(key.sign_request(new_app_req.clone()).unwrap()) + .await + .unwrap() + .into_inner(); + assert!(!new_app_resp.error.is_empty()); + + let app_req_db = + db.select::>((NEW_APP_REQ, new_app_resp.uuid)).await.unwrap(); + + assert!(!app_req_db.unwrap().error.is_empty()); + + let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); + assert_eq!(acc_db.balance, (airdrop_amount * TOKEN_DECIMAL - 100)); + assert_eq!(acc_db.tmp_locked, 0); + + let locking_nano = 1288; + new_app_req.node_pubkey = daemon_key; + new_app_req.locked_nano = locking_nano; + + let new_app_resp = + client_app_cli.new_app(key.sign_request(new_app_req).unwrap()).await.unwrap().into_inner(); + assert!(new_app_resp.error.is_empty()); + + let active_app = + db.select::>((ACTIVE_APP, new_app_resp.uuid)).await.unwrap(); + assert!(active_app.is_some()); + + let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); + assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - (locking_nano + 100)); + assert_eq!(acc_db.tmp_locked, 0); +} diff --git a/tests/grpc_vm_cli_test.rs b/tests/grpc_vm_cli_test.rs index 475e11b..5d68ce1 100644 --- a/tests/grpc_vm_cli_test.rs +++ b/tests/grpc_vm_cli_test.rs @@ -73,7 +73,7 @@ async fn test_vm_creation() { } let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); - assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL); + assert_eq!(acc_db.balance, (airdrop_amount * TOKEN_DECIMAL - 100)); assert_eq!(acc_db.tmp_locked, 0); new_vm_req.node_pubkey = daemon_key; @@ -87,7 +87,7 @@ async fn test_vm_creation() { assert!(active_vm.is_some()); let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); - assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - locking_nano); + assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - (locking_nano + 100)); assert_eq!(acc_db.tmp_locked, 0); } @@ -109,7 +109,7 @@ async fn test_timeout_vm_creation() { node_pubkey: daemon_key.pubkey, price_per_unit: 1200, extra_ports: vec![8080, 8081], - locked_nano: 0, + locked_nano: 100, ..Default::default() }; diff --git a/tests/grpc_vm_daemon_test.rs b/tests/grpc_vm_daemon_test.rs index f8ce79a..5a684e2 100644 --- a/tests/grpc_vm_daemon_test.rs +++ b/tests/grpc_vm_daemon_test.rs @@ -39,7 +39,7 @@ async fn test_brain_message() { node_pubkey: daemon_key, price_per_unit: 1200, extra_ports: vec![8080, 8081], - locked_nano: 0, + locked_nano: 100, ..Default::default() }; airdrop(&brain_channel, &cli_key.pubkey, 10).await.unwrap(); -- 2.43.0 From cbd71b2308e04943d6409e9ffe984478f66318f0 Mon Sep 17 00:00:00 2001 From: Noor Date: Wed, 4 Jun 2025 21:04:32 +0530 Subject: [PATCH 3/3] Implement refund on delete app and vm fix authentication and refund on delete vm refactor app daemon register to get delete app req db function for delete app sample environment variable extensive tests for delete app and vm refactor test utilities --- .env => .sample.env | 0 Cargo.lock | 2 +- src/db/app.rs | 57 ++++++++++++++---- src/db/vm.rs | 43 +++++++++++--- src/grpc/app.rs | 20 ++++--- src/grpc/vm.rs | 12 +++- surql/functions.sql | 19 +++++- surql/timer.sql | 2 + tests/common/app_cli_utils.rs | 31 ++++++++++ tests/common/app_daemon_utils.rs | 6 +- tests/common/mod.rs | 3 + tests/common/test_utils.rs | 15 +++++ tests/common/vm_cli_utils.rs | 17 +----- tests/common/vm_daemon_utils.rs | 4 +- tests/grpc_app_cli_test.rs | 99 ++++++++++++++++++++++++++++++-- tests/grpc_general_test.rs | 6 +- tests/grpc_vm_cli_test.rs | 69 ++++++++++++++++++++-- tests/grpc_vm_daemon_test.rs | 3 +- 18 files changed, 344 insertions(+), 64 deletions(-) rename .env => .sample.env (100%) create mode 100644 tests/common/app_cli_utils.rs diff --git a/.env b/.sample.env similarity index 100% rename from .env rename to .sample.env diff --git a/Cargo.lock b/Cargo.lock index b8a20bf..5e912e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1000,7 +1000,7 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#694d5811aa0edc9b2b9bdb17c6e972b04a21b96f" +source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#005677153b3fcd3251b64111a736c806106fdc04" dependencies = [ "bincode 2.0.1", "prost", diff --git a/src/db/app.rs b/src/db/app.rs index fdaac6d..3753d58 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -295,8 +295,6 @@ impl From for DeletedApp { disk_size_gb: value.disk_size_gb, created_at: value.created_at, price_per_unit: value.price_per_unit, - locked_nano: value.locked_nano, - collected_at: value.collected_at, mr_enclave: value.mr_enclave, package_url: value.package_url, hratls_pubkey: value.hratls_pubkey, @@ -359,15 +357,42 @@ impl ActiveApp { Ok(()) } - pub async fn delete(db: &Surreal, id: &str) -> Result { - let deleted_app: Option = db.delete((ACTIVE_APP, id)).await?; - if let Some(deleted_app) = deleted_app { - let deleted_app: DeletedApp = deleted_app.into(); - let _: Vec = db.insert(DELETED_APP).relation(deleted_app).await?; - Ok(true) - } else { - Ok(false) + pub async fn delete(db: &Surreal, admin: &str, id: &str) -> Result<(), Error> { + let mut app_del_resp = db + .query( + " + BEGIN TRANSACTION; + + LET $active_app = $app_id_input; + LET $admin = $admin_input; + + IF $active_app.in != $admin { + THROW 'Unauthorized' + }; + + return fn::delete_app($active_app); + + COMMIT TRANSACTION; + ", + ) + .bind(("app_id_input", RecordId::from((ACTIVE_APP, id)))) + .bind(("admin_input", RecordId::from((ACCOUNT, admin)))) + .await?; + + log::trace!("delete_app query response: {app_del_resp:?}"); + + let query_err = app_del_resp.take_errors(); + if !query_err.is_empty() { + log::trace!("errors in delete_app: {query_err:?}"); + let tx_fail_err_str = + String::from("The query was not executed due to a failed transaction"); + + if query_err.contains_key(&2) && query_err[&2].to_string() != tx_fail_err_str { + log::error!("Unauthorized: {}", query_err[&2]); + return Err(Error::AccessDenied); + } } + Ok(()) } } @@ -653,9 +678,17 @@ pub struct DeletedApp { pub disk_size_gb: u32, pub created_at: Datetime, pub price_per_unit: u64, - pub locked_nano: u64, - pub collected_at: Datetime, pub mr_enclave: String, pub package_url: String, pub hratls_pubkey: String, } + +impl DeletedApp { + pub async fn list_by_node(db: &Surreal, node_pubkey: &str) -> Result, Error> { + let mut result = db + .query(format!("select * from {DELETED_APP} where out = {APP_NODE}:{node_pubkey};")) + .await?; + let contracts: Vec = result.take(0)?; + Ok(contracts) + } +} diff --git a/src/db/vm.rs b/src/db/vm.rs index 28c1fb8..8dabcac 100644 --- a/src/db/vm.rs +++ b/src/db/vm.rs @@ -546,15 +546,42 @@ impl ActiveVm { Ok(false) } - pub async fn delete(db: &Surreal, id: &str) -> Result { - let deleted_vm: Option = db.delete((ACTIVE_VM, id)).await?; - if let Some(deleted_vm) = deleted_vm { - let deleted_vm: DeletedVm = deleted_vm.into(); - let _: Vec = db.insert(DELETED_VM).relation(deleted_vm).await?; - Ok(true) - } else { - Ok(false) + pub async fn delete(db: &Surreal, admin: &str, id: &str) -> Result<(), Error> { + let mut vm_del_resp = db + .query( + " + BEGIN TRANSACTION; + + LET $active_vm = $vm_id_input; + LET $admin = $admin_input; + + IF $active_vm.in != $admin { + THROW 'Unauthorized' + }; + + return fn::delete_vm($active_vm); + + COMMIT TRANSACTION; + ", + ) + .bind(("vm_id_input", RecordId::from((ACTIVE_VM, id)))) + .bind(("admin_input", RecordId::from((ACCOUNT, admin)))) + .await?; + + log::trace!("delete_vm query response: {vm_del_resp:?}"); + + let query_err = vm_del_resp.take_errors(); + if !query_err.is_empty() { + log::trace!("errors in delete_vm: {query_err:?}"); + let tx_fail_err_str = + String::from("The query was not executed due to a failed transaction"); + + if query_err.contains_key(&2) && query_err[&2].to_string() != tx_fail_err_str { + log::error!("Unauthorized: {}", query_err[&2]); + return Err(Error::AccessDenied); + } } + Ok(()) } pub async fn extend_time( diff --git a/src/grpc/app.rs b/src/grpc/app.rs index 6ab8565..0f5094c 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -29,7 +29,7 @@ impl AppDaemonServer { #[tonic::async_trait] impl BrainAppDaemon for AppDaemonServer { - type RegisterAppNodeStream = Pin> + Send>>; + type RegisterAppNodeStream = Pin> + Send>>; type BrainMessagesStream = Pin> + Send>>; async fn register_app_node( @@ -59,11 +59,11 @@ impl BrainAppDaemon for AppDaemonServer { app_node.register(&self.db).await?; info!("Sending existing contracts to {}", req.node_pubkey); - let contracts = db::ActiveAppWithNode::list_by_node(&self.db, &req.node_pubkey).await?; + let deleted_apps = db::DeletedApp::list_by_node(&self.db, &req.node_pubkey).await?; let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { - for contract in contracts { - let _ = tx.send(Ok(contract.into())).await; + for deleted_app in deleted_apps { + let _ = tx.send(Ok(deleted_app.into())).await; } }); @@ -226,9 +226,15 @@ impl BrainAppCli for AppCliServer { async fn delete_app(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; info!("delete_app process starting for {:?}", req); - match ActiveApp::delete(&self.db, &req.uuid).await? { - true => Ok(Response::new(Empty {})), - false => Err(Status::not_found(format!("Could not find App contract {}", &req.uuid))), + match ActiveApp::delete(&self.db, &req.admin_pubkey, &req.uuid).await { + Ok(()) => Ok(Response::new(Empty {})), + Err(db::Error::AccessDenied) => Err(Status::permission_denied("Unauthorized")), + Err(e) => { + log::error!("Error deleting app contract {}: {e}", &req.uuid); + Err(Status::unknown( + "Unknown error. Please try again or contact the DeTEE devs team.", + )) + } } } diff --git a/src/grpc/vm.rs b/src/grpc/vm.rs index c3f5ea3..211f97d 100644 --- a/src/grpc/vm.rs +++ b/src/grpc/vm.rs @@ -342,9 +342,15 @@ impl BrainVmCli for VmCliServer { async fn delete_vm(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; - match db::ActiveVm::delete(&self.db, &req.uuid).await? { - true => Ok(Response::new(Empty {})), - false => Err(Status::not_found(format!("Could not find VM contract {}", &req.uuid))), + match db::ActiveVm::delete(&self.db, &req.admin_pubkey, &req.uuid).await { + Ok(()) => Ok(Response::new(Empty {})), + Err(db::Error::AccessDenied) => Err(Status::permission_denied("Unauthorized")), + Err(e) => { + log::error!("Error deleting VM contract {}: {e}", &req.uuid); + Err(Status::unknown( + "Unknown error. Please try again or contact the DeTEE devs team.", + )) + } } } diff --git a/surql/functions.sql b/surql/functions.sql index 062b62d..a864e5a 100644 --- a/surql/functions.sql +++ b/surql/functions.sql @@ -22,7 +22,7 @@ DEFINE FUNCTION OVERWRITE fn::delete_vm( UPDATE $account SET balance += $vm.locked_nano; }; INSERT RELATION INTO deleted_vm ( $deleted_vm ); - DELETE $vm.id; + RETURN DELETE $vm.id RETURN BEFORE; }; DEFINE FUNCTION OVERWRITE fn::app_price_per_minute( @@ -34,4 +34,21 @@ DEFINE FUNCTION OVERWRITE fn::app_price_per_minute( ($app.memory_mb / 200) + ($app.disk_size_gb / 10)) * $app.price_per_unit; +}; + +DEFINE FUNCTION OVERWRITE fn::delete_app( + $app_id: record +) { + LET $app = (select * from $app_id)[0]; + LET $account = $app.in; + LET $deleted_app = $app.patch([{ + 'op': 'replace', + 'path': 'id', + 'value': type::record("deleted_app:" + record::id($app.id)) + }]); + IF $app.locked_nano >= 0 { + UPDATE $account SET balance += $app.locked_nano; + }; + INSERT RELATION INTO deleted_app ( $deleted_app ); + RETURN DELETE $app.id RETURN BEFORE; }; \ No newline at end of file diff --git a/surql/timer.sql b/surql/timer.sql index f310afb..8d5869d 100644 --- a/surql/timer.sql +++ b/surql/timer.sql @@ -27,3 +27,5 @@ FOR $contract IN (select * from active_vm fetch out) { fn::delete_vm($contract.id); }; }; + +-- TODO: implement for active_app \ No newline at end of file diff --git a/tests/common/app_cli_utils.rs b/tests/common/app_cli_utils.rs new file mode 100644 index 0000000..d7389de --- /dev/null +++ b/tests/common/app_cli_utils.rs @@ -0,0 +1,31 @@ +use anyhow::Result; +use detee_shared::app_proto::{ + brain_app_cli_client::BrainAppCliClient, AppResource, NewAppReq, NewAppRes, +}; +use tonic::transport::Channel; + +use crate::common::test_utils::Key; + +pub async fn create_new_app( + key: &Key, + node_pubkey: &str, + brain_channel: &Channel, +) -> Result { + let new_app_req = NewAppReq { + admin_pubkey: key.pubkey.clone(), + node_pubkey: node_pubkey.to_string(), + price_per_unit: 1200, + resource: Some(AppResource { ports: vec![8080, 8081], ..Default::default() }), + locked_nano: 100, + ..Default::default() + }; + + let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone()); + let new_app_resp = + client_app_cli.new_app(key.sign_request(new_app_req.clone())?).await?.into_inner(); + + assert!(new_app_resp.error.is_empty()); + assert!(new_app_resp.uuid.len() == 40); + + Ok(new_app_resp) +} diff --git a/tests/common/app_daemon_utils.rs b/tests/common/app_daemon_utils.rs index 1d4fa52..97e42a8 100644 --- a/tests/common/app_daemon_utils.rs +++ b/tests/common/app_daemon_utils.rs @@ -41,7 +41,7 @@ pub async fn register_app_node( client: &mut BrainAppDaemonClient, key: &Key, operator_wallet: &str, -) -> Result> { +) -> Result> { log::info!("Registering app_node: {}", key.pubkey); let node_pubkey = key.pubkey.clone(); @@ -133,8 +133,8 @@ pub async fn daemon_engine( }; tx.send(res).await?; } - Some(app_proto::brain_message_app::Msg::DeleteAppReq(_del_app_req)) => { - todo!() + Some(app_proto::brain_message_app::Msg::DeleteAppReq(del_app_req)) => { + println!("MOCK_APP_DAEMON:: delete app request for {}", del_app_req.uuid); } None => todo!(), } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index e5947ff..6588420 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -9,3 +9,6 @@ pub mod vm_daemon_utils; #[allow(dead_code)] pub mod app_daemon_utils; + +#[allow(dead_code)] +pub mod app_cli_utils; diff --git a/tests/common/test_utils.rs b/tests/common/test_utils.rs index 0c5b884..868b32f 100644 --- a/tests/common/test_utils.rs +++ b/tests/common/test_utils.rs @@ -1,10 +1,14 @@ use anyhow::Result; use detee_shared::app_proto as sgx_proto; +use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient; +use detee_shared::general_proto::AirdropReq; use detee_shared::vm_proto as snp_proto; use ed25519_dalek::{Signer, SigningKey}; use itertools::Itertools; use std::sync::OnceLock; +use surreal_brain::constants::TOKEN_DECIMAL; use tonic::metadata::AsciiMetadataValue; +use tonic::transport::Channel; use tonic::Request; pub static ADMIN_KEYS: OnceLock> = OnceLock::new(); @@ -83,3 +87,14 @@ impl Key { Ok(sgx_proto::DaemonAuth { timestamp, pubkey, contracts, signature }) } } + +pub async fn airdrop(brain_channel: &Channel, wallet: &str, amount: u64) -> Result<()> { + let mut client = BrainGeneralCliClient::new(brain_channel.clone()); + let airdrop_req = AirdropReq { pubkey: wallet.to_string(), tokens: amount * TOKEN_DECIMAL }; + + let admin_key = admin_keys()[0].clone(); + + client.airdrop(admin_key.sign_request(airdrop_req.clone())?).await?; + + Ok(()) +} diff --git a/tests/common/vm_cli_utils.rs b/tests/common/vm_cli_utils.rs index 7206d31..52116c4 100644 --- a/tests/common/vm_cli_utils.rs +++ b/tests/common/vm_cli_utils.rs @@ -1,29 +1,18 @@ -use super::test_utils::{admin_keys, Key}; +use super::test_utils::Key; use anyhow::{anyhow, Result}; use detee_shared::app_proto; use detee_shared::common_proto::Empty; use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient; -use detee_shared::general_proto::{Account, AirdropReq, RegOperatorReq, ReportNodeReq}; +use detee_shared::general_proto::{Account, RegOperatorReq, ReportNodeReq}; use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use futures::StreamExt; -use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ, TOKEN_DECIMAL}; +use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ}; use surreal_brain::db::prelude as db; use surrealdb::engine::remote::ws::Client; use surrealdb::Surreal; use tonic::transport::Channel; -pub async fn airdrop(brain_channel: &Channel, wallet: &str, amount: u64) -> Result<()> { - let mut client = BrainGeneralCliClient::new(brain_channel.clone()); - let airdrop_req = AirdropReq { pubkey: wallet.to_string(), tokens: amount * TOKEN_DECIMAL }; - - let admin_key = admin_keys()[0].clone(); - - client.airdrop(admin_key.sign_request(airdrop_req.clone())?).await?; - - Ok(()) -} - pub async fn create_new_vm( db: &Surreal, key: &Key, diff --git a/tests/common/vm_daemon_utils.rs b/tests/common/vm_daemon_utils.rs index 96abacc..8af4f00 100644 --- a/tests/common/vm_daemon_utils.rs +++ b/tests/common/vm_daemon_utils.rs @@ -136,8 +136,8 @@ pub async fn daemon_engine( Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { todo!() } - Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => { - todo!() + Some(vm_proto::brain_vm_message::Msg::DeleteVm(del_vm_req)) => { + println!("MOCK_VM_DAEMON:: delete vm request for {}", del_vm_req.uuid); } None => todo!(), } diff --git a/tests/grpc_app_cli_test.rs b/tests/grpc_app_cli_test.rs index 574f3b3..1c21b28 100644 --- a/tests/grpc_app_cli_test.rs +++ b/tests/grpc_app_cli_test.rs @@ -1,13 +1,14 @@ use common::app_daemon_utils::mock_app_daemon; use common::prepare_test_env::{prepare_test_db, run_service_for_stream}; -use common::test_utils::Key; -use common::vm_cli_utils::airdrop; -use detee_shared::app_proto; +use common::test_utils::{airdrop, Key}; use detee_shared::app_proto::brain_app_cli_client::BrainAppCliClient; +use detee_shared::app_proto::{self, DelAppReq}; use std::vec; -use surreal_brain::constants::{ACCOUNT, ACTIVE_APP, NEW_APP_REQ, TOKEN_DECIMAL}; +use surreal_brain::constants::{ACCOUNT, ACTIVE_APP, DELETED_APP, NEW_APP_REQ, TOKEN_DECIMAL}; use surreal_brain::db::prelude as db; +use crate::common::app_cli_utils::create_new_app; + mod common; #[tokio::test] @@ -89,3 +90,93 @@ async fn test_app_creation() { assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - (locking_nano + 100)); assert_eq!(acc_db.tmp_locked, 0); } + +#[tokio::test] +async fn test_timeout_app_creation() { + let _ = prepare_test_db().await.unwrap(); + let brain_channel = run_service_for_stream().await.unwrap(); + let daemon_key = Key::new().pubkey.clone(); + + let key = Key::new(); + + let new_app_req = app_proto::NewAppReq { + admin_pubkey: key.pubkey.clone(), + node_pubkey: daemon_key.clone(), + price_per_unit: 1200, + resource: Some(app_proto::AppResource { ports: vec![8080, 8081], ..Default::default() }), + locked_nano: 100, + ..Default::default() + }; + + airdrop(&brain_channel, &key.pubkey, 10).await.unwrap(); + + let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone()); + let timeout_resp = client_app_cli.new_app(key.sign_request(new_app_req.clone()).unwrap()).await; + assert!(timeout_resp.is_err()); + let timeout_err = timeout_resp.err().unwrap(); + assert_eq!( + timeout_err.message(), + "Network timeout. Please try again later or contact the DeTEE devs team." + ); +} + +#[tokio::test] +async fn test_app_deletion() { + let db = prepare_test_db().await.unwrap(); + let brain_channel = run_service_for_stream().await.unwrap(); + let daemon_key = mock_app_daemon(&brain_channel, None).await.unwrap(); + + let key = Key::new(); + airdrop(&brain_channel, &key.pubkey, 10).await.unwrap(); + + let new_app_res = create_new_app(&key, &daemon_key, &brain_channel).await.unwrap(); + + let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone()); + + let del_app_req = DelAppReq { admin_pubkey: key.pubkey.clone(), uuid: new_app_res.uuid }; + let _ = client_app_cli.delete_app(key.sign_request(del_app_req).unwrap()).await.unwrap(); + + let key_02 = Key::new(); + + // delete random app + let mut del_app_req = DelAppReq { + admin_pubkey: key_02.pubkey.clone(), + uuid: "9ae3VH8nJg2i8pqTQ6mJtvYuS2kd9n1XLLco8GUPfT95".to_string(), + }; + + let del_err = client_app_cli + .delete_app(key_02.sign_request(del_app_req.clone()).unwrap()) + .await + .err() + .unwrap(); + assert_eq!(del_err.message(), "Unauthorized"); + + let new_app_res_02 = create_new_app(&key, &daemon_key, &brain_channel).await.unwrap(); + del_app_req.uuid = new_app_res_02.uuid; + + let del_err = client_app_cli + .delete_app(key_02.sign_request(del_app_req.clone()).unwrap()) + .await + .err() + .unwrap(); + assert_eq!(del_err.message(), "Unauthorized"); + + // test refund + let key_03 = Key::new(); + airdrop(&brain_channel, &key_03.pubkey, 10).await.unwrap(); + + let new_app_res_03 = create_new_app(&key_03, &daemon_key, &brain_channel).await.unwrap(); + + let del_app_req = + DelAppReq { admin_pubkey: key_03.pubkey.clone(), uuid: new_app_res_03.uuid.clone() }; + let _ = client_app_cli.delete_app(key_03.sign_request(del_app_req).unwrap()).await.unwrap(); + + let acc: db::Account = db.select((ACCOUNT, key_03.pubkey.clone())).await.unwrap().unwrap(); + assert_eq!(acc.balance, 10 * TOKEN_DECIMAL); + + let deleted_app = + db.select::>((DELETED_APP, new_app_res_03.uuid)).await.unwrap(); + assert!(deleted_app.is_some()); +} + +// TODO: test register app node, delete app contract while node offline, kick, etc.. diff --git a/tests/grpc_general_test.rs b/tests/grpc_general_test.rs index fc0c096..f115392 100644 --- a/tests/grpc_general_test.rs +++ b/tests/grpc_general_test.rs @@ -1,10 +1,10 @@ use common::prepare_test_env::{ prepare_test_db, run_service_for_stream, run_service_in_background, }; -use common::test_utils::{admin_keys, Key}; +use common::test_utils::{admin_keys, airdrop, Key}; use common::vm_cli_utils::{ - airdrop, create_new_vm, list_accounts, list_all_app_contracts, list_all_vm_contracts, - register_operator, report_node, + create_new_vm, list_accounts, list_all_app_contracts, list_all_vm_contracts, register_operator, + report_node, }; use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; use detee_shared::common_proto::{Empty, Pubkey}; diff --git a/tests/grpc_vm_cli_test.rs b/tests/grpc_vm_cli_test.rs index 5d68ce1..dd5ebf6 100644 --- a/tests/grpc_vm_cli_test.rs +++ b/tests/grpc_vm_cli_test.rs @@ -1,14 +1,14 @@ use common::prepare_test_env::{prepare_test_db, run_service_for_stream}; -use common::test_utils::Key; -use common::vm_cli_utils::{airdrop, create_new_vm, user_list_vm_contracts}; +use common::test_utils::{airdrop, Key}; +use common::vm_cli_utils::{create_new_vm, user_list_vm_contracts}; use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; -use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; +use detee_shared::vm_proto::{self, DeleteVmReq}; use detee_shared::vm_proto::{ExtendVmReq, ListVmContractsReq, NewVmReq}; use futures::StreamExt; use std::vec; -use surreal_brain::constants::{ACCOUNT, ACTIVE_VM, NEW_VM_REQ, TOKEN_DECIMAL}; +use surreal_brain::constants::{ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, TOKEN_DECIMAL}; use surreal_brain::db::prelude as db; mod common; @@ -125,6 +125,65 @@ async fn test_timeout_vm_creation() { ) } +#[tokio::test] +async fn test_vm_deletion() { + let db = prepare_test_db().await.unwrap(); + let brain_channel = run_service_for_stream().await.unwrap(); + let daemon_key = mock_vm_daemon(&brain_channel, None).await.unwrap(); + + let key = Key::new(); + airdrop(&brain_channel, &key.pubkey, 10).await.unwrap(); + + let new_vm_uuid = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap(); + + let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone()); + + let del_app_req = DeleteVmReq { admin_pubkey: key.pubkey.clone(), uuid: new_vm_uuid }; + let _ = client_vm_cli.delete_vm(key.sign_request(del_app_req).unwrap()).await.unwrap(); + + let key_02 = Key::new(); + + // delete random vm + let mut del_vm_req = DeleteVmReq { + admin_pubkey: key_02.pubkey.clone(), + uuid: "9ae3VH8nJg2i8pqTQ6mJtvYuS2kd9n1XLLco8GUPfT95".to_string(), + }; + + let del_err = client_vm_cli + .delete_vm(key_02.sign_request(del_vm_req.clone()).unwrap()) + .await + .err() + .unwrap(); + assert_eq!(del_err.message(), "Unauthorized"); + + let new_vm_uuid_02 = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap(); + del_vm_req.uuid = new_vm_uuid_02; + + let del_err = client_vm_cli + .delete_vm(key_02.sign_request(del_vm_req.clone()).unwrap()) + .await + .err() + .unwrap(); + assert_eq!(del_err.message(), "Unauthorized"); + + // test refund + let key_03 = Key::new(); + airdrop(&brain_channel, &key_03.pubkey, 10).await.unwrap(); + + let new_vm_uuid_03 = create_new_vm(&db, &key_03, &daemon_key, &brain_channel).await.unwrap(); + + let del_vm_req = + DeleteVmReq { admin_pubkey: key_03.pubkey.clone(), uuid: new_vm_uuid_03.clone() }; + let _ = client_vm_cli.delete_vm(key_03.sign_request(del_vm_req).unwrap()).await.unwrap(); + + let acc: db::Account = db.select((ACCOUNT, key_03.pubkey.clone())).await.unwrap().unwrap(); + assert_eq!(acc.balance, 10 * TOKEN_DECIMAL); + + let deleted_vm = + db.select::>((DELETED_VM, new_vm_uuid_03)).await.unwrap(); + assert!(deleted_vm.is_some()); +} + #[tokio::test] // TODO: create vm for this user before testing this async fn test_list_vm_contracts() { @@ -240,3 +299,5 @@ async fn test_extend_vm() { let acc: db::Account = db_conn.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); assert_eq!(acc.balance, expected_bal_02); } + +// TODO: test register vm node, delete vm contract while node offline, kick, etc.. diff --git a/tests/grpc_vm_daemon_test.rs b/tests/grpc_vm_daemon_test.rs index 5a684e2..2e47c42 100644 --- a/tests/grpc_vm_daemon_test.rs +++ b/tests/grpc_vm_daemon_test.rs @@ -1,8 +1,7 @@ use common::prepare_test_env::{ prepare_test_db, run_service_for_stream, run_service_in_background, }; -use common::test_utils::Key; -use common::vm_cli_utils::airdrop; +use common::test_utils::{airdrop, Key}; use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; -- 2.43.0