From e59a3c8fd8bef04c3b4c1b22a1f5fcf0ca1fd9cf Mon Sep 17 00:00:00 2001 From: Noor Date: Tue, 3 Jun 2025 14:54:22 +0530 Subject: [PATCH] Implement new app contract balance locking minimum locking balance on deployment locking balance on app deployment refunding locked nano while error on daemon returning appropreate error on app deployment fixed some typos on logging new timeout constants for daemon respose minor change in schema and proto extensive tests on app deployments fixed some vm tests --- .gitignore | 1 + Cargo.lock | 2 +- Cargo.toml | 2 +- src/bin/migration0.rs | 1 + src/constants.rs | 3 + src/db/app.rs | 202 ++++++++++++++++++++++++++----- src/db/mod.rs | 4 +- src/db/vm.rs | 5 +- src/grpc/app.rs | 22 +++- src/grpc/types.rs | 12 +- src/grpc/vm.rs | 5 + surql/tables.sql | 4 +- tests/common/app_daemon_utils.rs | 144 ++++++++++++++++++++++ tests/common/mod.rs | 3 + tests/common/prepare_test_env.rs | 5 + tests/common/test_utils.rs | 14 ++- tests/common/vm_cli_utils.rs | 2 +- tests/common/vm_daemon_utils.rs | 9 +- tests/grpc_app_cli_test.rs | 91 ++++++++++++++ tests/grpc_vm_cli_test.rs | 6 +- tests/grpc_vm_daemon_test.rs | 2 +- 21 files changed, 478 insertions(+), 61 deletions(-) create mode 100644 tests/common/app_daemon_utils.rs create mode 100644 tests/grpc_app_cli_test.rs diff --git a/.gitignore b/.gitignore index ed3ad69..0a5c507 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target secrets tmp +.env diff --git a/Cargo.lock b/Cargo.lock index 3826a4e..b8a20bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1000,7 +1000,7 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain#d6ca058d2de78b5257517034bca2b2c7d5929db8" +source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#694d5811aa0edc9b2b9bdb17c6e972b04a21b96f" dependencies = [ "bincode 2.0.1", "prost", diff --git a/Cargo.toml b/Cargo.toml index 6011662..9d96c45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ serde_yaml = "0.9.34" surrealdb = "2.2.2" tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] } tonic = { version = "0.12", features = ["tls"] } -detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain" } +detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain_app" } ed25519-dalek = "2.1.1" bs58 = "0.5.1" tokio-stream = "0.1.17" diff --git a/src/bin/migration0.rs b/src/bin/migration0.rs index e622597..fd3c50f 100644 --- a/src/bin/migration0.rs +++ b/src/bin/migration0.rs @@ -17,6 +17,7 @@ async fn main() -> Result<(), Box> { let db_name = std::env::var("DB_NAME").expect("DB_NAME not set in .env"); let db = db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name).await.unwrap(); + env_logger::builder().filter_level(log::LevelFilter::Trace); db::migration0(&db, &old_brain_data).await?; diff --git a/src/constants.rs b/src/constants.rs index 49098a3..c8aa89e 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -48,3 +48,6 @@ pub const ID_ALPHABET: [char; 62] = [ pub const TOKEN_DECIMAL: u64 = 1_000_000_000; pub const MIN_ESCROW: u64 = 5000 * TOKEN_DECIMAL; + +pub const APP_DAEMON_TIMEOUT: u64 = 20; +pub const VM_DAEMON_TIMEOUT: u64 = 10; diff --git a/src/db/app.rs b/src/db/app.rs index 3abe616..fdaac6d 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -1,11 +1,13 @@ use std::time::Duration; use super::Error; -use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, DELETED_APP, NEW_APP_REQ}; +use crate::constants::{ + ACCOUNT, ACTIVE_APP, APP_DAEMON_TIMEOUT, APP_NODE, DELETED_APP, NEW_APP_REQ, +}; use crate::db; use crate::db::general::Report; use crate::old_brain; -use detee_shared::app_proto; +use detee_shared::app_proto::{self, NewAppRes}; use serde::{Deserialize, Serialize}; use surrealdb::engine::remote::ws::Client; use surrealdb::sql::Datetime; @@ -68,8 +70,8 @@ pub struct NewAppReq { pub hratls_pubkey: String, pub ports: Vec, pub memory_mb: u32, - pub vcpu: u32, - pub disk_mb: u32, + pub vcpus: u32, + pub disk_size_gb: u32, pub locked_nano: u64, pub price_per_unit: u64, pub error: String, @@ -81,26 +83,117 @@ impl NewAppReq { let new_app_req: Option = db.select((NEW_APP_REQ, id)).await?; Ok(new_app_req) } - pub async fn submit_error( - db: &Surreal, - id: &str, - error: String, - ) -> Result, Error> { - #[derive(Serialize)] - struct NewAppError { - error: String, + pub async fn submit_error(db: &Surreal, id: &str, error: String) -> Result<(), Error> { + let tx_query = String::from( + " + BEGIN TRANSACTION; + + LET $new_app_req = $new_app_req_input; + LET $error = $error_input; + LET $record = (select * from $new_app_req)[0]; + LET $admin = $record.in; + + if $record == None {{ + THROW 'app req not exist ' + $new_app_req + }}; + + UPDATE $new_app_req SET error = $error; + + UPDATE $admin SET tmp_locked -= $record.locked_nano; + UPDATE $admin SET balance += $record.locked_nano; + + COMMIT TRANSACTION;", + ); + + log::trace!("submit_new_app_err query: {tx_query}"); + + let mut query_resp = db + .query(tx_query) + .bind(("new_app_req_input", RecordId::from((NEW_APP_REQ, id)))) + .bind(("error_input", error)) + .await?; + + let query_err = query_resp.take_errors(); + if !query_err.is_empty() { + log::trace!("errors in submit_new_app_err: {query_err:?}"); + let tx_fail_err_str = + String::from("The query was not executed due to a failed transaction"); + + if query_err.contains_key(&4) && query_err[&4].to_string() != tx_fail_err_str { + log::error!("app req not exist: {}", query_err[&4]); + return Err(Error::ContractNotFound); + } else { + log::error!("Unknown error in submit_new_app_err: {query_err:?}"); + return Err(Error::Unknown("submit_new_app_req".to_string())); + } } - let record: Option = - db.update((NEW_APP_REQ, id)).merge(NewAppError { error }).await?; - - Ok(record) + Ok(()) } - pub async fn submit(self, db: &Surreal) -> Result, Error> { - // TODO: handle financial transaction - let new_app_req: Vec = db.insert(NEW_APP_REQ).relation(self).await?; - Ok(new_app_req) + pub async fn submit(self, db: &Surreal) -> Result<(), Error> { + let locked_nano = self.locked_nano; + let tx_query = format!( " + BEGIN TRANSACTION; + + LET $account = $account_input; + LET $new_app_req = $new_app_req_input; + LET $app_node = $app_node_input; + LET $package_url = $package_url_input; + LET $mr_enclave = $mr_enclave_input; + LET $hratls_pubkey = $hratls_pubkey_input; + LET $app_name = $app_name_input; + + UPDATE $account SET balance -= {locked_nano}; + IF $account.balance < 0 {{ + THROW 'Insufficient funds.' + }}; + UPDATE $account SET tmp_locked += {locked_nano}; + + + RELATE + $account + ->$new_app_req + ->$app_node + CONTENT {{ + created_at: time::now(), app_name: $app_name, package_url: $package_url, + mr_enclave: $mr_enclave, hratls_pubkey: $hratls_pubkey, ports: {:?}, memory_mb: {}, + vcpus: {}, disk_size_gb: {}, locked_nano: {locked_nano}, price_per_unit: {}, error: '', + }}; + + COMMIT TRANSACTION; + ", + self.ports, self.memory_mb, self.vcpus, self.disk_size_gb, self.price_per_unit); + + log::trace!("submit_new_app_req query: {tx_query}"); + + let mut query_resp = db + .query(tx_query) + .bind(("account_input", self.admin)) + .bind(("new_app_req_input", self.id)) + .bind(("app_node_input", self.app_node)) + .bind(("package_url_input", self.package_url)) + .bind(("mr_enclave_input", self.mr_enclave)) + .bind(("hratls_pubkey_input", self.hratls_pubkey)) + .bind(("app_name_input", self.app_name)) + .await?; + + let query_err = query_resp.take_errors(); + if !query_err.is_empty() { + log::trace!("errors in submit_new_app_req: {query_err:?}"); + let tx_fail_err_str = + String::from("The query was not executed due to a failed transaction"); + + if query_err.contains_key(&8) && query_err[&8].to_string() != tx_fail_err_str { + log::error!("Transaction error: {}", query_err[&8]); + return Err(Error::InsufficientFunds); + } else { + log::error!("Unknown error in submit_new_app_req: {query_err:?}"); + return Err(Error::Unknown("submit_new_app_req".to_string())); + } + } + + Ok(()) } } @@ -221,22 +314,31 @@ impl ActiveApp { (self.vcpus as f64 * 5f64) + (self.memory_mb as f64 / 200f64) + (self.disk_size_gb as f64) } - pub async fn activate(db: &Surreal, id: &str) -> Result<(), Error> { - let new_app_req = match NewAppReq::get(db, id).await? { + pub async fn activate( + db: &Surreal, + new_app_res: app_proto::NewAppRes, + ) -> Result<(), Error> { + let new_app_req = match NewAppReq::get(db, &new_app_res.uuid).await? { Some(r) => r, None => return Ok(()), }; + let mapped_ports = new_app_res + .mapped_ports + .into_iter() + .map(|data| (data.host_port, data.guest_port)) + .collect::>(); + let active_app = Self { - id: RecordId::from((ACTIVE_APP, id)), + id: RecordId::from((ACTIVE_APP, &new_app_res.uuid)), admin: new_app_req.admin, app_node: new_app_req.app_node, app_name: new_app_req.app_name, - mapped_ports: vec![], + mapped_ports, host_ipv4: String::new(), - vcpus: new_app_req.vcpu, + vcpus: new_app_req.vcpus, memory_mb: new_app_req.memory_mb, - disk_size_gb: new_app_req.disk_mb, + disk_size_gb: new_app_req.disk_size_gb, created_at: new_app_req.created_at.clone(), price_per_unit: new_app_req.price_per_unit, locked_nano: new_app_req.locked_nano, @@ -246,7 +348,13 @@ impl ActiveApp { hratls_pubkey: new_app_req.hratls_pubkey.clone(), }; + let admin_account = active_app.admin.key().to_string(); + let locked_nano = active_app.locked_nano; + let _: Vec = db.insert(()).relation(active_app).await?; + db.delete::>((NEW_APP_REQ, &new_app_res.uuid)).await?; + db.query(format!("UPDATE {ACCOUNT}:{admin_account} SET tmp_locked -= {locked_nano};")) + .await?; Ok(()) } @@ -261,7 +369,15 @@ impl ActiveApp { Ok(false) } } - pub async fn listen(db: &Surreal, app_id: &str) -> Result { +} + +pub enum WrappedAppResp { + NewAppRes(NewAppRes), + Error(NewAppRes), +} + +impl WrappedAppResp { + pub async fn listen(db: &Surreal, app_id: &str) -> Result { let mut query_response = db .query(format!( "live select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};" @@ -272,14 +388,40 @@ impl ActiveApp { let mut error_stream = query_response.stream::>(0)?; let mut active_app_stream = query_response.stream::>(1)?; - tokio::time::timeout(Duration::from_secs(30), async { + let mut error = db + .query(format!("select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};")) + .await?; + if let Some(error_on_newapp_req) = error.take::>(0)? { + if !error_on_newapp_req.error.is_empty() { + let app_daemon_err = NewAppRes { + uuid: app_id.to_string(), + error: error_on_newapp_req.error, + ..Default::default() + }; + + return Ok(Self::Error(app_daemon_err)); + } + } + + if let Some(active_app) = db.select::>((ACTIVE_APP, app_id)).await? { + return Ok(Self::NewAppRes(active_app.into())); + } + log::trace!("listening for table: {NEW_APP_REQ}"); + + tokio::time::timeout(Duration::from_secs(APP_DAEMON_TIMEOUT), async { loop { tokio::select! { Some(err_notif) = error_stream.next() =>{ match err_notif{ Ok(err_notif) =>{ if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty(){ - return Err(Error::NewAppDaemonResp(err_notif.data.error)) + let app_daemon_err = NewAppRes { + uuid: app_id.to_string(), + error: err_notif.data.error, + ..Default::default() + }; + + return Ok(Self::Error(app_daemon_err)); } } Err(e) => return Err(e.into()) @@ -291,7 +433,7 @@ impl ActiveApp { Ok(active_app_notif) =>{ if active_app_notif.action == surrealdb::Action::Create { let _: Option = db.delete((NEW_APP_REQ, app_id)).await?; - return Ok(active_app_notif.data); + return Ok(Self::NewAppRes(active_app_notif.data.into())); } } Err(e) => return Err(e.into()) diff --git a/src/db/mod.rs b/src/db/mod.rs index 7f22fdb..6af367b 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -31,8 +31,6 @@ pub enum Error { UnknownTable(String), #[error("Daemon channel got closed: {0}")] AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError), - #[error("AppDaemon Error {0}")] - NewAppDaemonResp(String), #[error("Minimum escrow amount is {MIN_ESCROW}")] MinimalEscrow, #[error("Insufficient funds, deposit more tokens")] @@ -186,7 +184,7 @@ pub async fn live_appnode_msgs< } Err(e) => { log::error!( - "live_vmnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}" + "live_appnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}" ); return Err(Error::from(e)); } diff --git a/src/db/vm.rs b/src/db/vm.rs index b216206..28c1fb8 100644 --- a/src/db/vm.rs +++ b/src/db/vm.rs @@ -3,7 +3,8 @@ use std::time::Duration; use super::Error; use crate::constants::{ - ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT, + ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_DAEMON_TIMEOUT, VM_NODE, + VM_UPDATE_EVENT, }; use crate::db::{Account, ErrorFromTable, Report}; use crate::old_brain; @@ -353,7 +354,7 @@ impl WrappedMeasurement { } log::trace!("listening for table: {table}"); - tokio::time::timeout(Duration::from_secs(10), async { + tokio::time::timeout(Duration::from_secs(VM_DAEMON_TIMEOUT), async { loop { tokio::select! { error_notification = error_stream.next() => { diff --git a/src/grpc/app.rs b/src/grpc/app.rs index ec742d7..6ab8565 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -145,7 +145,7 @@ impl BrainAppDaemon for AppDaemonServer { ) .await?; } else { - db::ActiveApp::activate(&self.db, &new_app_resp.uuid).await?; + db::ActiveApp::activate(&self.db, new_app_resp).await?; } } Some(daemon_message_app::Msg::AppNodeResources(app_node_resources)) => { @@ -182,28 +182,38 @@ impl BrainAppCli for AppCliServer { async fn new_app(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; + // TODO: make it atleast 1 hour + if req.locked_nano < 100 { + log::error!("locking lessthan 100 nano lps: {}", req.locked_nano); + return Err(Status::unknown("lock atleaset 100 nano lps")); + } info!("new_app process starting for {:?}", req); if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { return Err(Status::permission_denied("This operator banned you. What did you do?")); } let db_req: db::NewAppReq = req.into(); - let id = db_req.id.to_string(); + let id = db_req.id.key().to_string(); let (tx, rx) = tokio::sync::oneshot::channel(); let db = self.db.clone(); tokio::spawn(async move { - let _ = tx.send(db::ActiveApp::listen(&db, &id).await); + let _ = tx.send(db::WrappedAppResp::listen(&db, &id).await); }); - db_req.submit(&self.db).await?; match rx.await { + Ok(Ok(db::WrappedAppResp::NewAppRes(new_app_resp))) => Ok(Response::new(new_app_resp)), + Ok(Ok(db::WrappedAppResp::Error(err))) => Ok(Response::new(err)), Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded( "Network timeout. Please try again later or contact the DeTEE devs team.", )), - Ok(Err(db::Error::NewAppDaemonResp(err))) => Err(Status::internal(err)), - Ok(new_app_resp) => Ok(Response::new(new_app_resp?.into())), + Ok(Err(e)) => { + log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}"); + Err(Status::unknown( + "Unknown error. Please try again or contact the DeTEE devs team.", + )) + } Err(e) => { log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}"); Err(Status::unknown( diff --git a/src/grpc/types.rs b/src/grpc/types.rs index 900df5a..bc2b098 100644 --- a/src/grpc/types.rs +++ b/src/grpc/types.rs @@ -274,8 +274,8 @@ impl From for AppContract { public_ipv4: value.host_ipv4, resource: Some(AppResource { memory_mb: value.memory_mb, - disk_mb: value.disk_size_gb, - vcpu: value.vcpus, + disk_size_gb: value.disk_size_gb, + vcpus: value.vcpus, ports: value.mapped_ports.iter().map(|(_, g)| *g).collect(), }), mapped_ports: value @@ -316,8 +316,8 @@ impl From for db::NewAppReq { hratls_pubkey: val.hratls_pubkey, ports: resource.ports, memory_mb: resource.memory_mb, - vcpu: resource.vcpu, - disk_mb: resource.disk_mb, + vcpus: resource.vcpus, + disk_size_gb: resource.disk_size_gb, locked_nano: val.locked_nano, price_per_unit: val.price_per_unit, error: String::new(), @@ -329,9 +329,9 @@ impl From for db::NewAppReq { impl From for NewAppReq { fn from(value: db::NewAppReq) -> Self { let resource = AppResource { - vcpu: value.vcpu, + vcpus: value.vcpus, memory_mb: value.memory_mb, - disk_mb: value.disk_mb, + disk_size_gb: value.disk_size_gb, ports: value.ports, }; let mr_enclave = Some(hex::decode(value.mr_enclave).unwrap_or_default()); diff --git a/src/grpc/vm.rs b/src/grpc/vm.rs index 9b5e6aa..c3f5ea3 100644 --- a/src/grpc/vm.rs +++ b/src/grpc/vm.rs @@ -222,6 +222,11 @@ impl BrainVmCli for VmCliServer { async fn new_vm(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; + // TODO: make it atleast 1 hour + if req.locked_nano < 100 { + log::error!("locking lessthan 100 nano lps: {}", req.locked_nano); + return Err(Status::unknown("lock atleaset 100 nano lps")); + } info!("New VM requested via CLI: {req:?}"); if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { return Err(Status::permission_denied("This operator banned you. What did you do?")); diff --git a/surql/tables.sql b/surql/tables.sql index 4ab0a99..65e9381 100644 --- a/surql/tables.sql +++ b/surql/tables.sql @@ -99,8 +99,8 @@ DEFINE FIELD mr_enclave ON TABLE new_app_req TYPE string; DEFINE FIELD hratls_pubkey ON TABLE new_app_req TYPE string; DEFINE FIELD ports ON TABLE new_app_req TYPE array; DEFINE FIELD memory_mb ON TABLE new_app_req TYPE int; -DEFINE FIELD vcpu ON TABLE new_app_req TYPE int; -DEFINE FIELD disk_mb ON TABLE new_app_req TYPE int; +DEFINE FIELD vcpus ON TABLE new_app_req TYPE int; +DEFINE FIELD disk_size_gb ON TABLE new_app_req TYPE int; DEFINE FIELD locked_nano ON TABLE new_app_req TYPE int; DEFINE FIELD price_per_unit ON TABLE new_app_req TYPE int; DEFINE FIELD error ON TABLE new_app_req TYPE string; diff --git a/tests/common/app_daemon_utils.rs b/tests/common/app_daemon_utils.rs new file mode 100644 index 0000000..1d4fa52 --- /dev/null +++ b/tests/common/app_daemon_utils.rs @@ -0,0 +1,144 @@ +use anyhow::Result; +use detee_shared::app_proto::brain_app_daemon_client::BrainAppDaemonClient; +use detee_shared::app_proto::{self, NewAppRes, RegisterAppNodeReq}; +use detee_shared::common_proto::MappedPort; +use futures::StreamExt; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::transport::Channel; + +use super::test_utils::Key; + +pub async fn mock_app_daemon( + brain_channel: &Channel, + daemon_error: Option, +) -> Result { + let mut daemon_client = BrainAppDaemonClient::new(brain_channel.clone()); + let daemon_key = Key::new(); + + register_app_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?; + + let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); + + tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx)); + + let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); + tokio::spawn(daemon_msg_sender( + daemon_client.clone(), + daemon_key.clone(), + daemon_msg_tx.clone(), + rx, + )); + + tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx, daemon_error)); + + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + Ok(daemon_key.pubkey) +} + +pub async fn register_app_node( + client: &mut BrainAppDaemonClient, + key: &Key, + operator_wallet: &str, +) -> Result> { + log::info!("Registering app_node: {}", key.pubkey); + let node_pubkey = key.pubkey.clone(); + + let req = RegisterAppNodeReq { + node_pubkey, + operator_wallet: operator_wallet.to_string(), + main_ip: String::from("185.243.218.213"), + city: String::from("Oslo"), + country: String::from("Norway"), + region: String::from("EU"), + price: 1200, + }; + + let mut grpc_stream = client.register_app_node(key.sign_request(req)?).await?.into_inner(); + + let mut deleted_app_reqs = Vec::new(); + while let Some(stream_update) = grpc_stream.next().await { + match stream_update { + Ok(del_app_req) => { + deleted_app_reqs.push(del_app_req); + } + Err(e) => { + panic!("Received error instead of deleted_app_reqs: {e:?}"); + } + } + } + Ok(deleted_app_reqs) +} + +pub async fn daemon_listener( + mut client: BrainAppDaemonClient, + key: Key, + tx: mpsc::Sender, +) -> Result<()> { + log::info!("listening app_daemon"); + let mut grpc_stream = + client.brain_messages(key.sign_stream_auth_app(vec![])?).await?.into_inner(); + + while let Some(Ok(stream_update)) = grpc_stream.next().await { + log::info!("app deamon got notified: {:?}", &stream_update); + let _ = tx.send(stream_update).await; + } + + Ok(()) +} + +pub async fn daemon_msg_sender( + mut client: BrainAppDaemonClient, + key: Key, + tx: mpsc::Sender, + rx: mpsc::Receiver, +) -> Result<()> { + log::info!("sender app_daemon"); + let rx_stream = ReceiverStream::new(rx); + tx.send(app_proto::DaemonMessageApp { + msg: Some(app_proto::daemon_message_app::Msg::Auth(key.sign_stream_auth_app(vec![])?)), + }) + .await?; + client.daemon_messages(rx_stream).await?; + Ok(()) +} + +pub async fn daemon_engine( + tx: mpsc::Sender, + mut rx: mpsc::Receiver, + new_app_err: Option, +) -> Result<()> { + log::info!("daemon engine app_daemon"); + while let Some(brain_msg) = rx.recv().await { + match brain_msg.msg { + Some(app_proto::brain_message_app::Msg::NewAppReq(new_app_req)) => { + let exposed_ports = + [vec![34500], new_app_req.resource.unwrap_or_default().ports].concat(); + + let mapped_ports = exposed_ports + .into_iter() + .map(|port| MappedPort { host_port: port, guest_port: port }) + .collect::>(); + + let res_data = NewAppRes { + uuid: new_app_req.uuid, + mapped_ports, + ip_address: "127.0.0.1".to_string(), + error: new_app_err.clone().unwrap_or_default(), + }; + + let res = app_proto::DaemonMessageApp { + msg: Some(app_proto::daemon_message_app::Msg::NewAppRes(res_data)), + }; + tx.send(res).await?; + } + Some(app_proto::brain_message_app::Msg::DeleteAppReq(_del_app_req)) => { + todo!() + } + None => todo!(), + } + } + + Ok(()) +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 09e1a27..e5947ff 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -6,3 +6,6 @@ pub mod test_utils; pub mod vm_cli_utils; #[allow(dead_code)] pub mod vm_daemon_utils; + +#[allow(dead_code)] +pub mod app_daemon_utils; diff --git a/tests/common/prepare_test_env.rs b/tests/common/prepare_test_env.rs index 42e6e0c..c2aba72 100644 --- a/tests/common/prepare_test_env.rs +++ b/tests/common/prepare_test_env.rs @@ -1,4 +1,6 @@ use anyhow::Result; +use detee_shared::app_proto::brain_app_cli_server::BrainAppCliServer; +use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemonServer; use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer; use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; @@ -7,6 +9,7 @@ use hyper_util::rt::TokioIo; use std::net::SocketAddr; use std::sync::Arc; use surreal_brain::constants::DB_SCHEMA_FILES; +use surreal_brain::grpc::app::{AppCliServer, AppDaemonServer}; use surreal_brain::grpc::general::GeneralCliServer; use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer}; use surrealdb::engine::remote::ws::Client; @@ -96,6 +99,8 @@ pub async fn run_service_for_stream_server() -> DuplexStream { .add_service(BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone()))) .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) + .add_service(BrainAppCliServer::new(AppCliServer::new(db_arc.clone()))) + .add_service(BrainAppDaemonServer::new(AppDaemonServer::new(db_arc.clone()))) .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await?; diff --git a/tests/common/test_utils.rs b/tests/common/test_utils.rs index a99efb0..0c5b884 100644 --- a/tests/common/test_utils.rs +++ b/tests/common/test_utils.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use detee_shared::app_proto as sgx_proto; use detee_shared::vm_proto as snp_proto; use ed25519_dalek::{Signer, SigningKey}; use itertools::Itertools; @@ -63,11 +64,22 @@ impl Key { Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string()) } - pub fn sign_stream_auth(&self, contracts: Vec) -> Result { + pub fn sign_stream_auth_vm( + &self, + contracts: Vec, + ) -> Result { let pubkey = self.pubkey.clone(); let timestamp = chrono::Utc::now().to_rfc3339(); let signature = self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?; Ok(snp_proto::DaemonStreamAuth { timestamp, pubkey, contracts, signature }) } + + pub fn sign_stream_auth_app(&self, contracts: Vec) -> Result { + let pubkey = self.pubkey.clone(); + let timestamp = chrono::Utc::now().to_rfc3339(); + let signature = + self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?; + Ok(sgx_proto::DaemonAuth { timestamp, pubkey, contracts, signature }) + } } diff --git a/tests/common/vm_cli_utils.rs b/tests/common/vm_cli_utils.rs index 38aa876..7206d31 100644 --- a/tests/common/vm_cli_utils.rs +++ b/tests/common/vm_cli_utils.rs @@ -35,7 +35,7 @@ pub async fn create_new_vm( node_pubkey: node_pubkey.to_string(), price_per_unit: 1200, extra_ports: vec![8080, 8081], - locked_nano: 0, + locked_nano: 100, ..Default::default() }; diff --git a/tests/common/vm_daemon_utils.rs b/tests/common/vm_daemon_utils.rs index dc3c367..96abacc 100644 --- a/tests/common/vm_daemon_utils.rs +++ b/tests/common/vm_daemon_utils.rs @@ -59,8 +59,8 @@ pub async fn register_vm_node( let mut deleted_vm_reqs = Vec::new(); while let Some(stream_update) = grpc_stream.next().await { match stream_update { - Ok(del_vm_rq) => { - deleted_vm_reqs.push(del_vm_rq); + Ok(del_vm_req) => { + deleted_vm_reqs.push(del_vm_req); } Err(e) => { panic!("Received error instead of deleted_vm_reqs: {e:?}"); @@ -76,7 +76,8 @@ pub async fn daemon_listener( tx: mpsc::Sender, ) -> Result<()> { log::info!("listening vm_daemon"); - let mut grpc_stream = client.brain_messages(key.sign_stream_auth(vec![])?).await?.into_inner(); + let mut grpc_stream = + client.brain_messages(key.sign_stream_auth_vm(vec![])?).await?.into_inner(); while let Some(Ok(stream_update)) = grpc_stream.next().await { log::info!("vm deamon got notified: {:?}", &stream_update); @@ -95,7 +96,7 @@ pub async fn daemon_msg_sender( log::info!("sender vm_daemon"); let rx_stream = ReceiverStream::new(rx); tx.send(vm_proto::VmDaemonMessage { - msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![])?)), + msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth_vm(vec![])?)), }) .await?; client.daemon_messages(rx_stream).await?; diff --git a/tests/grpc_app_cli_test.rs b/tests/grpc_app_cli_test.rs new file mode 100644 index 0000000..574f3b3 --- /dev/null +++ b/tests/grpc_app_cli_test.rs @@ -0,0 +1,91 @@ +use common::app_daemon_utils::mock_app_daemon; +use common::prepare_test_env::{prepare_test_db, run_service_for_stream}; +use common::test_utils::Key; +use common::vm_cli_utils::airdrop; +use detee_shared::app_proto; +use detee_shared::app_proto::brain_app_cli_client::BrainAppCliClient; +use std::vec; +use surreal_brain::constants::{ACCOUNT, ACTIVE_APP, NEW_APP_REQ, TOKEN_DECIMAL}; +use surreal_brain::db::prelude as db; + +mod common; + +#[tokio::test] +async fn test_app_creation() { + /* + env_logger::builder() + .filter_level(log::LevelFilter::Trace) + .filter_module("tungstenite", log::LevelFilter::Debug) + .filter_module("tokio_tungstenite", log::LevelFilter::Debug) + .init(); + */ + let db = prepare_test_db().await.unwrap(); + let brain_channel = run_service_for_stream().await.unwrap(); + let daemon_key = mock_app_daemon(&brain_channel, None).await.unwrap(); + + let key = Key::new(); + + let mut new_app_req = app_proto::NewAppReq { + admin_pubkey: key.pubkey.clone(), + node_pubkey: daemon_key.clone(), + price_per_unit: 1200, + resource: Some(app_proto::AppResource { ports: vec![8080, 8081], ..Default::default() }), + locked_nano: 100, + ..Default::default() + }; + + let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone()); + let new_app_resp = client_app_cli.new_app(key.sign_request(new_app_req.clone()).unwrap()).await; + assert!(new_app_resp.is_err()); + let new_app_err = new_app_resp.err().unwrap(); + assert!(new_app_err.to_string().contains("Insufficient funds")); + + let airdrop_amount = 10; + airdrop(&brain_channel, &key.pubkey, airdrop_amount).await.unwrap(); + + let new_app_resp = client_app_cli + .new_app(key.sign_request(new_app_req.clone()).unwrap()) + .await + .unwrap() + .into_inner(); + let active_app = + db.select::>((ACTIVE_APP, new_app_resp.uuid)).await.unwrap(); + assert!(active_app.is_some()); + + let daemon_key_02 = + mock_app_daemon(&brain_channel, Some("something went wrong 01".to_string())).await.unwrap(); + + new_app_req.node_pubkey = daemon_key_02.clone(); + + let new_app_resp = client_app_cli + .new_app(key.sign_request(new_app_req.clone()).unwrap()) + .await + .unwrap() + .into_inner(); + assert!(!new_app_resp.error.is_empty()); + + let app_req_db = + db.select::>((NEW_APP_REQ, new_app_resp.uuid)).await.unwrap(); + + assert!(!app_req_db.unwrap().error.is_empty()); + + let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); + assert_eq!(acc_db.balance, (airdrop_amount * TOKEN_DECIMAL - 100)); + assert_eq!(acc_db.tmp_locked, 0); + + let locking_nano = 1288; + new_app_req.node_pubkey = daemon_key; + new_app_req.locked_nano = locking_nano; + + let new_app_resp = + client_app_cli.new_app(key.sign_request(new_app_req).unwrap()).await.unwrap().into_inner(); + assert!(new_app_resp.error.is_empty()); + + let active_app = + db.select::>((ACTIVE_APP, new_app_resp.uuid)).await.unwrap(); + assert!(active_app.is_some()); + + let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); + assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - (locking_nano + 100)); + assert_eq!(acc_db.tmp_locked, 0); +} diff --git a/tests/grpc_vm_cli_test.rs b/tests/grpc_vm_cli_test.rs index 475e11b..5d68ce1 100644 --- a/tests/grpc_vm_cli_test.rs +++ b/tests/grpc_vm_cli_test.rs @@ -73,7 +73,7 @@ async fn test_vm_creation() { } let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); - assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL); + assert_eq!(acc_db.balance, (airdrop_amount * TOKEN_DECIMAL - 100)); assert_eq!(acc_db.tmp_locked, 0); new_vm_req.node_pubkey = daemon_key; @@ -87,7 +87,7 @@ async fn test_vm_creation() { assert!(active_vm.is_some()); let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); - assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - locking_nano); + assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - (locking_nano + 100)); assert_eq!(acc_db.tmp_locked, 0); } @@ -109,7 +109,7 @@ async fn test_timeout_vm_creation() { node_pubkey: daemon_key.pubkey, price_per_unit: 1200, extra_ports: vec![8080, 8081], - locked_nano: 0, + locked_nano: 100, ..Default::default() }; diff --git a/tests/grpc_vm_daemon_test.rs b/tests/grpc_vm_daemon_test.rs index f8ce79a..5a684e2 100644 --- a/tests/grpc_vm_daemon_test.rs +++ b/tests/grpc_vm_daemon_test.rs @@ -39,7 +39,7 @@ async fn test_brain_message() { node_pubkey: daemon_key, price_per_unit: 1200, extra_ports: vec![8080, 8081], - locked_nano: 0, + locked_nano: 100, ..Default::default() }; airdrop(&brain_channel, &cli_key.pubkey, 10).await.unwrap();