diff --git a/.gitignore b/.gitignore index ed3ad69..0a5c507 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target secrets tmp +.env diff --git a/Cargo.lock b/Cargo.lock index 3826a4e..b8a20bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1000,7 +1000,7 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain#d6ca058d2de78b5257517034bca2b2c7d5929db8" +source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#694d5811aa0edc9b2b9bdb17c6e972b04a21b96f" dependencies = [ "bincode 2.0.1", "prost", diff --git a/Cargo.toml b/Cargo.toml index 6011662..9d96c45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ serde_yaml = "0.9.34" surrealdb = "2.2.2" tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] } tonic = { version = "0.12", features = ["tls"] } -detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain" } +detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain_app" } ed25519-dalek = "2.1.1" bs58 = "0.5.1" tokio-stream = "0.1.17" diff --git a/src/bin/migration0.rs b/src/bin/migration0.rs index e622597..fd3c50f 100644 --- a/src/bin/migration0.rs +++ b/src/bin/migration0.rs @@ -17,6 +17,7 @@ async fn main() -> Result<(), Box> { let db_name = std::env::var("DB_NAME").expect("DB_NAME not set in .env"); let db = db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name).await.unwrap(); + env_logger::builder().filter_level(log::LevelFilter::Trace); db::migration0(&db, &old_brain_data).await?; diff --git a/src/constants.rs b/src/constants.rs index 49098a3..c8aa89e 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -48,3 +48,6 @@ pub const ID_ALPHABET: [char; 62] = [ pub const TOKEN_DECIMAL: u64 = 1_000_000_000; pub const MIN_ESCROW: u64 = 5000 * TOKEN_DECIMAL; + +pub const APP_DAEMON_TIMEOUT: u64 = 20; +pub const VM_DAEMON_TIMEOUT: u64 = 10; diff --git a/src/db/app.rs b/src/db/app.rs index 3abe616..fdaac6d 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -1,11 +1,13 @@ use std::time::Duration; use super::Error; -use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, DELETED_APP, NEW_APP_REQ}; +use crate::constants::{ + ACCOUNT, ACTIVE_APP, APP_DAEMON_TIMEOUT, APP_NODE, DELETED_APP, NEW_APP_REQ, +}; use crate::db; use crate::db::general::Report; use crate::old_brain; -use detee_shared::app_proto; +use detee_shared::app_proto::{self, NewAppRes}; use serde::{Deserialize, Serialize}; use surrealdb::engine::remote::ws::Client; use surrealdb::sql::Datetime; @@ -68,8 +70,8 @@ pub struct NewAppReq { pub hratls_pubkey: String, pub ports: Vec, pub memory_mb: u32, - pub vcpu: u32, - pub disk_mb: u32, + pub vcpus: u32, + pub disk_size_gb: u32, pub locked_nano: u64, pub price_per_unit: u64, pub error: String, @@ -81,26 +83,117 @@ impl NewAppReq { let new_app_req: Option = db.select((NEW_APP_REQ, id)).await?; Ok(new_app_req) } - pub async fn submit_error( - db: &Surreal, - id: &str, - error: String, - ) -> Result, Error> { - #[derive(Serialize)] - struct NewAppError { - error: String, + pub async fn submit_error(db: &Surreal, id: &str, error: String) -> Result<(), Error> { + let tx_query = String::from( + " + BEGIN TRANSACTION; + + LET $new_app_req = $new_app_req_input; + LET $error = $error_input; + LET $record = (select * from $new_app_req)[0]; + LET $admin = $record.in; + + if $record == None {{ + THROW 'app req not exist ' + $new_app_req + }}; + + UPDATE $new_app_req SET error = $error; + + UPDATE $admin SET tmp_locked -= $record.locked_nano; + UPDATE $admin SET balance += $record.locked_nano; + + COMMIT TRANSACTION;", + ); + + log::trace!("submit_new_app_err query: {tx_query}"); + + let mut query_resp = db + .query(tx_query) + .bind(("new_app_req_input", RecordId::from((NEW_APP_REQ, id)))) + .bind(("error_input", error)) + .await?; + + let query_err = query_resp.take_errors(); + if !query_err.is_empty() { + log::trace!("errors in submit_new_app_err: {query_err:?}"); + let tx_fail_err_str = + String::from("The query was not executed due to a failed transaction"); + + if query_err.contains_key(&4) && query_err[&4].to_string() != tx_fail_err_str { + log::error!("app req not exist: {}", query_err[&4]); + return Err(Error::ContractNotFound); + } else { + log::error!("Unknown error in submit_new_app_err: {query_err:?}"); + return Err(Error::Unknown("submit_new_app_req".to_string())); + } } - let record: Option = - db.update((NEW_APP_REQ, id)).merge(NewAppError { error }).await?; - - Ok(record) + Ok(()) } - pub async fn submit(self, db: &Surreal) -> Result, Error> { - // TODO: handle financial transaction - let new_app_req: Vec = db.insert(NEW_APP_REQ).relation(self).await?; - Ok(new_app_req) + pub async fn submit(self, db: &Surreal) -> Result<(), Error> { + let locked_nano = self.locked_nano; + let tx_query = format!( " + BEGIN TRANSACTION; + + LET $account = $account_input; + LET $new_app_req = $new_app_req_input; + LET $app_node = $app_node_input; + LET $package_url = $package_url_input; + LET $mr_enclave = $mr_enclave_input; + LET $hratls_pubkey = $hratls_pubkey_input; + LET $app_name = $app_name_input; + + UPDATE $account SET balance -= {locked_nano}; + IF $account.balance < 0 {{ + THROW 'Insufficient funds.' + }}; + UPDATE $account SET tmp_locked += {locked_nano}; + + + RELATE + $account + ->$new_app_req + ->$app_node + CONTENT {{ + created_at: time::now(), app_name: $app_name, package_url: $package_url, + mr_enclave: $mr_enclave, hratls_pubkey: $hratls_pubkey, ports: {:?}, memory_mb: {}, + vcpus: {}, disk_size_gb: {}, locked_nano: {locked_nano}, price_per_unit: {}, error: '', + }}; + + COMMIT TRANSACTION; + ", + self.ports, self.memory_mb, self.vcpus, self.disk_size_gb, self.price_per_unit); + + log::trace!("submit_new_app_req query: {tx_query}"); + + let mut query_resp = db + .query(tx_query) + .bind(("account_input", self.admin)) + .bind(("new_app_req_input", self.id)) + .bind(("app_node_input", self.app_node)) + .bind(("package_url_input", self.package_url)) + .bind(("mr_enclave_input", self.mr_enclave)) + .bind(("hratls_pubkey_input", self.hratls_pubkey)) + .bind(("app_name_input", self.app_name)) + .await?; + + let query_err = query_resp.take_errors(); + if !query_err.is_empty() { + log::trace!("errors in submit_new_app_req: {query_err:?}"); + let tx_fail_err_str = + String::from("The query was not executed due to a failed transaction"); + + if query_err.contains_key(&8) && query_err[&8].to_string() != tx_fail_err_str { + log::error!("Transaction error: {}", query_err[&8]); + return Err(Error::InsufficientFunds); + } else { + log::error!("Unknown error in submit_new_app_req: {query_err:?}"); + return Err(Error::Unknown("submit_new_app_req".to_string())); + } + } + + Ok(()) } } @@ -221,22 +314,31 @@ impl ActiveApp { (self.vcpus as f64 * 5f64) + (self.memory_mb as f64 / 200f64) + (self.disk_size_gb as f64) } - pub async fn activate(db: &Surreal, id: &str) -> Result<(), Error> { - let new_app_req = match NewAppReq::get(db, id).await? { + pub async fn activate( + db: &Surreal, + new_app_res: app_proto::NewAppRes, + ) -> Result<(), Error> { + let new_app_req = match NewAppReq::get(db, &new_app_res.uuid).await? { Some(r) => r, None => return Ok(()), }; + let mapped_ports = new_app_res + .mapped_ports + .into_iter() + .map(|data| (data.host_port, data.guest_port)) + .collect::>(); + let active_app = Self { - id: RecordId::from((ACTIVE_APP, id)), + id: RecordId::from((ACTIVE_APP, &new_app_res.uuid)), admin: new_app_req.admin, app_node: new_app_req.app_node, app_name: new_app_req.app_name, - mapped_ports: vec![], + mapped_ports, host_ipv4: String::new(), - vcpus: new_app_req.vcpu, + vcpus: new_app_req.vcpus, memory_mb: new_app_req.memory_mb, - disk_size_gb: new_app_req.disk_mb, + disk_size_gb: new_app_req.disk_size_gb, created_at: new_app_req.created_at.clone(), price_per_unit: new_app_req.price_per_unit, locked_nano: new_app_req.locked_nano, @@ -246,7 +348,13 @@ impl ActiveApp { hratls_pubkey: new_app_req.hratls_pubkey.clone(), }; + let admin_account = active_app.admin.key().to_string(); + let locked_nano = active_app.locked_nano; + let _: Vec = db.insert(()).relation(active_app).await?; + db.delete::>((NEW_APP_REQ, &new_app_res.uuid)).await?; + db.query(format!("UPDATE {ACCOUNT}:{admin_account} SET tmp_locked -= {locked_nano};")) + .await?; Ok(()) } @@ -261,7 +369,15 @@ impl ActiveApp { Ok(false) } } - pub async fn listen(db: &Surreal, app_id: &str) -> Result { +} + +pub enum WrappedAppResp { + NewAppRes(NewAppRes), + Error(NewAppRes), +} + +impl WrappedAppResp { + pub async fn listen(db: &Surreal, app_id: &str) -> Result { let mut query_response = db .query(format!( "live select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};" @@ -272,14 +388,40 @@ impl ActiveApp { let mut error_stream = query_response.stream::>(0)?; let mut active_app_stream = query_response.stream::>(1)?; - tokio::time::timeout(Duration::from_secs(30), async { + let mut error = db + .query(format!("select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};")) + .await?; + if let Some(error_on_newapp_req) = error.take::>(0)? { + if !error_on_newapp_req.error.is_empty() { + let app_daemon_err = NewAppRes { + uuid: app_id.to_string(), + error: error_on_newapp_req.error, + ..Default::default() + }; + + return Ok(Self::Error(app_daemon_err)); + } + } + + if let Some(active_app) = db.select::>((ACTIVE_APP, app_id)).await? { + return Ok(Self::NewAppRes(active_app.into())); + } + log::trace!("listening for table: {NEW_APP_REQ}"); + + tokio::time::timeout(Duration::from_secs(APP_DAEMON_TIMEOUT), async { loop { tokio::select! { Some(err_notif) = error_stream.next() =>{ match err_notif{ Ok(err_notif) =>{ if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty(){ - return Err(Error::NewAppDaemonResp(err_notif.data.error)) + let app_daemon_err = NewAppRes { + uuid: app_id.to_string(), + error: err_notif.data.error, + ..Default::default() + }; + + return Ok(Self::Error(app_daemon_err)); } } Err(e) => return Err(e.into()) @@ -291,7 +433,7 @@ impl ActiveApp { Ok(active_app_notif) =>{ if active_app_notif.action == surrealdb::Action::Create { let _: Option = db.delete((NEW_APP_REQ, app_id)).await?; - return Ok(active_app_notif.data); + return Ok(Self::NewAppRes(active_app_notif.data.into())); } } Err(e) => return Err(e.into()) diff --git a/src/db/mod.rs b/src/db/mod.rs index 7f22fdb..6af367b 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -31,8 +31,6 @@ pub enum Error { UnknownTable(String), #[error("Daemon channel got closed: {0}")] AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError), - #[error("AppDaemon Error {0}")] - NewAppDaemonResp(String), #[error("Minimum escrow amount is {MIN_ESCROW}")] MinimalEscrow, #[error("Insufficient funds, deposit more tokens")] @@ -186,7 +184,7 @@ pub async fn live_appnode_msgs< } Err(e) => { log::error!( - "live_vmnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}" + "live_appnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}" ); return Err(Error::from(e)); } diff --git a/src/db/vm.rs b/src/db/vm.rs index b216206..28c1fb8 100644 --- a/src/db/vm.rs +++ b/src/db/vm.rs @@ -3,7 +3,8 @@ use std::time::Duration; use super::Error; use crate::constants::{ - ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT, + ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_DAEMON_TIMEOUT, VM_NODE, + VM_UPDATE_EVENT, }; use crate::db::{Account, ErrorFromTable, Report}; use crate::old_brain; @@ -353,7 +354,7 @@ impl WrappedMeasurement { } log::trace!("listening for table: {table}"); - tokio::time::timeout(Duration::from_secs(10), async { + tokio::time::timeout(Duration::from_secs(VM_DAEMON_TIMEOUT), async { loop { tokio::select! { error_notification = error_stream.next() => { diff --git a/src/grpc/app.rs b/src/grpc/app.rs index ec742d7..6ab8565 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -145,7 +145,7 @@ impl BrainAppDaemon for AppDaemonServer { ) .await?; } else { - db::ActiveApp::activate(&self.db, &new_app_resp.uuid).await?; + db::ActiveApp::activate(&self.db, new_app_resp).await?; } } Some(daemon_message_app::Msg::AppNodeResources(app_node_resources)) => { @@ -182,28 +182,38 @@ impl BrainAppCli for AppCliServer { async fn new_app(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; + // TODO: make it atleast 1 hour + if req.locked_nano < 100 { + log::error!("locking lessthan 100 nano lps: {}", req.locked_nano); + return Err(Status::unknown("lock atleaset 100 nano lps")); + } info!("new_app process starting for {:?}", req); if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { return Err(Status::permission_denied("This operator banned you. What did you do?")); } let db_req: db::NewAppReq = req.into(); - let id = db_req.id.to_string(); + let id = db_req.id.key().to_string(); let (tx, rx) = tokio::sync::oneshot::channel(); let db = self.db.clone(); tokio::spawn(async move { - let _ = tx.send(db::ActiveApp::listen(&db, &id).await); + let _ = tx.send(db::WrappedAppResp::listen(&db, &id).await); }); - db_req.submit(&self.db).await?; match rx.await { + Ok(Ok(db::WrappedAppResp::NewAppRes(new_app_resp))) => Ok(Response::new(new_app_resp)), + Ok(Ok(db::WrappedAppResp::Error(err))) => Ok(Response::new(err)), Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded( "Network timeout. Please try again later or contact the DeTEE devs team.", )), - Ok(Err(db::Error::NewAppDaemonResp(err))) => Err(Status::internal(err)), - Ok(new_app_resp) => Ok(Response::new(new_app_resp?.into())), + Ok(Err(e)) => { + log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}"); + Err(Status::unknown( + "Unknown error. Please try again or contact the DeTEE devs team.", + )) + } Err(e) => { log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}"); Err(Status::unknown( diff --git a/src/grpc/types.rs b/src/grpc/types.rs index 900df5a..bc2b098 100644 --- a/src/grpc/types.rs +++ b/src/grpc/types.rs @@ -274,8 +274,8 @@ impl From for AppContract { public_ipv4: value.host_ipv4, resource: Some(AppResource { memory_mb: value.memory_mb, - disk_mb: value.disk_size_gb, - vcpu: value.vcpus, + disk_size_gb: value.disk_size_gb, + vcpus: value.vcpus, ports: value.mapped_ports.iter().map(|(_, g)| *g).collect(), }), mapped_ports: value @@ -316,8 +316,8 @@ impl From for db::NewAppReq { hratls_pubkey: val.hratls_pubkey, ports: resource.ports, memory_mb: resource.memory_mb, - vcpu: resource.vcpu, - disk_mb: resource.disk_mb, + vcpus: resource.vcpus, + disk_size_gb: resource.disk_size_gb, locked_nano: val.locked_nano, price_per_unit: val.price_per_unit, error: String::new(), @@ -329,9 +329,9 @@ impl From for db::NewAppReq { impl From for NewAppReq { fn from(value: db::NewAppReq) -> Self { let resource = AppResource { - vcpu: value.vcpu, + vcpus: value.vcpus, memory_mb: value.memory_mb, - disk_mb: value.disk_mb, + disk_size_gb: value.disk_size_gb, ports: value.ports, }; let mr_enclave = Some(hex::decode(value.mr_enclave).unwrap_or_default()); diff --git a/src/grpc/vm.rs b/src/grpc/vm.rs index 9b5e6aa..c3f5ea3 100644 --- a/src/grpc/vm.rs +++ b/src/grpc/vm.rs @@ -222,6 +222,11 @@ impl BrainVmCli for VmCliServer { async fn new_vm(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; + // TODO: make it atleast 1 hour + if req.locked_nano < 100 { + log::error!("locking lessthan 100 nano lps: {}", req.locked_nano); + return Err(Status::unknown("lock atleaset 100 nano lps")); + } info!("New VM requested via CLI: {req:?}"); if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { return Err(Status::permission_denied("This operator banned you. What did you do?")); diff --git a/surql/tables.sql b/surql/tables.sql index 4ab0a99..65e9381 100644 --- a/surql/tables.sql +++ b/surql/tables.sql @@ -99,8 +99,8 @@ DEFINE FIELD mr_enclave ON TABLE new_app_req TYPE string; DEFINE FIELD hratls_pubkey ON TABLE new_app_req TYPE string; DEFINE FIELD ports ON TABLE new_app_req TYPE array; DEFINE FIELD memory_mb ON TABLE new_app_req TYPE int; -DEFINE FIELD vcpu ON TABLE new_app_req TYPE int; -DEFINE FIELD disk_mb ON TABLE new_app_req TYPE int; +DEFINE FIELD vcpus ON TABLE new_app_req TYPE int; +DEFINE FIELD disk_size_gb ON TABLE new_app_req TYPE int; DEFINE FIELD locked_nano ON TABLE new_app_req TYPE int; DEFINE FIELD price_per_unit ON TABLE new_app_req TYPE int; DEFINE FIELD error ON TABLE new_app_req TYPE string; diff --git a/tests/common/app_daemon_utils.rs b/tests/common/app_daemon_utils.rs new file mode 100644 index 0000000..1d4fa52 --- /dev/null +++ b/tests/common/app_daemon_utils.rs @@ -0,0 +1,144 @@ +use anyhow::Result; +use detee_shared::app_proto::brain_app_daemon_client::BrainAppDaemonClient; +use detee_shared::app_proto::{self, NewAppRes, RegisterAppNodeReq}; +use detee_shared::common_proto::MappedPort; +use futures::StreamExt; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::transport::Channel; + +use super::test_utils::Key; + +pub async fn mock_app_daemon( + brain_channel: &Channel, + daemon_error: Option, +) -> Result { + let mut daemon_client = BrainAppDaemonClient::new(brain_channel.clone()); + let daemon_key = Key::new(); + + register_app_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?; + + let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); + + tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx)); + + let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); + tokio::spawn(daemon_msg_sender( + daemon_client.clone(), + daemon_key.clone(), + daemon_msg_tx.clone(), + rx, + )); + + tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx, daemon_error)); + + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + Ok(daemon_key.pubkey) +} + +pub async fn register_app_node( + client: &mut BrainAppDaemonClient, + key: &Key, + operator_wallet: &str, +) -> Result> { + log::info!("Registering app_node: {}", key.pubkey); + let node_pubkey = key.pubkey.clone(); + + let req = RegisterAppNodeReq { + node_pubkey, + operator_wallet: operator_wallet.to_string(), + main_ip: String::from("185.243.218.213"), + city: String::from("Oslo"), + country: String::from("Norway"), + region: String::from("EU"), + price: 1200, + }; + + let mut grpc_stream = client.register_app_node(key.sign_request(req)?).await?.into_inner(); + + let mut deleted_app_reqs = Vec::new(); + while let Some(stream_update) = grpc_stream.next().await { + match stream_update { + Ok(del_app_req) => { + deleted_app_reqs.push(del_app_req); + } + Err(e) => { + panic!("Received error instead of deleted_app_reqs: {e:?}"); + } + } + } + Ok(deleted_app_reqs) +} + +pub async fn daemon_listener( + mut client: BrainAppDaemonClient, + key: Key, + tx: mpsc::Sender, +) -> Result<()> { + log::info!("listening app_daemon"); + let mut grpc_stream = + client.brain_messages(key.sign_stream_auth_app(vec![])?).await?.into_inner(); + + while let Some(Ok(stream_update)) = grpc_stream.next().await { + log::info!("app deamon got notified: {:?}", &stream_update); + let _ = tx.send(stream_update).await; + } + + Ok(()) +} + +pub async fn daemon_msg_sender( + mut client: BrainAppDaemonClient, + key: Key, + tx: mpsc::Sender, + rx: mpsc::Receiver, +) -> Result<()> { + log::info!("sender app_daemon"); + let rx_stream = ReceiverStream::new(rx); + tx.send(app_proto::DaemonMessageApp { + msg: Some(app_proto::daemon_message_app::Msg::Auth(key.sign_stream_auth_app(vec![])?)), + }) + .await?; + client.daemon_messages(rx_stream).await?; + Ok(()) +} + +pub async fn daemon_engine( + tx: mpsc::Sender, + mut rx: mpsc::Receiver, + new_app_err: Option, +) -> Result<()> { + log::info!("daemon engine app_daemon"); + while let Some(brain_msg) = rx.recv().await { + match brain_msg.msg { + Some(app_proto::brain_message_app::Msg::NewAppReq(new_app_req)) => { + let exposed_ports = + [vec![34500], new_app_req.resource.unwrap_or_default().ports].concat(); + + let mapped_ports = exposed_ports + .into_iter() + .map(|port| MappedPort { host_port: port, guest_port: port }) + .collect::>(); + + let res_data = NewAppRes { + uuid: new_app_req.uuid, + mapped_ports, + ip_address: "127.0.0.1".to_string(), + error: new_app_err.clone().unwrap_or_default(), + }; + + let res = app_proto::DaemonMessageApp { + msg: Some(app_proto::daemon_message_app::Msg::NewAppRes(res_data)), + }; + tx.send(res).await?; + } + Some(app_proto::brain_message_app::Msg::DeleteAppReq(_del_app_req)) => { + todo!() + } + None => todo!(), + } + } + + Ok(()) +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 09e1a27..e5947ff 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -6,3 +6,6 @@ pub mod test_utils; pub mod vm_cli_utils; #[allow(dead_code)] pub mod vm_daemon_utils; + +#[allow(dead_code)] +pub mod app_daemon_utils; diff --git a/tests/common/prepare_test_env.rs b/tests/common/prepare_test_env.rs index 42e6e0c..c2aba72 100644 --- a/tests/common/prepare_test_env.rs +++ b/tests/common/prepare_test_env.rs @@ -1,4 +1,6 @@ use anyhow::Result; +use detee_shared::app_proto::brain_app_cli_server::BrainAppCliServer; +use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemonServer; use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer; use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; @@ -7,6 +9,7 @@ use hyper_util::rt::TokioIo; use std::net::SocketAddr; use std::sync::Arc; use surreal_brain::constants::DB_SCHEMA_FILES; +use surreal_brain::grpc::app::{AppCliServer, AppDaemonServer}; use surreal_brain::grpc::general::GeneralCliServer; use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer}; use surrealdb::engine::remote::ws::Client; @@ -96,6 +99,8 @@ pub async fn run_service_for_stream_server() -> DuplexStream { .add_service(BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone()))) .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) + .add_service(BrainAppCliServer::new(AppCliServer::new(db_arc.clone()))) + .add_service(BrainAppDaemonServer::new(AppDaemonServer::new(db_arc.clone()))) .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await?; diff --git a/tests/common/test_utils.rs b/tests/common/test_utils.rs index a99efb0..0c5b884 100644 --- a/tests/common/test_utils.rs +++ b/tests/common/test_utils.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use detee_shared::app_proto as sgx_proto; use detee_shared::vm_proto as snp_proto; use ed25519_dalek::{Signer, SigningKey}; use itertools::Itertools; @@ -63,11 +64,22 @@ impl Key { Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string()) } - pub fn sign_stream_auth(&self, contracts: Vec) -> Result { + pub fn sign_stream_auth_vm( + &self, + contracts: Vec, + ) -> Result { let pubkey = self.pubkey.clone(); let timestamp = chrono::Utc::now().to_rfc3339(); let signature = self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?; Ok(snp_proto::DaemonStreamAuth { timestamp, pubkey, contracts, signature }) } + + pub fn sign_stream_auth_app(&self, contracts: Vec) -> Result { + let pubkey = self.pubkey.clone(); + let timestamp = chrono::Utc::now().to_rfc3339(); + let signature = + self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?; + Ok(sgx_proto::DaemonAuth { timestamp, pubkey, contracts, signature }) + } } diff --git a/tests/common/vm_cli_utils.rs b/tests/common/vm_cli_utils.rs index 38aa876..7206d31 100644 --- a/tests/common/vm_cli_utils.rs +++ b/tests/common/vm_cli_utils.rs @@ -35,7 +35,7 @@ pub async fn create_new_vm( node_pubkey: node_pubkey.to_string(), price_per_unit: 1200, extra_ports: vec![8080, 8081], - locked_nano: 0, + locked_nano: 100, ..Default::default() }; diff --git a/tests/common/vm_daemon_utils.rs b/tests/common/vm_daemon_utils.rs index dc3c367..96abacc 100644 --- a/tests/common/vm_daemon_utils.rs +++ b/tests/common/vm_daemon_utils.rs @@ -59,8 +59,8 @@ pub async fn register_vm_node( let mut deleted_vm_reqs = Vec::new(); while let Some(stream_update) = grpc_stream.next().await { match stream_update { - Ok(del_vm_rq) => { - deleted_vm_reqs.push(del_vm_rq); + Ok(del_vm_req) => { + deleted_vm_reqs.push(del_vm_req); } Err(e) => { panic!("Received error instead of deleted_vm_reqs: {e:?}"); @@ -76,7 +76,8 @@ pub async fn daemon_listener( tx: mpsc::Sender, ) -> Result<()> { log::info!("listening vm_daemon"); - let mut grpc_stream = client.brain_messages(key.sign_stream_auth(vec![])?).await?.into_inner(); + let mut grpc_stream = + client.brain_messages(key.sign_stream_auth_vm(vec![])?).await?.into_inner(); while let Some(Ok(stream_update)) = grpc_stream.next().await { log::info!("vm deamon got notified: {:?}", &stream_update); @@ -95,7 +96,7 @@ pub async fn daemon_msg_sender( log::info!("sender vm_daemon"); let rx_stream = ReceiverStream::new(rx); tx.send(vm_proto::VmDaemonMessage { - msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![])?)), + msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth_vm(vec![])?)), }) .await?; client.daemon_messages(rx_stream).await?; diff --git a/tests/grpc_app_cli_test.rs b/tests/grpc_app_cli_test.rs new file mode 100644 index 0000000..574f3b3 --- /dev/null +++ b/tests/grpc_app_cli_test.rs @@ -0,0 +1,91 @@ +use common::app_daemon_utils::mock_app_daemon; +use common::prepare_test_env::{prepare_test_db, run_service_for_stream}; +use common::test_utils::Key; +use common::vm_cli_utils::airdrop; +use detee_shared::app_proto; +use detee_shared::app_proto::brain_app_cli_client::BrainAppCliClient; +use std::vec; +use surreal_brain::constants::{ACCOUNT, ACTIVE_APP, NEW_APP_REQ, TOKEN_DECIMAL}; +use surreal_brain::db::prelude as db; + +mod common; + +#[tokio::test] +async fn test_app_creation() { + /* + env_logger::builder() + .filter_level(log::LevelFilter::Trace) + .filter_module("tungstenite", log::LevelFilter::Debug) + .filter_module("tokio_tungstenite", log::LevelFilter::Debug) + .init(); + */ + let db = prepare_test_db().await.unwrap(); + let brain_channel = run_service_for_stream().await.unwrap(); + let daemon_key = mock_app_daemon(&brain_channel, None).await.unwrap(); + + let key = Key::new(); + + let mut new_app_req = app_proto::NewAppReq { + admin_pubkey: key.pubkey.clone(), + node_pubkey: daemon_key.clone(), + price_per_unit: 1200, + resource: Some(app_proto::AppResource { ports: vec![8080, 8081], ..Default::default() }), + locked_nano: 100, + ..Default::default() + }; + + let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone()); + let new_app_resp = client_app_cli.new_app(key.sign_request(new_app_req.clone()).unwrap()).await; + assert!(new_app_resp.is_err()); + let new_app_err = new_app_resp.err().unwrap(); + assert!(new_app_err.to_string().contains("Insufficient funds")); + + let airdrop_amount = 10; + airdrop(&brain_channel, &key.pubkey, airdrop_amount).await.unwrap(); + + let new_app_resp = client_app_cli + .new_app(key.sign_request(new_app_req.clone()).unwrap()) + .await + .unwrap() + .into_inner(); + let active_app = + db.select::>((ACTIVE_APP, new_app_resp.uuid)).await.unwrap(); + assert!(active_app.is_some()); + + let daemon_key_02 = + mock_app_daemon(&brain_channel, Some("something went wrong 01".to_string())).await.unwrap(); + + new_app_req.node_pubkey = daemon_key_02.clone(); + + let new_app_resp = client_app_cli + .new_app(key.sign_request(new_app_req.clone()).unwrap()) + .await + .unwrap() + .into_inner(); + assert!(!new_app_resp.error.is_empty()); + + let app_req_db = + db.select::>((NEW_APP_REQ, new_app_resp.uuid)).await.unwrap(); + + assert!(!app_req_db.unwrap().error.is_empty()); + + let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); + assert_eq!(acc_db.balance, (airdrop_amount * TOKEN_DECIMAL - 100)); + assert_eq!(acc_db.tmp_locked, 0); + + let locking_nano = 1288; + new_app_req.node_pubkey = daemon_key; + new_app_req.locked_nano = locking_nano; + + let new_app_resp = + client_app_cli.new_app(key.sign_request(new_app_req).unwrap()).await.unwrap().into_inner(); + assert!(new_app_resp.error.is_empty()); + + let active_app = + db.select::>((ACTIVE_APP, new_app_resp.uuid)).await.unwrap(); + assert!(active_app.is_some()); + + let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); + assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - (locking_nano + 100)); + assert_eq!(acc_db.tmp_locked, 0); +} diff --git a/tests/grpc_vm_cli_test.rs b/tests/grpc_vm_cli_test.rs index 475e11b..5d68ce1 100644 --- a/tests/grpc_vm_cli_test.rs +++ b/tests/grpc_vm_cli_test.rs @@ -73,7 +73,7 @@ async fn test_vm_creation() { } let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); - assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL); + assert_eq!(acc_db.balance, (airdrop_amount * TOKEN_DECIMAL - 100)); assert_eq!(acc_db.tmp_locked, 0); new_vm_req.node_pubkey = daemon_key; @@ -87,7 +87,7 @@ async fn test_vm_creation() { assert!(active_vm.is_some()); let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); - assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - locking_nano); + assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - (locking_nano + 100)); assert_eq!(acc_db.tmp_locked, 0); } @@ -109,7 +109,7 @@ async fn test_timeout_vm_creation() { node_pubkey: daemon_key.pubkey, price_per_unit: 1200, extra_ports: vec![8080, 8081], - locked_nano: 0, + locked_nano: 100, ..Default::default() }; diff --git a/tests/grpc_vm_daemon_test.rs b/tests/grpc_vm_daemon_test.rs index f8ce79a..5a684e2 100644 --- a/tests/grpc_vm_daemon_test.rs +++ b/tests/grpc_vm_daemon_test.rs @@ -39,7 +39,7 @@ async fn test_brain_message() { node_pubkey: daemon_key, price_per_unit: 1200, extra_ports: vec![8080, 8081], - locked_nano: 0, + locked_nano: 100, ..Default::default() }; airdrop(&brain_channel, &cli_key.pubkey, 10).await.unwrap();