diff --git a/.gitignore b/.gitignore index ed3ad69..0a5c507 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target secrets tmp +.env diff --git a/.env b/.sample.env similarity index 100% rename from .env rename to .sample.env diff --git a/Cargo.lock b/Cargo.lock index 3826a4e..5e912e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1000,7 +1000,7 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain#d6ca058d2de78b5257517034bca2b2c7d5929db8" +source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#005677153b3fcd3251b64111a736c806106fdc04" dependencies = [ "bincode 2.0.1", "prost", diff --git a/Cargo.toml b/Cargo.toml index 6011662..9d96c45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ serde_yaml = "0.9.34" surrealdb = "2.2.2" tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] } tonic = { version = "0.12", features = ["tls"] } -detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain" } +detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain_app" } ed25519-dalek = "2.1.1" bs58 = "0.5.1" tokio-stream = "0.1.17" diff --git a/src/bin/migration0.rs b/src/bin/migration0.rs index e622597..fd3c50f 100644 --- a/src/bin/migration0.rs +++ b/src/bin/migration0.rs @@ -17,6 +17,7 @@ async fn main() -> Result<(), Box> { let db_name = std::env::var("DB_NAME").expect("DB_NAME not set in .env"); let db = db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name).await.unwrap(); + env_logger::builder().filter_level(log::LevelFilter::Trace); db::migration0(&db, &old_brain_data).await?; diff --git a/src/constants.rs b/src/constants.rs index 49098a3..c8aa89e 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -48,3 +48,6 @@ pub const ID_ALPHABET: [char; 62] = [ pub const TOKEN_DECIMAL: u64 = 1_000_000_000; pub const MIN_ESCROW: u64 = 5000 * TOKEN_DECIMAL; + +pub const APP_DAEMON_TIMEOUT: u64 = 20; +pub const VM_DAEMON_TIMEOUT: u64 = 10; diff --git a/src/db/app.rs b/src/db/app.rs index 3abe616..3753d58 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -1,11 +1,13 @@ use std::time::Duration; use super::Error; -use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, DELETED_APP, NEW_APP_REQ}; +use crate::constants::{ + ACCOUNT, ACTIVE_APP, APP_DAEMON_TIMEOUT, APP_NODE, DELETED_APP, NEW_APP_REQ, +}; use crate::db; use crate::db::general::Report; use crate::old_brain; -use detee_shared::app_proto; +use detee_shared::app_proto::{self, NewAppRes}; use serde::{Deserialize, Serialize}; use surrealdb::engine::remote::ws::Client; use surrealdb::sql::Datetime; @@ -68,8 +70,8 @@ pub struct NewAppReq { pub hratls_pubkey: String, pub ports: Vec, pub memory_mb: u32, - pub vcpu: u32, - pub disk_mb: u32, + pub vcpus: u32, + pub disk_size_gb: u32, pub locked_nano: u64, pub price_per_unit: u64, pub error: String, @@ -81,26 +83,117 @@ impl NewAppReq { let new_app_req: Option = db.select((NEW_APP_REQ, id)).await?; Ok(new_app_req) } - pub async fn submit_error( - db: &Surreal, - id: &str, - error: String, - ) -> Result, Error> { - #[derive(Serialize)] - struct NewAppError { - error: String, + pub async fn submit_error(db: &Surreal, id: &str, error: String) -> Result<(), Error> { + let tx_query = String::from( + " + BEGIN TRANSACTION; + + LET $new_app_req = $new_app_req_input; + LET $error = $error_input; + LET $record = (select * from $new_app_req)[0]; + LET $admin = $record.in; + + if $record == None {{ + THROW 'app req not exist ' + $new_app_req + }}; + + UPDATE $new_app_req SET error = $error; + + UPDATE $admin SET tmp_locked -= $record.locked_nano; + UPDATE $admin SET balance += $record.locked_nano; + + COMMIT TRANSACTION;", + ); + + log::trace!("submit_new_app_err query: {tx_query}"); + + let mut query_resp = db + .query(tx_query) + .bind(("new_app_req_input", RecordId::from((NEW_APP_REQ, id)))) + .bind(("error_input", error)) + .await?; + + let query_err = query_resp.take_errors(); + if !query_err.is_empty() { + log::trace!("errors in submit_new_app_err: {query_err:?}"); + let tx_fail_err_str = + String::from("The query was not executed due to a failed transaction"); + + if query_err.contains_key(&4) && query_err[&4].to_string() != tx_fail_err_str { + log::error!("app req not exist: {}", query_err[&4]); + return Err(Error::ContractNotFound); + } else { + log::error!("Unknown error in submit_new_app_err: {query_err:?}"); + return Err(Error::Unknown("submit_new_app_req".to_string())); + } } - let record: Option = - db.update((NEW_APP_REQ, id)).merge(NewAppError { error }).await?; - - Ok(record) + Ok(()) } - pub async fn submit(self, db: &Surreal) -> Result, Error> { - // TODO: handle financial transaction - let new_app_req: Vec = db.insert(NEW_APP_REQ).relation(self).await?; - Ok(new_app_req) + pub async fn submit(self, db: &Surreal) -> Result<(), Error> { + let locked_nano = self.locked_nano; + let tx_query = format!( " + BEGIN TRANSACTION; + + LET $account = $account_input; + LET $new_app_req = $new_app_req_input; + LET $app_node = $app_node_input; + LET $package_url = $package_url_input; + LET $mr_enclave = $mr_enclave_input; + LET $hratls_pubkey = $hratls_pubkey_input; + LET $app_name = $app_name_input; + + UPDATE $account SET balance -= {locked_nano}; + IF $account.balance < 0 {{ + THROW 'Insufficient funds.' + }}; + UPDATE $account SET tmp_locked += {locked_nano}; + + + RELATE + $account + ->$new_app_req + ->$app_node + CONTENT {{ + created_at: time::now(), app_name: $app_name, package_url: $package_url, + mr_enclave: $mr_enclave, hratls_pubkey: $hratls_pubkey, ports: {:?}, memory_mb: {}, + vcpus: {}, disk_size_gb: {}, locked_nano: {locked_nano}, price_per_unit: {}, error: '', + }}; + + COMMIT TRANSACTION; + ", + self.ports, self.memory_mb, self.vcpus, self.disk_size_gb, self.price_per_unit); + + log::trace!("submit_new_app_req query: {tx_query}"); + + let mut query_resp = db + .query(tx_query) + .bind(("account_input", self.admin)) + .bind(("new_app_req_input", self.id)) + .bind(("app_node_input", self.app_node)) + .bind(("package_url_input", self.package_url)) + .bind(("mr_enclave_input", self.mr_enclave)) + .bind(("hratls_pubkey_input", self.hratls_pubkey)) + .bind(("app_name_input", self.app_name)) + .await?; + + let query_err = query_resp.take_errors(); + if !query_err.is_empty() { + log::trace!("errors in submit_new_app_req: {query_err:?}"); + let tx_fail_err_str = + String::from("The query was not executed due to a failed transaction"); + + if query_err.contains_key(&8) && query_err[&8].to_string() != tx_fail_err_str { + log::error!("Transaction error: {}", query_err[&8]); + return Err(Error::InsufficientFunds); + } else { + log::error!("Unknown error in submit_new_app_req: {query_err:?}"); + return Err(Error::Unknown("submit_new_app_req".to_string())); + } + } + + Ok(()) } } @@ -202,8 +295,6 @@ impl From for DeletedApp { disk_size_gb: value.disk_size_gb, created_at: value.created_at, price_per_unit: value.price_per_unit, - locked_nano: value.locked_nano, - collected_at: value.collected_at, mr_enclave: value.mr_enclave, package_url: value.package_url, hratls_pubkey: value.hratls_pubkey, @@ -221,22 +312,31 @@ impl ActiveApp { (self.vcpus as f64 * 5f64) + (self.memory_mb as f64 / 200f64) + (self.disk_size_gb as f64) } - pub async fn activate(db: &Surreal, id: &str) -> Result<(), Error> { - let new_app_req = match NewAppReq::get(db, id).await? { + pub async fn activate( + db: &Surreal, + new_app_res: app_proto::NewAppRes, + ) -> Result<(), Error> { + let new_app_req = match NewAppReq::get(db, &new_app_res.uuid).await? { Some(r) => r, None => return Ok(()), }; + let mapped_ports = new_app_res + .mapped_ports + .into_iter() + .map(|data| (data.host_port, data.guest_port)) + .collect::>(); + let active_app = Self { - id: RecordId::from((ACTIVE_APP, id)), + id: RecordId::from((ACTIVE_APP, &new_app_res.uuid)), admin: new_app_req.admin, app_node: new_app_req.app_node, app_name: new_app_req.app_name, - mapped_ports: vec![], + mapped_ports, host_ipv4: String::new(), - vcpus: new_app_req.vcpu, + vcpus: new_app_req.vcpus, memory_mb: new_app_req.memory_mb, - disk_size_gb: new_app_req.disk_mb, + disk_size_gb: new_app_req.disk_size_gb, created_at: new_app_req.created_at.clone(), price_per_unit: new_app_req.price_per_unit, locked_nano: new_app_req.locked_nano, @@ -246,22 +346,63 @@ impl ActiveApp { hratls_pubkey: new_app_req.hratls_pubkey.clone(), }; + let admin_account = active_app.admin.key().to_string(); + let locked_nano = active_app.locked_nano; + let _: Vec = db.insert(()).relation(active_app).await?; + db.delete::>((NEW_APP_REQ, &new_app_res.uuid)).await?; + db.query(format!("UPDATE {ACCOUNT}:{admin_account} SET tmp_locked -= {locked_nano};")) + .await?; Ok(()) } - pub async fn delete(db: &Surreal, id: &str) -> Result { - let deleted_app: Option = db.delete((ACTIVE_APP, id)).await?; - if let Some(deleted_app) = deleted_app { - let deleted_app: DeletedApp = deleted_app.into(); - let _: Vec = db.insert(DELETED_APP).relation(deleted_app).await?; - Ok(true) - } else { - Ok(false) + pub async fn delete(db: &Surreal, admin: &str, id: &str) -> Result<(), Error> { + let mut app_del_resp = db + .query( + " + BEGIN TRANSACTION; + + LET $active_app = $app_id_input; + LET $admin = $admin_input; + + IF $active_app.in != $admin { + THROW 'Unauthorized' + }; + + return fn::delete_app($active_app); + + COMMIT TRANSACTION; + ", + ) + .bind(("app_id_input", RecordId::from((ACTIVE_APP, id)))) + .bind(("admin_input", RecordId::from((ACCOUNT, admin)))) + .await?; + + log::trace!("delete_app query response: {app_del_resp:?}"); + + let query_err = app_del_resp.take_errors(); + if !query_err.is_empty() { + log::trace!("errors in delete_app: {query_err:?}"); + let tx_fail_err_str = + String::from("The query was not executed due to a failed transaction"); + + if query_err.contains_key(&2) && query_err[&2].to_string() != tx_fail_err_str { + log::error!("Unauthorized: {}", query_err[&2]); + return Err(Error::AccessDenied); + } } + Ok(()) } - pub async fn listen(db: &Surreal, app_id: &str) -> Result { +} + +pub enum WrappedAppResp { + NewAppRes(NewAppRes), + Error(NewAppRes), +} + +impl WrappedAppResp { + pub async fn listen(db: &Surreal, app_id: &str) -> Result { let mut query_response = db .query(format!( "live select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};" @@ -272,14 +413,40 @@ impl ActiveApp { let mut error_stream = query_response.stream::>(0)?; let mut active_app_stream = query_response.stream::>(1)?; - tokio::time::timeout(Duration::from_secs(30), async { + let mut error = db + .query(format!("select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};")) + .await?; + if let Some(error_on_newapp_req) = error.take::>(0)? { + if !error_on_newapp_req.error.is_empty() { + let app_daemon_err = NewAppRes { + uuid: app_id.to_string(), + error: error_on_newapp_req.error, + ..Default::default() + }; + + return Ok(Self::Error(app_daemon_err)); + } + } + + if let Some(active_app) = db.select::>((ACTIVE_APP, app_id)).await? { + return Ok(Self::NewAppRes(active_app.into())); + } + log::trace!("listening for table: {NEW_APP_REQ}"); + + tokio::time::timeout(Duration::from_secs(APP_DAEMON_TIMEOUT), async { loop { tokio::select! { Some(err_notif) = error_stream.next() =>{ match err_notif{ Ok(err_notif) =>{ if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty(){ - return Err(Error::NewAppDaemonResp(err_notif.data.error)) + let app_daemon_err = NewAppRes { + uuid: app_id.to_string(), + error: err_notif.data.error, + ..Default::default() + }; + + return Ok(Self::Error(app_daemon_err)); } } Err(e) => return Err(e.into()) @@ -291,7 +458,7 @@ impl ActiveApp { Ok(active_app_notif) =>{ if active_app_notif.action == surrealdb::Action::Create { let _: Option = db.delete((NEW_APP_REQ, app_id)).await?; - return Ok(active_app_notif.data); + return Ok(Self::NewAppRes(active_app_notif.data.into())); } } Err(e) => return Err(e.into()) @@ -511,9 +678,17 @@ pub struct DeletedApp { pub disk_size_gb: u32, pub created_at: Datetime, pub price_per_unit: u64, - pub locked_nano: u64, - pub collected_at: Datetime, pub mr_enclave: String, pub package_url: String, pub hratls_pubkey: String, } + +impl DeletedApp { + pub async fn list_by_node(db: &Surreal, node_pubkey: &str) -> Result, Error> { + let mut result = db + .query(format!("select * from {DELETED_APP} where out = {APP_NODE}:{node_pubkey};")) + .await?; + let contracts: Vec = result.take(0)?; + Ok(contracts) + } +} diff --git a/src/db/mod.rs b/src/db/mod.rs index 8cd5ce3..6af367b 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -31,8 +31,6 @@ pub enum Error { UnknownTable(String), #[error("Daemon channel got closed: {0}")] AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError), - #[error("AppDaemon Error {0}")] - NewAppDaemonResp(String), #[error("Minimum escrow amount is {MIN_ESCROW}")] MinimalEscrow, #[error("Insufficient funds, deposit more tokens")] @@ -186,7 +184,7 @@ pub async fn live_appnode_msgs< } Err(e) => { log::error!( - "live_vmnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}" + "live_appnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}" ); return Err(Error::from(e)); } @@ -196,7 +194,7 @@ pub async fn live_appnode_msgs< Ok(()) } -#[derive(Deserialize)] +#[derive(Deserialize, Debug, Clone)] pub struct ErrorFromTable { pub error: String, } diff --git a/src/db/vm.rs b/src/db/vm.rs index 9841aba..8dabcac 100644 --- a/src/db/vm.rs +++ b/src/db/vm.rs @@ -3,7 +3,8 @@ use std::time::Duration; use super::Error; use crate::constants::{ - ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT, + ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_DAEMON_TIMEOUT, VM_NODE, + VM_UPDATE_EVENT, }; use crate::db::{Account, ErrorFromTable, Report}; use crate::old_brain; @@ -329,18 +330,22 @@ impl WrappedMeasurement { UPDATE_VM_REQ => UPDATE_VM_REQ, _ => NEW_VM_REQ, }; - - let mut args_stream = db + let mut resp = db + .query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};")) .query(format!( "live select * from measurement_args where id = measurement_args:{vm_id};" )) - .await? - .stream::>(0)?; + .await?; + let mut error_stream = resp.stream::>(0)?; + let mut args_stream = resp.stream::>(1)?; - let mut error_stream = db - .query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};")) - .await? - .stream::>(0)?; + let mut error = + db.query(format!("select error from {table} where id = {NEW_VM_REQ}:{vm_id}")).await?; + if let Some(error_on_newvm_req) = error.take::>(0)? { + if !error_on_newvm_req.error.is_empty() { + return Ok(Self::Error(vm_id.to_string(), error_on_newvm_req.error)); + } + } let args: Option = db.delete(("measurement_args", vm_id)).await?; @@ -349,7 +354,7 @@ impl WrappedMeasurement { } log::trace!("listening for table: {table}"); - tokio::time::timeout(Duration::from_secs(10), async { + tokio::time::timeout(Duration::from_secs(VM_DAEMON_TIMEOUT), async { loop { tokio::select! { error_notification = error_stream.next() => { @@ -358,10 +363,7 @@ impl WrappedMeasurement { Ok(err_notif) => { if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty() { - return Ok::( - Self::Error(vm_id.to_string(), - err_notif.data.error) - ); + return Ok(Self::Error(vm_id.to_string(), err_notif.data.error)); }; }, Err(e) => return Err(e.into()), @@ -544,15 +546,42 @@ impl ActiveVm { Ok(false) } - pub async fn delete(db: &Surreal, id: &str) -> Result { - let deleted_vm: Option = db.delete((ACTIVE_VM, id)).await?; - if let Some(deleted_vm) = deleted_vm { - let deleted_vm: DeletedVm = deleted_vm.into(); - let _: Vec = db.insert(DELETED_VM).relation(deleted_vm).await?; - Ok(true) - } else { - Ok(false) + pub async fn delete(db: &Surreal, admin: &str, id: &str) -> Result<(), Error> { + let mut vm_del_resp = db + .query( + " + BEGIN TRANSACTION; + + LET $active_vm = $vm_id_input; + LET $admin = $admin_input; + + IF $active_vm.in != $admin { + THROW 'Unauthorized' + }; + + return fn::delete_vm($active_vm); + + COMMIT TRANSACTION; + ", + ) + .bind(("vm_id_input", RecordId::from((ACTIVE_VM, id)))) + .bind(("admin_input", RecordId::from((ACCOUNT, admin)))) + .await?; + + log::trace!("delete_vm query response: {vm_del_resp:?}"); + + let query_err = vm_del_resp.take_errors(); + if !query_err.is_empty() { + log::trace!("errors in delete_vm: {query_err:?}"); + let tx_fail_err_str = + String::from("The query was not executed due to a failed transaction"); + + if query_err.contains_key(&2) && query_err[&2].to_string() != tx_fail_err_str { + log::error!("Unauthorized: {}", query_err[&2]); + return Err(Error::AccessDenied); + } } + Ok(()) } pub async fn extend_time( diff --git a/src/grpc/app.rs b/src/grpc/app.rs index 78be5af..0f5094c 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -15,7 +15,7 @@ use surrealdb::Surreal; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::{Stream, StreamExt}; -use tonic::{Response, Status, Streaming}; +use tonic::{Request, Response, Status, Streaming}; pub struct AppDaemonServer { pub db: Arc>, @@ -29,13 +29,13 @@ impl AppDaemonServer { #[tonic::async_trait] impl BrainAppDaemon for AppDaemonServer { - type RegisterAppNodeStream = Pin> + Send>>; + type RegisterAppNodeStream = Pin> + Send>>; type BrainMessagesStream = Pin> + Send>>; async fn register_app_node( &self, - req: tonic::Request, - ) -> Result::RegisterAppNodeStream>, Status> { + req: Request, + ) -> Result::RegisterAppNodeStream>, Status> { let req = check_sig_from_req(req)?; info!("Starting app_node registration process for {:?}", req); @@ -59,11 +59,11 @@ impl BrainAppDaemon for AppDaemonServer { app_node.register(&self.db).await?; info!("Sending existing contracts to {}", req.node_pubkey); - let contracts = db::ActiveAppWithNode::list_by_node(&self.db, &req.node_pubkey).await?; + let deleted_apps = db::DeletedApp::list_by_node(&self.db, &req.node_pubkey).await?; let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { - for contract in contracts { - let _ = tx.send(Ok(contract.into())).await; + for deleted_app in deleted_apps { + let _ = tx.send(Ok(deleted_app.into())).await; } }); @@ -73,8 +73,8 @@ impl BrainAppDaemon for AppDaemonServer { async fn brain_messages( &self, - req: tonic::Request, - ) -> Result::BrainMessagesStream>, Status> { + req: Request, + ) -> Result::BrainMessagesStream>, Status> { let auth = req.into_inner(); let pubkey = auth.pubkey.clone(); check_sig_from_parts( @@ -110,8 +110,8 @@ impl BrainAppDaemon for AppDaemonServer { async fn daemon_messages( &self, - req: tonic::Request>, - ) -> Result, Status> { + req: Request>, + ) -> Result, Status> { let mut req_stream = req.into_inner(); let pubkey: String; if let Some(Ok(msg)) = req_stream.next().await { @@ -145,7 +145,7 @@ impl BrainAppDaemon for AppDaemonServer { ) .await?; } else { - db::ActiveApp::activate(&self.db, &new_app_resp.uuid).await?; + db::ActiveApp::activate(&self.db, new_app_resp).await?; } } Some(daemon_message_app::Msg::AppNodeResources(app_node_resources)) => { @@ -180,33 +180,40 @@ impl BrainAppCli for AppCliServer { type ListAppContractsStream = Pin> + Send>>; type ListAppNodesStream = Pin> + Send>>; - async fn new_app( - &self, - req: tonic::Request, - ) -> Result, Status> { + async fn new_app(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; - info!("deploy_app process starting for {:?}", req); - + // TODO: make it atleast 1 hour + if req.locked_nano < 100 { + log::error!("locking lessthan 100 nano lps: {}", req.locked_nano); + return Err(Status::unknown("lock atleaset 100 nano lps")); + } + info!("new_app process starting for {:?}", req); if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { return Err(Status::permission_denied("This operator banned you. What did you do?")); } - let new_app_req: db::NewAppReq = req.into(); - let id = new_app_req.id.to_string(); + + let db_req: db::NewAppReq = req.into(); + let id = db_req.id.key().to_string(); let (tx, rx) = tokio::sync::oneshot::channel(); let db = self.db.clone(); tokio::spawn(async move { - let _ = tx.send(db::ActiveApp::listen(&db, &id).await); + let _ = tx.send(db::WrappedAppResp::listen(&db, &id).await); }); - - new_app_req.submit(&self.db).await?; + db_req.submit(&self.db).await?; match rx.await { + Ok(Ok(db::WrappedAppResp::NewAppRes(new_app_resp))) => Ok(Response::new(new_app_resp)), + Ok(Ok(db::WrappedAppResp::Error(err))) => Ok(Response::new(err)), Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded( "Network timeout. Please try again later or contact the DeTEE devs team.", )), - Ok(Err(db::Error::NewAppDaemonResp(err))) => Err(Status::internal(err)), - Ok(new_app_resp) => Ok(Response::new(new_app_resp?.into())), + Ok(Err(e)) => { + log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}"); + Err(Status::unknown( + "Unknown error. Please try again or contact the DeTEE devs team.", + )) + } Err(e) => { log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}"); Err(Status::unknown( @@ -216,22 +223,25 @@ impl BrainAppCli for AppCliServer { } } - async fn delete_app( - &self, - req: tonic::Request, - ) -> Result, Status> { + async fn delete_app(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; info!("delete_app process starting for {:?}", req); - match ActiveApp::delete(&self.db, &req.uuid).await? { - true => Ok(Response::new(Empty {})), - false => Err(Status::not_found(format!("Could not find App contract {}", &req.uuid))), + match ActiveApp::delete(&self.db, &req.admin_pubkey, &req.uuid).await { + Ok(()) => Ok(Response::new(Empty {})), + Err(db::Error::AccessDenied) => Err(Status::permission_denied("Unauthorized")), + Err(e) => { + log::error!("Error deleting app contract {}: {e}", &req.uuid); + Err(Status::unknown( + "Unknown error. Please try again or contact the DeTEE devs team.", + )) + } } } async fn list_app_contracts( &self, - req: tonic::Request, - ) -> Result::ListAppContractsStream>, Status> { + req: Request, + ) -> Result::ListAppContractsStream>, Status> { let req = check_sig_from_req(req)?; info!("list_app_contracts process starting for {:?}", req); @@ -270,8 +280,8 @@ impl BrainAppCli for AppCliServer { async fn list_app_nodes( &self, - req: tonic::Request, - ) -> Result::ListAppNodesStream>, Status> { + req: Request, + ) -> Result::ListAppNodesStream>, Status> { let req = check_sig_from_req(req)?; info!("list_app_nodes process starting for {:?}", req); let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req, false).await?; @@ -287,8 +297,8 @@ impl BrainAppCli for AppCliServer { async fn get_one_app_node( &self, - req: tonic::Request, - ) -> Result, Status> { + req: Request, + ) -> Result, Status> { let req = check_sig_from_req(req)?; info!("get_one_app_node process starting for {:?}", req); let app_node = db::AppNodeWithReports::find_by_filters(&self.db, req, true) diff --git a/src/grpc/types.rs b/src/grpc/types.rs index 900df5a..bc2b098 100644 --- a/src/grpc/types.rs +++ b/src/grpc/types.rs @@ -274,8 +274,8 @@ impl From for AppContract { public_ipv4: value.host_ipv4, resource: Some(AppResource { memory_mb: value.memory_mb, - disk_mb: value.disk_size_gb, - vcpu: value.vcpus, + disk_size_gb: value.disk_size_gb, + vcpus: value.vcpus, ports: value.mapped_ports.iter().map(|(_, g)| *g).collect(), }), mapped_ports: value @@ -316,8 +316,8 @@ impl From for db::NewAppReq { hratls_pubkey: val.hratls_pubkey, ports: resource.ports, memory_mb: resource.memory_mb, - vcpu: resource.vcpu, - disk_mb: resource.disk_mb, + vcpus: resource.vcpus, + disk_size_gb: resource.disk_size_gb, locked_nano: val.locked_nano, price_per_unit: val.price_per_unit, error: String::new(), @@ -329,9 +329,9 @@ impl From for db::NewAppReq { impl From for NewAppReq { fn from(value: db::NewAppReq) -> Self { let resource = AppResource { - vcpu: value.vcpu, + vcpus: value.vcpus, memory_mb: value.memory_mb, - disk_mb: value.disk_mb, + disk_size_gb: value.disk_size_gb, ports: value.ports, }; let mr_enclave = Some(hex::decode(value.mr_enclave).unwrap_or_default()); diff --git a/src/grpc/vm.rs b/src/grpc/vm.rs index 9b5e6aa..211f97d 100644 --- a/src/grpc/vm.rs +++ b/src/grpc/vm.rs @@ -222,6 +222,11 @@ impl BrainVmCli for VmCliServer { async fn new_vm(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; + // TODO: make it atleast 1 hour + if req.locked_nano < 100 { + log::error!("locking lessthan 100 nano lps: {}", req.locked_nano); + return Err(Status::unknown("lock atleaset 100 nano lps")); + } info!("New VM requested via CLI: {req:?}"); if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { return Err(Status::permission_denied("This operator banned you. What did you do?")); @@ -337,9 +342,15 @@ impl BrainVmCli for VmCliServer { async fn delete_vm(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; - match db::ActiveVm::delete(&self.db, &req.uuid).await? { - true => Ok(Response::new(Empty {})), - false => Err(Status::not_found(format!("Could not find VM contract {}", &req.uuid))), + match db::ActiveVm::delete(&self.db, &req.admin_pubkey, &req.uuid).await { + Ok(()) => Ok(Response::new(Empty {})), + Err(db::Error::AccessDenied) => Err(Status::permission_denied("Unauthorized")), + Err(e) => { + log::error!("Error deleting VM contract {}: {e}", &req.uuid); + Err(Status::unknown( + "Unknown error. Please try again or contact the DeTEE devs team.", + )) + } } } diff --git a/surql/functions.sql b/surql/functions.sql index 062b62d..a864e5a 100644 --- a/surql/functions.sql +++ b/surql/functions.sql @@ -22,7 +22,7 @@ DEFINE FUNCTION OVERWRITE fn::delete_vm( UPDATE $account SET balance += $vm.locked_nano; }; INSERT RELATION INTO deleted_vm ( $deleted_vm ); - DELETE $vm.id; + RETURN DELETE $vm.id RETURN BEFORE; }; DEFINE FUNCTION OVERWRITE fn::app_price_per_minute( @@ -34,4 +34,21 @@ DEFINE FUNCTION OVERWRITE fn::app_price_per_minute( ($app.memory_mb / 200) + ($app.disk_size_gb / 10)) * $app.price_per_unit; +}; + +DEFINE FUNCTION OVERWRITE fn::delete_app( + $app_id: record +) { + LET $app = (select * from $app_id)[0]; + LET $account = $app.in; + LET $deleted_app = $app.patch([{ + 'op': 'replace', + 'path': 'id', + 'value': type::record("deleted_app:" + record::id($app.id)) + }]); + IF $app.locked_nano >= 0 { + UPDATE $account SET balance += $app.locked_nano; + }; + INSERT RELATION INTO deleted_app ( $deleted_app ); + RETURN DELETE $app.id RETURN BEFORE; }; \ No newline at end of file diff --git a/surql/tables.sql b/surql/tables.sql index 4ab0a99..65e9381 100644 --- a/surql/tables.sql +++ b/surql/tables.sql @@ -99,8 +99,8 @@ DEFINE FIELD mr_enclave ON TABLE new_app_req TYPE string; DEFINE FIELD hratls_pubkey ON TABLE new_app_req TYPE string; DEFINE FIELD ports ON TABLE new_app_req TYPE array; DEFINE FIELD memory_mb ON TABLE new_app_req TYPE int; -DEFINE FIELD vcpu ON TABLE new_app_req TYPE int; -DEFINE FIELD disk_mb ON TABLE new_app_req TYPE int; +DEFINE FIELD vcpus ON TABLE new_app_req TYPE int; +DEFINE FIELD disk_size_gb ON TABLE new_app_req TYPE int; DEFINE FIELD locked_nano ON TABLE new_app_req TYPE int; DEFINE FIELD price_per_unit ON TABLE new_app_req TYPE int; DEFINE FIELD error ON TABLE new_app_req TYPE string; diff --git a/surql/timer.sql b/surql/timer.sql index f310afb..8d5869d 100644 --- a/surql/timer.sql +++ b/surql/timer.sql @@ -27,3 +27,5 @@ FOR $contract IN (select * from active_vm fetch out) { fn::delete_vm($contract.id); }; }; + +-- TODO: implement for active_app \ No newline at end of file diff --git a/tests/common/app_cli_utils.rs b/tests/common/app_cli_utils.rs new file mode 100644 index 0000000..d7389de --- /dev/null +++ b/tests/common/app_cli_utils.rs @@ -0,0 +1,31 @@ +use anyhow::Result; +use detee_shared::app_proto::{ + brain_app_cli_client::BrainAppCliClient, AppResource, NewAppReq, NewAppRes, +}; +use tonic::transport::Channel; + +use crate::common::test_utils::Key; + +pub async fn create_new_app( + key: &Key, + node_pubkey: &str, + brain_channel: &Channel, +) -> Result { + let new_app_req = NewAppReq { + admin_pubkey: key.pubkey.clone(), + node_pubkey: node_pubkey.to_string(), + price_per_unit: 1200, + resource: Some(AppResource { ports: vec![8080, 8081], ..Default::default() }), + locked_nano: 100, + ..Default::default() + }; + + let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone()); + let new_app_resp = + client_app_cli.new_app(key.sign_request(new_app_req.clone())?).await?.into_inner(); + + assert!(new_app_resp.error.is_empty()); + assert!(new_app_resp.uuid.len() == 40); + + Ok(new_app_resp) +} diff --git a/tests/common/app_daemon_utils.rs b/tests/common/app_daemon_utils.rs new file mode 100644 index 0000000..97e42a8 --- /dev/null +++ b/tests/common/app_daemon_utils.rs @@ -0,0 +1,144 @@ +use anyhow::Result; +use detee_shared::app_proto::brain_app_daemon_client::BrainAppDaemonClient; +use detee_shared::app_proto::{self, NewAppRes, RegisterAppNodeReq}; +use detee_shared::common_proto::MappedPort; +use futures::StreamExt; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::transport::Channel; + +use super::test_utils::Key; + +pub async fn mock_app_daemon( + brain_channel: &Channel, + daemon_error: Option, +) -> Result { + let mut daemon_client = BrainAppDaemonClient::new(brain_channel.clone()); + let daemon_key = Key::new(); + + register_app_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?; + + let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); + + tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx)); + + let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); + tokio::spawn(daemon_msg_sender( + daemon_client.clone(), + daemon_key.clone(), + daemon_msg_tx.clone(), + rx, + )); + + tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx, daemon_error)); + + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + Ok(daemon_key.pubkey) +} + +pub async fn register_app_node( + client: &mut BrainAppDaemonClient, + key: &Key, + operator_wallet: &str, +) -> Result> { + log::info!("Registering app_node: {}", key.pubkey); + let node_pubkey = key.pubkey.clone(); + + let req = RegisterAppNodeReq { + node_pubkey, + operator_wallet: operator_wallet.to_string(), + main_ip: String::from("185.243.218.213"), + city: String::from("Oslo"), + country: String::from("Norway"), + region: String::from("EU"), + price: 1200, + }; + + let mut grpc_stream = client.register_app_node(key.sign_request(req)?).await?.into_inner(); + + let mut deleted_app_reqs = Vec::new(); + while let Some(stream_update) = grpc_stream.next().await { + match stream_update { + Ok(del_app_req) => { + deleted_app_reqs.push(del_app_req); + } + Err(e) => { + panic!("Received error instead of deleted_app_reqs: {e:?}"); + } + } + } + Ok(deleted_app_reqs) +} + +pub async fn daemon_listener( + mut client: BrainAppDaemonClient, + key: Key, + tx: mpsc::Sender, +) -> Result<()> { + log::info!("listening app_daemon"); + let mut grpc_stream = + client.brain_messages(key.sign_stream_auth_app(vec![])?).await?.into_inner(); + + while let Some(Ok(stream_update)) = grpc_stream.next().await { + log::info!("app deamon got notified: {:?}", &stream_update); + let _ = tx.send(stream_update).await; + } + + Ok(()) +} + +pub async fn daemon_msg_sender( + mut client: BrainAppDaemonClient, + key: Key, + tx: mpsc::Sender, + rx: mpsc::Receiver, +) -> Result<()> { + log::info!("sender app_daemon"); + let rx_stream = ReceiverStream::new(rx); + tx.send(app_proto::DaemonMessageApp { + msg: Some(app_proto::daemon_message_app::Msg::Auth(key.sign_stream_auth_app(vec![])?)), + }) + .await?; + client.daemon_messages(rx_stream).await?; + Ok(()) +} + +pub async fn daemon_engine( + tx: mpsc::Sender, + mut rx: mpsc::Receiver, + new_app_err: Option, +) -> Result<()> { + log::info!("daemon engine app_daemon"); + while let Some(brain_msg) = rx.recv().await { + match brain_msg.msg { + Some(app_proto::brain_message_app::Msg::NewAppReq(new_app_req)) => { + let exposed_ports = + [vec![34500], new_app_req.resource.unwrap_or_default().ports].concat(); + + let mapped_ports = exposed_ports + .into_iter() + .map(|port| MappedPort { host_port: port, guest_port: port }) + .collect::>(); + + let res_data = NewAppRes { + uuid: new_app_req.uuid, + mapped_ports, + ip_address: "127.0.0.1".to_string(), + error: new_app_err.clone().unwrap_or_default(), + }; + + let res = app_proto::DaemonMessageApp { + msg: Some(app_proto::daemon_message_app::Msg::NewAppRes(res_data)), + }; + tx.send(res).await?; + } + Some(app_proto::brain_message_app::Msg::DeleteAppReq(del_app_req)) => { + println!("MOCK_APP_DAEMON:: delete app request for {}", del_app_req.uuid); + } + None => todo!(), + } + } + + Ok(()) +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 09e1a27..6588420 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -6,3 +6,9 @@ pub mod test_utils; pub mod vm_cli_utils; #[allow(dead_code)] pub mod vm_daemon_utils; + +#[allow(dead_code)] +pub mod app_daemon_utils; + +#[allow(dead_code)] +pub mod app_cli_utils; diff --git a/tests/common/prepare_test_env.rs b/tests/common/prepare_test_env.rs index 42e6e0c..c2aba72 100644 --- a/tests/common/prepare_test_env.rs +++ b/tests/common/prepare_test_env.rs @@ -1,4 +1,6 @@ use anyhow::Result; +use detee_shared::app_proto::brain_app_cli_server::BrainAppCliServer; +use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemonServer; use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer; use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; @@ -7,6 +9,7 @@ use hyper_util::rt::TokioIo; use std::net::SocketAddr; use std::sync::Arc; use surreal_brain::constants::DB_SCHEMA_FILES; +use surreal_brain::grpc::app::{AppCliServer, AppDaemonServer}; use surreal_brain::grpc::general::GeneralCliServer; use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer}; use surrealdb::engine::remote::ws::Client; @@ -96,6 +99,8 @@ pub async fn run_service_for_stream_server() -> DuplexStream { .add_service(BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone()))) .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) + .add_service(BrainAppCliServer::new(AppCliServer::new(db_arc.clone()))) + .add_service(BrainAppDaemonServer::new(AppDaemonServer::new(db_arc.clone()))) .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await?; diff --git a/tests/common/test_utils.rs b/tests/common/test_utils.rs index a99efb0..868b32f 100644 --- a/tests/common/test_utils.rs +++ b/tests/common/test_utils.rs @@ -1,9 +1,14 @@ use anyhow::Result; +use detee_shared::app_proto as sgx_proto; +use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient; +use detee_shared::general_proto::AirdropReq; use detee_shared::vm_proto as snp_proto; use ed25519_dalek::{Signer, SigningKey}; use itertools::Itertools; use std::sync::OnceLock; +use surreal_brain::constants::TOKEN_DECIMAL; use tonic::metadata::AsciiMetadataValue; +use tonic::transport::Channel; use tonic::Request; pub static ADMIN_KEYS: OnceLock> = OnceLock::new(); @@ -63,11 +68,33 @@ impl Key { Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string()) } - pub fn sign_stream_auth(&self, contracts: Vec) -> Result { + pub fn sign_stream_auth_vm( + &self, + contracts: Vec, + ) -> Result { let pubkey = self.pubkey.clone(); let timestamp = chrono::Utc::now().to_rfc3339(); let signature = self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?; Ok(snp_proto::DaemonStreamAuth { timestamp, pubkey, contracts, signature }) } + + pub fn sign_stream_auth_app(&self, contracts: Vec) -> Result { + let pubkey = self.pubkey.clone(); + let timestamp = chrono::Utc::now().to_rfc3339(); + let signature = + self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?; + Ok(sgx_proto::DaemonAuth { timestamp, pubkey, contracts, signature }) + } +} + +pub async fn airdrop(brain_channel: &Channel, wallet: &str, amount: u64) -> Result<()> { + let mut client = BrainGeneralCliClient::new(brain_channel.clone()); + let airdrop_req = AirdropReq { pubkey: wallet.to_string(), tokens: amount * TOKEN_DECIMAL }; + + let admin_key = admin_keys()[0].clone(); + + client.airdrop(admin_key.sign_request(airdrop_req.clone())?).await?; + + Ok(()) } diff --git a/tests/common/vm_cli_utils.rs b/tests/common/vm_cli_utils.rs index 38aa876..52116c4 100644 --- a/tests/common/vm_cli_utils.rs +++ b/tests/common/vm_cli_utils.rs @@ -1,29 +1,18 @@ -use super::test_utils::{admin_keys, Key}; +use super::test_utils::Key; use anyhow::{anyhow, Result}; use detee_shared::app_proto; use detee_shared::common_proto::Empty; use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient; -use detee_shared::general_proto::{Account, AirdropReq, RegOperatorReq, ReportNodeReq}; +use detee_shared::general_proto::{Account, RegOperatorReq, ReportNodeReq}; use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use futures::StreamExt; -use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ, TOKEN_DECIMAL}; +use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ}; use surreal_brain::db::prelude as db; use surrealdb::engine::remote::ws::Client; use surrealdb::Surreal; use tonic::transport::Channel; -pub async fn airdrop(brain_channel: &Channel, wallet: &str, amount: u64) -> Result<()> { - let mut client = BrainGeneralCliClient::new(brain_channel.clone()); - let airdrop_req = AirdropReq { pubkey: wallet.to_string(), tokens: amount * TOKEN_DECIMAL }; - - let admin_key = admin_keys()[0].clone(); - - client.airdrop(admin_key.sign_request(airdrop_req.clone())?).await?; - - Ok(()) -} - pub async fn create_new_vm( db: &Surreal, key: &Key, @@ -35,7 +24,7 @@ pub async fn create_new_vm( node_pubkey: node_pubkey.to_string(), price_per_unit: 1200, extra_ports: vec![8080, 8081], - locked_nano: 0, + locked_nano: 100, ..Default::default() }; diff --git a/tests/common/vm_daemon_utils.rs b/tests/common/vm_daemon_utils.rs index dc3c367..8af4f00 100644 --- a/tests/common/vm_daemon_utils.rs +++ b/tests/common/vm_daemon_utils.rs @@ -59,8 +59,8 @@ pub async fn register_vm_node( let mut deleted_vm_reqs = Vec::new(); while let Some(stream_update) = grpc_stream.next().await { match stream_update { - Ok(del_vm_rq) => { - deleted_vm_reqs.push(del_vm_rq); + Ok(del_vm_req) => { + deleted_vm_reqs.push(del_vm_req); } Err(e) => { panic!("Received error instead of deleted_vm_reqs: {e:?}"); @@ -76,7 +76,8 @@ pub async fn daemon_listener( tx: mpsc::Sender, ) -> Result<()> { log::info!("listening vm_daemon"); - let mut grpc_stream = client.brain_messages(key.sign_stream_auth(vec![])?).await?.into_inner(); + let mut grpc_stream = + client.brain_messages(key.sign_stream_auth_vm(vec![])?).await?.into_inner(); while let Some(Ok(stream_update)) = grpc_stream.next().await { log::info!("vm deamon got notified: {:?}", &stream_update); @@ -95,7 +96,7 @@ pub async fn daemon_msg_sender( log::info!("sender vm_daemon"); let rx_stream = ReceiverStream::new(rx); tx.send(vm_proto::VmDaemonMessage { - msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![])?)), + msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth_vm(vec![])?)), }) .await?; client.daemon_messages(rx_stream).await?; @@ -135,8 +136,8 @@ pub async fn daemon_engine( Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { todo!() } - Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => { - todo!() + Some(vm_proto::brain_vm_message::Msg::DeleteVm(del_vm_req)) => { + println!("MOCK_VM_DAEMON:: delete vm request for {}", del_vm_req.uuid); } None => todo!(), } diff --git a/tests/grpc_app_cli_test.rs b/tests/grpc_app_cli_test.rs new file mode 100644 index 0000000..1c21b28 --- /dev/null +++ b/tests/grpc_app_cli_test.rs @@ -0,0 +1,182 @@ +use common::app_daemon_utils::mock_app_daemon; +use common::prepare_test_env::{prepare_test_db, run_service_for_stream}; +use common::test_utils::{airdrop, Key}; +use detee_shared::app_proto::brain_app_cli_client::BrainAppCliClient; +use detee_shared::app_proto::{self, DelAppReq}; +use std::vec; +use surreal_brain::constants::{ACCOUNT, ACTIVE_APP, DELETED_APP, NEW_APP_REQ, TOKEN_DECIMAL}; +use surreal_brain::db::prelude as db; + +use crate::common::app_cli_utils::create_new_app; + +mod common; + +#[tokio::test] +async fn test_app_creation() { + /* + env_logger::builder() + .filter_level(log::LevelFilter::Trace) + .filter_module("tungstenite", log::LevelFilter::Debug) + .filter_module("tokio_tungstenite", log::LevelFilter::Debug) + .init(); + */ + let db = prepare_test_db().await.unwrap(); + let brain_channel = run_service_for_stream().await.unwrap(); + let daemon_key = mock_app_daemon(&brain_channel, None).await.unwrap(); + + let key = Key::new(); + + let mut new_app_req = app_proto::NewAppReq { + admin_pubkey: key.pubkey.clone(), + node_pubkey: daemon_key.clone(), + price_per_unit: 1200, + resource: Some(app_proto::AppResource { ports: vec![8080, 8081], ..Default::default() }), + locked_nano: 100, + ..Default::default() + }; + + let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone()); + let new_app_resp = client_app_cli.new_app(key.sign_request(new_app_req.clone()).unwrap()).await; + assert!(new_app_resp.is_err()); + let new_app_err = new_app_resp.err().unwrap(); + assert!(new_app_err.to_string().contains("Insufficient funds")); + + let airdrop_amount = 10; + airdrop(&brain_channel, &key.pubkey, airdrop_amount).await.unwrap(); + + let new_app_resp = client_app_cli + .new_app(key.sign_request(new_app_req.clone()).unwrap()) + .await + .unwrap() + .into_inner(); + let active_app = + db.select::>((ACTIVE_APP, new_app_resp.uuid)).await.unwrap(); + assert!(active_app.is_some()); + + let daemon_key_02 = + mock_app_daemon(&brain_channel, Some("something went wrong 01".to_string())).await.unwrap(); + + new_app_req.node_pubkey = daemon_key_02.clone(); + + let new_app_resp = client_app_cli + .new_app(key.sign_request(new_app_req.clone()).unwrap()) + .await + .unwrap() + .into_inner(); + assert!(!new_app_resp.error.is_empty()); + + let app_req_db = + db.select::>((NEW_APP_REQ, new_app_resp.uuid)).await.unwrap(); + + assert!(!app_req_db.unwrap().error.is_empty()); + + let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); + assert_eq!(acc_db.balance, (airdrop_amount * TOKEN_DECIMAL - 100)); + assert_eq!(acc_db.tmp_locked, 0); + + let locking_nano = 1288; + new_app_req.node_pubkey = daemon_key; + new_app_req.locked_nano = locking_nano; + + let new_app_resp = + client_app_cli.new_app(key.sign_request(new_app_req).unwrap()).await.unwrap().into_inner(); + assert!(new_app_resp.error.is_empty()); + + let active_app = + db.select::>((ACTIVE_APP, new_app_resp.uuid)).await.unwrap(); + assert!(active_app.is_some()); + + let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); + assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - (locking_nano + 100)); + assert_eq!(acc_db.tmp_locked, 0); +} + +#[tokio::test] +async fn test_timeout_app_creation() { + let _ = prepare_test_db().await.unwrap(); + let brain_channel = run_service_for_stream().await.unwrap(); + let daemon_key = Key::new().pubkey.clone(); + + let key = Key::new(); + + let new_app_req = app_proto::NewAppReq { + admin_pubkey: key.pubkey.clone(), + node_pubkey: daemon_key.clone(), + price_per_unit: 1200, + resource: Some(app_proto::AppResource { ports: vec![8080, 8081], ..Default::default() }), + locked_nano: 100, + ..Default::default() + }; + + airdrop(&brain_channel, &key.pubkey, 10).await.unwrap(); + + let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone()); + let timeout_resp = client_app_cli.new_app(key.sign_request(new_app_req.clone()).unwrap()).await; + assert!(timeout_resp.is_err()); + let timeout_err = timeout_resp.err().unwrap(); + assert_eq!( + timeout_err.message(), + "Network timeout. Please try again later or contact the DeTEE devs team." + ); +} + +#[tokio::test] +async fn test_app_deletion() { + let db = prepare_test_db().await.unwrap(); + let brain_channel = run_service_for_stream().await.unwrap(); + let daemon_key = mock_app_daemon(&brain_channel, None).await.unwrap(); + + let key = Key::new(); + airdrop(&brain_channel, &key.pubkey, 10).await.unwrap(); + + let new_app_res = create_new_app(&key, &daemon_key, &brain_channel).await.unwrap(); + + let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone()); + + let del_app_req = DelAppReq { admin_pubkey: key.pubkey.clone(), uuid: new_app_res.uuid }; + let _ = client_app_cli.delete_app(key.sign_request(del_app_req).unwrap()).await.unwrap(); + + let key_02 = Key::new(); + + // delete random app + let mut del_app_req = DelAppReq { + admin_pubkey: key_02.pubkey.clone(), + uuid: "9ae3VH8nJg2i8pqTQ6mJtvYuS2kd9n1XLLco8GUPfT95".to_string(), + }; + + let del_err = client_app_cli + .delete_app(key_02.sign_request(del_app_req.clone()).unwrap()) + .await + .err() + .unwrap(); + assert_eq!(del_err.message(), "Unauthorized"); + + let new_app_res_02 = create_new_app(&key, &daemon_key, &brain_channel).await.unwrap(); + del_app_req.uuid = new_app_res_02.uuid; + + let del_err = client_app_cli + .delete_app(key_02.sign_request(del_app_req.clone()).unwrap()) + .await + .err() + .unwrap(); + assert_eq!(del_err.message(), "Unauthorized"); + + // test refund + let key_03 = Key::new(); + airdrop(&brain_channel, &key_03.pubkey, 10).await.unwrap(); + + let new_app_res_03 = create_new_app(&key_03, &daemon_key, &brain_channel).await.unwrap(); + + let del_app_req = + DelAppReq { admin_pubkey: key_03.pubkey.clone(), uuid: new_app_res_03.uuid.clone() }; + let _ = client_app_cli.delete_app(key_03.sign_request(del_app_req).unwrap()).await.unwrap(); + + let acc: db::Account = db.select((ACCOUNT, key_03.pubkey.clone())).await.unwrap().unwrap(); + assert_eq!(acc.balance, 10 * TOKEN_DECIMAL); + + let deleted_app = + db.select::>((DELETED_APP, new_app_res_03.uuid)).await.unwrap(); + assert!(deleted_app.is_some()); +} + +// TODO: test register app node, delete app contract while node offline, kick, etc.. diff --git a/tests/grpc_general_test.rs b/tests/grpc_general_test.rs index fc0c096..f115392 100644 --- a/tests/grpc_general_test.rs +++ b/tests/grpc_general_test.rs @@ -1,10 +1,10 @@ use common::prepare_test_env::{ prepare_test_db, run_service_for_stream, run_service_in_background, }; -use common::test_utils::{admin_keys, Key}; +use common::test_utils::{admin_keys, airdrop, Key}; use common::vm_cli_utils::{ - airdrop, create_new_vm, list_accounts, list_all_app_contracts, list_all_vm_contracts, - register_operator, report_node, + create_new_vm, list_accounts, list_all_app_contracts, list_all_vm_contracts, register_operator, + report_node, }; use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; use detee_shared::common_proto::{Empty, Pubkey}; diff --git a/tests/grpc_vm_cli_test.rs b/tests/grpc_vm_cli_test.rs index 475e11b..dd5ebf6 100644 --- a/tests/grpc_vm_cli_test.rs +++ b/tests/grpc_vm_cli_test.rs @@ -1,14 +1,14 @@ use common::prepare_test_env::{prepare_test_db, run_service_for_stream}; -use common::test_utils::Key; -use common::vm_cli_utils::{airdrop, create_new_vm, user_list_vm_contracts}; +use common::test_utils::{airdrop, Key}; +use common::vm_cli_utils::{create_new_vm, user_list_vm_contracts}; use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; -use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; +use detee_shared::vm_proto::{self, DeleteVmReq}; use detee_shared::vm_proto::{ExtendVmReq, ListVmContractsReq, NewVmReq}; use futures::StreamExt; use std::vec; -use surreal_brain::constants::{ACCOUNT, ACTIVE_VM, NEW_VM_REQ, TOKEN_DECIMAL}; +use surreal_brain::constants::{ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, TOKEN_DECIMAL}; use surreal_brain::db::prelude as db; mod common; @@ -73,7 +73,7 @@ async fn test_vm_creation() { } let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); - assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL); + assert_eq!(acc_db.balance, (airdrop_amount * TOKEN_DECIMAL - 100)); assert_eq!(acc_db.tmp_locked, 0); new_vm_req.node_pubkey = daemon_key; @@ -87,7 +87,7 @@ async fn test_vm_creation() { assert!(active_vm.is_some()); let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); - assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - locking_nano); + assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - (locking_nano + 100)); assert_eq!(acc_db.tmp_locked, 0); } @@ -109,7 +109,7 @@ async fn test_timeout_vm_creation() { node_pubkey: daemon_key.pubkey, price_per_unit: 1200, extra_ports: vec![8080, 8081], - locked_nano: 0, + locked_nano: 100, ..Default::default() }; @@ -125,6 +125,65 @@ async fn test_timeout_vm_creation() { ) } +#[tokio::test] +async fn test_vm_deletion() { + let db = prepare_test_db().await.unwrap(); + let brain_channel = run_service_for_stream().await.unwrap(); + let daemon_key = mock_vm_daemon(&brain_channel, None).await.unwrap(); + + let key = Key::new(); + airdrop(&brain_channel, &key.pubkey, 10).await.unwrap(); + + let new_vm_uuid = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap(); + + let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone()); + + let del_app_req = DeleteVmReq { admin_pubkey: key.pubkey.clone(), uuid: new_vm_uuid }; + let _ = client_vm_cli.delete_vm(key.sign_request(del_app_req).unwrap()).await.unwrap(); + + let key_02 = Key::new(); + + // delete random vm + let mut del_vm_req = DeleteVmReq { + admin_pubkey: key_02.pubkey.clone(), + uuid: "9ae3VH8nJg2i8pqTQ6mJtvYuS2kd9n1XLLco8GUPfT95".to_string(), + }; + + let del_err = client_vm_cli + .delete_vm(key_02.sign_request(del_vm_req.clone()).unwrap()) + .await + .err() + .unwrap(); + assert_eq!(del_err.message(), "Unauthorized"); + + let new_vm_uuid_02 = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap(); + del_vm_req.uuid = new_vm_uuid_02; + + let del_err = client_vm_cli + .delete_vm(key_02.sign_request(del_vm_req.clone()).unwrap()) + .await + .err() + .unwrap(); + assert_eq!(del_err.message(), "Unauthorized"); + + // test refund + let key_03 = Key::new(); + airdrop(&brain_channel, &key_03.pubkey, 10).await.unwrap(); + + let new_vm_uuid_03 = create_new_vm(&db, &key_03, &daemon_key, &brain_channel).await.unwrap(); + + let del_vm_req = + DeleteVmReq { admin_pubkey: key_03.pubkey.clone(), uuid: new_vm_uuid_03.clone() }; + let _ = client_vm_cli.delete_vm(key_03.sign_request(del_vm_req).unwrap()).await.unwrap(); + + let acc: db::Account = db.select((ACCOUNT, key_03.pubkey.clone())).await.unwrap().unwrap(); + assert_eq!(acc.balance, 10 * TOKEN_DECIMAL); + + let deleted_vm = + db.select::>((DELETED_VM, new_vm_uuid_03)).await.unwrap(); + assert!(deleted_vm.is_some()); +} + #[tokio::test] // TODO: create vm for this user before testing this async fn test_list_vm_contracts() { @@ -240,3 +299,5 @@ async fn test_extend_vm() { let acc: db::Account = db_conn.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); assert_eq!(acc.balance, expected_bal_02); } + +// TODO: test register vm node, delete vm contract while node offline, kick, etc.. diff --git a/tests/grpc_vm_daemon_test.rs b/tests/grpc_vm_daemon_test.rs index f8ce79a..2e47c42 100644 --- a/tests/grpc_vm_daemon_test.rs +++ b/tests/grpc_vm_daemon_test.rs @@ -1,8 +1,7 @@ use common::prepare_test_env::{ prepare_test_db, run_service_for_stream, run_service_in_background, }; -use common::test_utils::Key; -use common::vm_cli_utils::airdrop; +use common::test_utils::{airdrop, Key}; use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; @@ -39,7 +38,7 @@ async fn test_brain_message() { node_pubkey: daemon_key, price_per_unit: 1200, extra_ports: vec![8080, 8081], - locked_nano: 0, + locked_nano: 100, ..Default::default() }; airdrop(&brain_channel, &cli_key.pubkey, 10).await.unwrap();