payments on app deployment and refund for vm and app deletion #5
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @ -1,3 +1,4 @@ | |||||||
| /target | /target | ||||||
| secrets | secrets | ||||||
| tmp | tmp | ||||||
|  | .env | ||||||
|  | |||||||
							
								
								
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @ -1000,7 +1000,7 @@ dependencies = [ | |||||||
| [[package]] | [[package]] | ||||||
| name = "detee-shared" | name = "detee-shared" | ||||||
| version = "0.1.0" | version = "0.1.0" | ||||||
| source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain#d6ca058d2de78b5257517034bca2b2c7d5929db8" | source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#005677153b3fcd3251b64111a736c806106fdc04" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "bincode 2.0.1", |  "bincode 2.0.1", | ||||||
|  "prost", |  "prost", | ||||||
|  | |||||||
| @ -13,7 +13,7 @@ serde_yaml = "0.9.34" | |||||||
| surrealdb = "2.2.2" | surrealdb = "2.2.2" | ||||||
| tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] } | tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] } | ||||||
| tonic = { version = "0.12", features = ["tls"] } | tonic = { version = "0.12", features = ["tls"] } | ||||||
| detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain" } | detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain_app" } | ||||||
| ed25519-dalek = "2.1.1" | ed25519-dalek = "2.1.1" | ||||||
| bs58 = "0.5.1" | bs58 = "0.5.1" | ||||||
| tokio-stream = "0.1.17" | tokio-stream = "0.1.17" | ||||||
|  | |||||||
| @ -17,6 +17,7 @@ async fn main() -> Result<(), Box<dyn Error>> { | |||||||
|     let db_name = std::env::var("DB_NAME").expect("DB_NAME not set in .env"); |     let db_name = std::env::var("DB_NAME").expect("DB_NAME not set in .env"); | ||||||
| 
 | 
 | ||||||
|     let db = db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name).await.unwrap(); |     let db = db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name).await.unwrap(); | ||||||
|  |     env_logger::builder().filter_level(log::LevelFilter::Trace); | ||||||
| 
 | 
 | ||||||
|     db::migration0(&db, &old_brain_data).await?; |     db::migration0(&db, &old_brain_data).await?; | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -48,3 +48,6 @@ pub const ID_ALPHABET: [char; 62] = [ | |||||||
| 
 | 
 | ||||||
| pub const TOKEN_DECIMAL: u64 = 1_000_000_000; | pub const TOKEN_DECIMAL: u64 = 1_000_000_000; | ||||||
| pub const MIN_ESCROW: u64 = 5000 * TOKEN_DECIMAL; | pub const MIN_ESCROW: u64 = 5000 * TOKEN_DECIMAL; | ||||||
|  | 
 | ||||||
|  | pub const APP_DAEMON_TIMEOUT: u64 = 20; | ||||||
|  | pub const VM_DAEMON_TIMEOUT: u64 = 10; | ||||||
|  | |||||||
							
								
								
									
										259
									
								
								src/db/app.rs
									
									
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										259
									
								
								src/db/app.rs
									
									
									
									
									
								
							| @ -1,11 +1,13 @@ | |||||||
| use std::time::Duration; | use std::time::Duration; | ||||||
| 
 | 
 | ||||||
| use super::Error; | use super::Error; | ||||||
| use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, DELETED_APP, NEW_APP_REQ}; | use crate::constants::{ | ||||||
|  |     ACCOUNT, ACTIVE_APP, APP_DAEMON_TIMEOUT, APP_NODE, DELETED_APP, NEW_APP_REQ, | ||||||
|  | }; | ||||||
| use crate::db; | use crate::db; | ||||||
| use crate::db::general::Report; | use crate::db::general::Report; | ||||||
| use crate::old_brain; | use crate::old_brain; | ||||||
| use detee_shared::app_proto; | use detee_shared::app_proto::{self, NewAppRes}; | ||||||
| use serde::{Deserialize, Serialize}; | use serde::{Deserialize, Serialize}; | ||||||
| use surrealdb::engine::remote::ws::Client; | use surrealdb::engine::remote::ws::Client; | ||||||
| use surrealdb::sql::Datetime; | use surrealdb::sql::Datetime; | ||||||
| @ -68,8 +70,8 @@ pub struct NewAppReq { | |||||||
|     pub hratls_pubkey: String, |     pub hratls_pubkey: String, | ||||||
|     pub ports: Vec<u32>, |     pub ports: Vec<u32>, | ||||||
|     pub memory_mb: u32, |     pub memory_mb: u32, | ||||||
|     pub vcpu: u32, |     pub vcpus: u32, | ||||||
|     pub disk_mb: u32, |     pub disk_size_gb: u32, | ||||||
|     pub locked_nano: u64, |     pub locked_nano: u64, | ||||||
|     pub price_per_unit: u64, |     pub price_per_unit: u64, | ||||||
|     pub error: String, |     pub error: String, | ||||||
| @ -81,26 +83,117 @@ impl NewAppReq { | |||||||
|         let new_app_req: Option<Self> = db.select((NEW_APP_REQ, id)).await?; |         let new_app_req: Option<Self> = db.select((NEW_APP_REQ, id)).await?; | ||||||
|         Ok(new_app_req) |         Ok(new_app_req) | ||||||
|     } |     } | ||||||
|     pub async fn submit_error( |     pub async fn submit_error(db: &Surreal<Client>, id: &str, error: String) -> Result<(), Error> { | ||||||
|         db: &Surreal<Client>, |         let tx_query = String::from( | ||||||
|         id: &str, |             " | ||||||
|         error: String, |             BEGIN TRANSACTION; | ||||||
|     ) -> Result<Option<Self>, Error> { | 
 | ||||||
|         #[derive(Serialize)] |                 LET $new_app_req = $new_app_req_input; | ||||||
|         struct NewAppError { |                 LET $error = $error_input; | ||||||
|             error: String, |                 LET $record = (select * from $new_app_req)[0]; | ||||||
|  |                 LET $admin = $record.in; | ||||||
|  | 
 | ||||||
|  |                 if $record == None {{ | ||||||
|  |                     THROW 'app req not exist ' + <string>$new_app_req | ||||||
|  |                 }}; | ||||||
|  | 
 | ||||||
|  |                 UPDATE $new_app_req SET error = $error; | ||||||
|  | 
 | ||||||
|  |                 UPDATE $admin SET tmp_locked -= $record.locked_nano; | ||||||
|  |                 UPDATE $admin SET balance += $record.locked_nano; | ||||||
|  | 
 | ||||||
|  |             COMMIT TRANSACTION;",
 | ||||||
|  |         ); | ||||||
|  | 
 | ||||||
|  |         log::trace!("submit_new_app_err query: {tx_query}"); | ||||||
|  | 
 | ||||||
|  |         let mut query_resp = db | ||||||
|  |             .query(tx_query) | ||||||
|  |             .bind(("new_app_req_input", RecordId::from((NEW_APP_REQ, id)))) | ||||||
|  |             .bind(("error_input", error)) | ||||||
|  |             .await?; | ||||||
|  | 
 | ||||||
|  |         let query_err = query_resp.take_errors(); | ||||||
|  |         if !query_err.is_empty() { | ||||||
|  |             log::trace!("errors in submit_new_app_err: {query_err:?}"); | ||||||
|  |             let tx_fail_err_str = | ||||||
|  |                 String::from("The query was not executed due to a failed transaction"); | ||||||
|  | 
 | ||||||
|  |             if query_err.contains_key(&4) && query_err[&4].to_string() != tx_fail_err_str { | ||||||
|  |                 log::error!("app req not exist: {}", query_err[&4]); | ||||||
|  |                 return Err(Error::ContractNotFound); | ||||||
|  |             } else { | ||||||
|  |                 log::error!("Unknown error in submit_new_app_err: {query_err:?}"); | ||||||
|  |                 return Err(Error::Unknown("submit_new_app_req".to_string())); | ||||||
|  |             } | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         let record: Option<Self> = |         Ok(()) | ||||||
|             db.update((NEW_APP_REQ, id)).merge(NewAppError { error }).await?; |  | ||||||
| 
 |  | ||||||
|         Ok(record) |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn submit(self, db: &Surreal<Client>) -> Result<Vec<Self>, Error> { |     pub async fn submit(self, db: &Surreal<Client>) -> Result<(), Error> { | ||||||
|         // TODO: handle financial transaction
 |         let locked_nano = self.locked_nano; | ||||||
|         let new_app_req: Vec<Self> = db.insert(NEW_APP_REQ).relation(self).await?; |         let tx_query = format!( " | ||||||
|         Ok(new_app_req) |             BEGIN TRANSACTION; | ||||||
|  | 
 | ||||||
|  |                 LET $account = $account_input; | ||||||
|  |                 LET $new_app_req = $new_app_req_input; | ||||||
|  |                 LET $app_node = $app_node_input; | ||||||
|  |                 LET $package_url = $package_url_input; | ||||||
|  |                 LET $mr_enclave = $mr_enclave_input; | ||||||
|  |                 LET $hratls_pubkey = $hratls_pubkey_input; | ||||||
|  |                 LET $app_name = $app_name_input; | ||||||
|  | 
 | ||||||
|  |                 UPDATE $account SET balance -= {locked_nano}; | ||||||
|  |                 IF $account.balance < 0 {{ | ||||||
|  |                     THROW 'Insufficient funds.' | ||||||
|  |                 }}; | ||||||
|  |                 UPDATE $account SET tmp_locked += {locked_nano}; | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  |                 RELATE | ||||||
|  |                     $account | ||||||
|  |                     ->$new_app_req | ||||||
|  |                     ->$app_node | ||||||
|  |                 CONTENT {{ | ||||||
|  |                     created_at: time::now(), app_name: $app_name, package_url: $package_url, | ||||||
|  |                     mr_enclave: $mr_enclave, hratls_pubkey: $hratls_pubkey, ports: {:?}, memory_mb: {}, | ||||||
|  |                     vcpus: {}, disk_size_gb: {}, locked_nano: {locked_nano}, price_per_unit: {}, error: '', | ||||||
|  |                 }}; | ||||||
|  | 
 | ||||||
|  |             COMMIT TRANSACTION; | ||||||
|  |             ",
 | ||||||
|  |             self.ports, self.memory_mb, self.vcpus, self.disk_size_gb, self.price_per_unit); | ||||||
|  | 
 | ||||||
|  |         log::trace!("submit_new_app_req query: {tx_query}"); | ||||||
|  | 
 | ||||||
|  |         let mut query_resp = db | ||||||
|  |             .query(tx_query) | ||||||
|  |             .bind(("account_input", self.admin)) | ||||||
|  |             .bind(("new_app_req_input", self.id)) | ||||||
|  |             .bind(("app_node_input", self.app_node)) | ||||||
|  |             .bind(("package_url_input", self.package_url)) | ||||||
|  |             .bind(("mr_enclave_input", self.mr_enclave)) | ||||||
|  |             .bind(("hratls_pubkey_input", self.hratls_pubkey)) | ||||||
|  |             .bind(("app_name_input", self.app_name)) | ||||||
|  |             .await?; | ||||||
|  | 
 | ||||||
|  |         let query_err = query_resp.take_errors(); | ||||||
|  |         if !query_err.is_empty() { | ||||||
|  |             log::trace!("errors in submit_new_app_req: {query_err:?}"); | ||||||
|  |             let tx_fail_err_str = | ||||||
|  |                 String::from("The query was not executed due to a failed transaction"); | ||||||
|  | 
 | ||||||
|  |             if query_err.contains_key(&8) && query_err[&8].to_string() != tx_fail_err_str { | ||||||
|  |                 log::error!("Transaction error: {}", query_err[&8]); | ||||||
|  |                 return Err(Error::InsufficientFunds); | ||||||
|  |             } else { | ||||||
|  |                 log::error!("Unknown error in submit_new_app_req: {query_err:?}"); | ||||||
|  |                 return Err(Error::Unknown("submit_new_app_req".to_string())); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         Ok(()) | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -202,8 +295,6 @@ impl From<ActiveApp> for DeletedApp { | |||||||
|             disk_size_gb: value.disk_size_gb, |             disk_size_gb: value.disk_size_gb, | ||||||
|             created_at: value.created_at, |             created_at: value.created_at, | ||||||
|             price_per_unit: value.price_per_unit, |             price_per_unit: value.price_per_unit, | ||||||
|             locked_nano: value.locked_nano, |  | ||||||
|             collected_at: value.collected_at, |  | ||||||
|             mr_enclave: value.mr_enclave, |             mr_enclave: value.mr_enclave, | ||||||
|             package_url: value.package_url, |             package_url: value.package_url, | ||||||
|             hratls_pubkey: value.hratls_pubkey, |             hratls_pubkey: value.hratls_pubkey, | ||||||
| @ -221,22 +312,31 @@ impl ActiveApp { | |||||||
|         (self.vcpus as f64 * 5f64) + (self.memory_mb as f64 / 200f64) + (self.disk_size_gb as f64) |         (self.vcpus as f64 * 5f64) + (self.memory_mb as f64 / 200f64) + (self.disk_size_gb as f64) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn activate(db: &Surreal<Client>, id: &str) -> Result<(), Error> { |     pub async fn activate( | ||||||
|         let new_app_req = match NewAppReq::get(db, id).await? { |         db: &Surreal<Client>, | ||||||
|  |         new_app_res: app_proto::NewAppRes, | ||||||
|  |     ) -> Result<(), Error> { | ||||||
|  |         let new_app_req = match NewAppReq::get(db, &new_app_res.uuid).await? { | ||||||
|             Some(r) => r, |             Some(r) => r, | ||||||
|             None => return Ok(()), |             None => return Ok(()), | ||||||
|         }; |         }; | ||||||
| 
 | 
 | ||||||
|  |         let mapped_ports = new_app_res | ||||||
|  |             .mapped_ports | ||||||
|  |             .into_iter() | ||||||
|  |             .map(|data| (data.host_port, data.guest_port)) | ||||||
|  |             .collect::<Vec<(u32, u32)>>(); | ||||||
|  | 
 | ||||||
|         let active_app = Self { |         let active_app = Self { | ||||||
|             id: RecordId::from((ACTIVE_APP, id)), |             id: RecordId::from((ACTIVE_APP, &new_app_res.uuid)), | ||||||
|             admin: new_app_req.admin, |             admin: new_app_req.admin, | ||||||
|             app_node: new_app_req.app_node, |             app_node: new_app_req.app_node, | ||||||
|             app_name: new_app_req.app_name, |             app_name: new_app_req.app_name, | ||||||
|             mapped_ports: vec![], |             mapped_ports, | ||||||
|             host_ipv4: String::new(), |             host_ipv4: String::new(), | ||||||
|             vcpus: new_app_req.vcpu, |             vcpus: new_app_req.vcpus, | ||||||
|             memory_mb: new_app_req.memory_mb, |             memory_mb: new_app_req.memory_mb, | ||||||
|             disk_size_gb: new_app_req.disk_mb, |             disk_size_gb: new_app_req.disk_size_gb, | ||||||
|             created_at: new_app_req.created_at.clone(), |             created_at: new_app_req.created_at.clone(), | ||||||
|             price_per_unit: new_app_req.price_per_unit, |             price_per_unit: new_app_req.price_per_unit, | ||||||
|             locked_nano: new_app_req.locked_nano, |             locked_nano: new_app_req.locked_nano, | ||||||
| @ -246,22 +346,63 @@ impl ActiveApp { | |||||||
|             hratls_pubkey: new_app_req.hratls_pubkey.clone(), |             hratls_pubkey: new_app_req.hratls_pubkey.clone(), | ||||||
|         }; |         }; | ||||||
| 
 | 
 | ||||||
|  |         let admin_account = active_app.admin.key().to_string(); | ||||||
|  |         let locked_nano = active_app.locked_nano; | ||||||
|  | 
 | ||||||
|         let _: Vec<ActiveApp> = db.insert(()).relation(active_app).await?; |         let _: Vec<ActiveApp> = db.insert(()).relation(active_app).await?; | ||||||
|  |         db.delete::<Option<NewAppReq>>((NEW_APP_REQ, &new_app_res.uuid)).await?; | ||||||
|  |         db.query(format!("UPDATE {ACCOUNT}:{admin_account} SET tmp_locked -= {locked_nano};")) | ||||||
|  |             .await?; | ||||||
| 
 | 
 | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn delete(db: &Surreal<Client>, id: &str) -> Result<bool, Error> { |     pub async fn delete(db: &Surreal<Client>, admin: &str, id: &str) -> Result<(), Error> { | ||||||
|         let deleted_app: Option<Self> = db.delete((ACTIVE_APP, id)).await?; |         let mut app_del_resp = db | ||||||
|         if let Some(deleted_app) = deleted_app { |             .query( | ||||||
|             let deleted_app: DeletedApp = deleted_app.into(); |                 " | ||||||
|             let _: Vec<DeletedApp> = db.insert(DELETED_APP).relation(deleted_app).await?; |             BEGIN TRANSACTION; | ||||||
|             Ok(true) | 
 | ||||||
|         } else { |                 LET $active_app = $app_id_input; | ||||||
|             Ok(false) |                 LET $admin = $admin_input; | ||||||
|  | 
 | ||||||
|  |                 IF $active_app.in != $admin { | ||||||
|  |                     THROW 'Unauthorized' | ||||||
|  |                 }; | ||||||
|  | 
 | ||||||
|  |                 return fn::delete_app($active_app); | ||||||
|  | 
 | ||||||
|  |             COMMIT TRANSACTION; | ||||||
|  |             ",
 | ||||||
|  |             ) | ||||||
|  |             .bind(("app_id_input", RecordId::from((ACTIVE_APP, id)))) | ||||||
|  |             .bind(("admin_input", RecordId::from((ACCOUNT, admin)))) | ||||||
|  |             .await?; | ||||||
|  | 
 | ||||||
|  |         log::trace!("delete_app query response: {app_del_resp:?}"); | ||||||
|  | 
 | ||||||
|  |         let query_err = app_del_resp.take_errors(); | ||||||
|  |         if !query_err.is_empty() { | ||||||
|  |             log::trace!("errors in delete_app: {query_err:?}"); | ||||||
|  |             let tx_fail_err_str = | ||||||
|  |                 String::from("The query was not executed due to a failed transaction"); | ||||||
|  | 
 | ||||||
|  |             if query_err.contains_key(&2) && query_err[&2].to_string() != tx_fail_err_str { | ||||||
|  |                 log::error!("Unauthorized: {}", query_err[&2]); | ||||||
|  |                 return Err(Error::AccessDenied); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|     pub async fn listen(db: &Surreal<Client>, app_id: &str) -> Result<ActiveApp, Error> { |         Ok(()) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | pub enum WrappedAppResp { | ||||||
|  |     NewAppRes(NewAppRes), | ||||||
|  |     Error(NewAppRes), | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl WrappedAppResp { | ||||||
|  |     pub async fn listen(db: &Surreal<Client>, app_id: &str) -> Result<WrappedAppResp, Error> { | ||||||
|         let mut query_response = db |         let mut query_response = db | ||||||
|             .query(format!( |             .query(format!( | ||||||
|                 "live select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};" |                 "live select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};" | ||||||
| @ -272,14 +413,40 @@ impl ActiveApp { | |||||||
|         let mut error_stream = query_response.stream::<Notification<db::ErrorFromTable>>(0)?; |         let mut error_stream = query_response.stream::<Notification<db::ErrorFromTable>>(0)?; | ||||||
|         let mut active_app_stream = query_response.stream::<Notification<ActiveApp>>(1)?; |         let mut active_app_stream = query_response.stream::<Notification<ActiveApp>>(1)?; | ||||||
| 
 | 
 | ||||||
|         tokio::time::timeout(Duration::from_secs(30), async { |         let mut error = db | ||||||
|  |             .query(format!("select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};")) | ||||||
|  |             .await?; | ||||||
|  |         if let Some(error_on_newapp_req) = error.take::<Option<db::ErrorFromTable>>(0)? { | ||||||
|  |             if !error_on_newapp_req.error.is_empty() { | ||||||
|  |                 let app_daemon_err = NewAppRes { | ||||||
|  |                     uuid: app_id.to_string(), | ||||||
|  |                     error: error_on_newapp_req.error, | ||||||
|  |                     ..Default::default() | ||||||
|  |                 }; | ||||||
|  | 
 | ||||||
|  |                 return Ok(Self::Error(app_daemon_err)); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         if let Some(active_app) = db.select::<Option<ActiveApp>>((ACTIVE_APP, app_id)).await? { | ||||||
|  |             return Ok(Self::NewAppRes(active_app.into())); | ||||||
|  |         } | ||||||
|  |         log::trace!("listening for table: {NEW_APP_REQ}"); | ||||||
|  | 
 | ||||||
|  |         tokio::time::timeout(Duration::from_secs(APP_DAEMON_TIMEOUT), async { | ||||||
|             loop { |             loop { | ||||||
|                 tokio::select! { |                 tokio::select! { | ||||||
|                     Some(err_notif) = error_stream.next() =>{ |                     Some(err_notif) = error_stream.next() =>{ | ||||||
|                         match err_notif{ |                         match err_notif{ | ||||||
|                             Ok(err_notif) =>{ |                             Ok(err_notif) =>{ | ||||||
|                                 if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty(){ |                                 if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty(){ | ||||||
|                                     return Err(Error::NewAppDaemonResp(err_notif.data.error)) |                                 let app_daemon_err = NewAppRes { | ||||||
|  |                                         uuid: app_id.to_string(), | ||||||
|  |                                         error: err_notif.data.error, | ||||||
|  |                                         ..Default::default() | ||||||
|  |                                     }; | ||||||
|  | 
 | ||||||
|  |                                     return Ok(Self::Error(app_daemon_err)); | ||||||
|                                 } |                                 } | ||||||
|                             } |                             } | ||||||
|                             Err(e) => return Err(e.into()) |                             Err(e) => return Err(e.into()) | ||||||
| @ -291,7 +458,7 @@ impl ActiveApp { | |||||||
|                             Ok(active_app_notif) =>{ |                             Ok(active_app_notif) =>{ | ||||||
|                                 if active_app_notif.action == surrealdb::Action::Create { |                                 if active_app_notif.action == surrealdb::Action::Create { | ||||||
|                                     let _: Option<NewAppReq> = db.delete((NEW_APP_REQ, app_id)).await?; |                                     let _: Option<NewAppReq> = db.delete((NEW_APP_REQ, app_id)).await?; | ||||||
|                                     return Ok(active_app_notif.data); |                                     return Ok(Self::NewAppRes(active_app_notif.data.into())); | ||||||
|                                 } |                                 } | ||||||
|                             } |                             } | ||||||
|                             Err(e) => return Err(e.into()) |                             Err(e) => return Err(e.into()) | ||||||
| @ -511,9 +678,17 @@ pub struct DeletedApp { | |||||||
|     pub disk_size_gb: u32, |     pub disk_size_gb: u32, | ||||||
|     pub created_at: Datetime, |     pub created_at: Datetime, | ||||||
|     pub price_per_unit: u64, |     pub price_per_unit: u64, | ||||||
|     pub locked_nano: u64, |  | ||||||
|     pub collected_at: Datetime, |  | ||||||
|     pub mr_enclave: String, |     pub mr_enclave: String, | ||||||
|     pub package_url: String, |     pub package_url: String, | ||||||
|     pub hratls_pubkey: String, |     pub hratls_pubkey: String, | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | impl DeletedApp { | ||||||
|  |     pub async fn list_by_node(db: &Surreal<Client>, node_pubkey: &str) -> Result<Vec<Self>, Error> { | ||||||
|  |         let mut result = db | ||||||
|  |             .query(format!("select * from {DELETED_APP} where out = {APP_NODE}:{node_pubkey};")) | ||||||
|  |             .await?; | ||||||
|  |         let contracts: Vec<Self> = result.take(0)?; | ||||||
|  |         Ok(contracts) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | |||||||
| @ -31,8 +31,6 @@ pub enum Error { | |||||||
|     UnknownTable(String), |     UnknownTable(String), | ||||||
|     #[error("Daemon channel got closed: {0}")] |     #[error("Daemon channel got closed: {0}")] | ||||||
|     AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<AppDaemonMsg>), |     AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<AppDaemonMsg>), | ||||||
|     #[error("AppDaemon Error {0}")] |  | ||||||
|     NewAppDaemonResp(String), |  | ||||||
|     #[error("Minimum escrow amount is {MIN_ESCROW}")] |     #[error("Minimum escrow amount is {MIN_ESCROW}")] | ||||||
|     MinimalEscrow, |     MinimalEscrow, | ||||||
|     #[error("Insufficient funds, deposit more tokens")] |     #[error("Insufficient funds, deposit more tokens")] | ||||||
| @ -186,7 +184,7 @@ pub async fn live_appnode_msgs< | |||||||
|             } |             } | ||||||
|             Err(e) => { |             Err(e) => { | ||||||
|                 log::error!( |                 log::error!( | ||||||
|                     "live_vmnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}" |                     "live_appnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}" | ||||||
|                 ); |                 ); | ||||||
|                 return Err(Error::from(e)); |                 return Err(Error::from(e)); | ||||||
|             } |             } | ||||||
| @ -196,7 +194,7 @@ pub async fn live_appnode_msgs< | |||||||
|     Ok(()) |     Ok(()) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[derive(Deserialize)] | #[derive(Deserialize, Debug, Clone)] | ||||||
| pub struct ErrorFromTable { | pub struct ErrorFromTable { | ||||||
|     pub error: String, |     pub error: String, | ||||||
| } | } | ||||||
|  | |||||||
							
								
								
									
										73
									
								
								src/db/vm.rs
									
									
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										73
									
								
								src/db/vm.rs
									
									
									
									
									
								
							| @ -3,7 +3,8 @@ use std::time::Duration; | |||||||
| 
 | 
 | ||||||
| use super::Error; | use super::Error; | ||||||
| use crate::constants::{ | use crate::constants::{ | ||||||
|     ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT, |     ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_DAEMON_TIMEOUT, VM_NODE, | ||||||
|  |     VM_UPDATE_EVENT, | ||||||
| }; | }; | ||||||
| use crate::db::{Account, ErrorFromTable, Report}; | use crate::db::{Account, ErrorFromTable, Report}; | ||||||
| use crate::old_brain; | use crate::old_brain; | ||||||
| @ -329,18 +330,22 @@ impl WrappedMeasurement { | |||||||
|             UPDATE_VM_REQ => UPDATE_VM_REQ, |             UPDATE_VM_REQ => UPDATE_VM_REQ, | ||||||
|             _ => NEW_VM_REQ, |             _ => NEW_VM_REQ, | ||||||
|         }; |         }; | ||||||
| 
 |         let mut resp = db | ||||||
|         let mut args_stream = db |             .query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};")) | ||||||
|             .query(format!( |             .query(format!( | ||||||
|                 "live select * from measurement_args where id = measurement_args:{vm_id};" |                 "live select * from measurement_args where id = measurement_args:{vm_id};" | ||||||
|             )) |             )) | ||||||
|             .await? |             .await?; | ||||||
|             .stream::<Notification<vm_proto::MeasurementArgs>>(0)?; |         let mut error_stream = resp.stream::<Notification<ErrorFromTable>>(0)?; | ||||||
|  |         let mut args_stream = resp.stream::<Notification<vm_proto::MeasurementArgs>>(1)?; | ||||||
| 
 | 
 | ||||||
|         let mut error_stream = db |         let mut error = | ||||||
|             .query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};")) |             db.query(format!("select error from {table} where id = {NEW_VM_REQ}:{vm_id}")).await?; | ||||||
|             .await? |         if let Some(error_on_newvm_req) = error.take::<Option<ErrorFromTable>>(0)? { | ||||||
|             .stream::<Notification<ErrorFromTable>>(0)?; |             if !error_on_newvm_req.error.is_empty() { | ||||||
|  |                 return Ok(Self::Error(vm_id.to_string(), error_on_newvm_req.error)); | ||||||
|  |             } | ||||||
|  |         } | ||||||
| 
 | 
 | ||||||
|         let args: Option<vm_proto::MeasurementArgs> = |         let args: Option<vm_proto::MeasurementArgs> = | ||||||
|             db.delete(("measurement_args", vm_id)).await?; |             db.delete(("measurement_args", vm_id)).await?; | ||||||
| @ -349,7 +354,7 @@ impl WrappedMeasurement { | |||||||
|         } |         } | ||||||
|         log::trace!("listening for table: {table}"); |         log::trace!("listening for table: {table}"); | ||||||
| 
 | 
 | ||||||
|         tokio::time::timeout(Duration::from_secs(10), async { |         tokio::time::timeout(Duration::from_secs(VM_DAEMON_TIMEOUT), async { | ||||||
|             loop { |             loop { | ||||||
|                 tokio::select! { |                 tokio::select! { | ||||||
|                     error_notification = error_stream.next() => { |                     error_notification = error_stream.next() => { | ||||||
| @ -358,10 +363,7 @@ impl WrappedMeasurement { | |||||||
|                                 Ok(err_notif) => { |                                 Ok(err_notif) => { | ||||||
|                                     if err_notif.action == surrealdb::Action::Update |                                     if err_notif.action == surrealdb::Action::Update | ||||||
|                                     && !err_notif.data.error.is_empty() { |                                     && !err_notif.data.error.is_empty() { | ||||||
|                                         return Ok::<WrappedMeasurement, Error>( |                                         return Ok(Self::Error(vm_id.to_string(), err_notif.data.error)); | ||||||
|                                             Self::Error(vm_id.to_string(), |  | ||||||
|                                             err_notif.data.error) |  | ||||||
|                                         ); |  | ||||||
|                                     }; |                                     }; | ||||||
|                                 }, |                                 }, | ||||||
|                                 Err(e) => return Err(e.into()), |                                 Err(e) => return Err(e.into()), | ||||||
| @ -544,16 +546,43 @@ impl ActiveVm { | |||||||
|         Ok(false) |         Ok(false) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn delete(db: &Surreal<Client>, id: &str) -> Result<bool, Error> { |     pub async fn delete(db: &Surreal<Client>, admin: &str, id: &str) -> Result<(), Error> { | ||||||
|         let deleted_vm: Option<Self> = db.delete((ACTIVE_VM, id)).await?; |         let mut vm_del_resp = db | ||||||
|         if let Some(deleted_vm) = deleted_vm { |             .query( | ||||||
|             let deleted_vm: DeletedVm = deleted_vm.into(); |                 " | ||||||
|             let _: Vec<DeletedVm> = db.insert(DELETED_VM).relation(deleted_vm).await?; |             BEGIN TRANSACTION; | ||||||
|             Ok(true) | 
 | ||||||
|         } else { |                 LET $active_vm = $vm_id_input; | ||||||
|             Ok(false) |                 LET $admin = $admin_input; | ||||||
|  | 
 | ||||||
|  |                 IF $active_vm.in != $admin { | ||||||
|  |                     THROW 'Unauthorized' | ||||||
|  |                 }; | ||||||
|  | 
 | ||||||
|  |                 return fn::delete_vm($active_vm); | ||||||
|  | 
 | ||||||
|  |             COMMIT TRANSACTION; | ||||||
|  |             ",
 | ||||||
|  |             ) | ||||||
|  |             .bind(("vm_id_input", RecordId::from((ACTIVE_VM, id)))) | ||||||
|  |             .bind(("admin_input", RecordId::from((ACCOUNT, admin)))) | ||||||
|  |             .await?; | ||||||
|  | 
 | ||||||
|  |         log::trace!("delete_vm query response: {vm_del_resp:?}"); | ||||||
|  | 
 | ||||||
|  |         let query_err = vm_del_resp.take_errors(); | ||||||
|  |         if !query_err.is_empty() { | ||||||
|  |             log::trace!("errors in delete_vm: {query_err:?}"); | ||||||
|  |             let tx_fail_err_str = | ||||||
|  |                 String::from("The query was not executed due to a failed transaction"); | ||||||
|  | 
 | ||||||
|  |             if query_err.contains_key(&2) && query_err[&2].to_string() != tx_fail_err_str { | ||||||
|  |                 log::error!("Unauthorized: {}", query_err[&2]); | ||||||
|  |                 return Err(Error::AccessDenied); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn extend_time( |     pub async fn extend_time( | ||||||
|         db: &Surreal<Client>, |         db: &Surreal<Client>, | ||||||
|  | |||||||
| @ -15,7 +15,7 @@ use surrealdb::Surreal; | |||||||
| use tokio::sync::mpsc; | use tokio::sync::mpsc; | ||||||
| use tokio_stream::wrappers::ReceiverStream; | use tokio_stream::wrappers::ReceiverStream; | ||||||
| use tokio_stream::{Stream, StreamExt}; | use tokio_stream::{Stream, StreamExt}; | ||||||
| use tonic::{Response, Status, Streaming}; | use tonic::{Request, Response, Status, Streaming}; | ||||||
| 
 | 
 | ||||||
| pub struct AppDaemonServer { | pub struct AppDaemonServer { | ||||||
|     pub db: Arc<Surreal<Client>>, |     pub db: Arc<Surreal<Client>>, | ||||||
| @ -29,13 +29,13 @@ impl AppDaemonServer { | |||||||
| 
 | 
 | ||||||
| #[tonic::async_trait] | #[tonic::async_trait] | ||||||
| impl BrainAppDaemon for AppDaemonServer { | impl BrainAppDaemon for AppDaemonServer { | ||||||
|     type RegisterAppNodeStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>; |     type RegisterAppNodeStream = Pin<Box<dyn Stream<Item = Result<DelAppReq, Status>> + Send>>; | ||||||
|     type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainMessageApp, Status>> + Send>>; |     type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainMessageApp, Status>> + Send>>; | ||||||
| 
 | 
 | ||||||
|     async fn register_app_node( |     async fn register_app_node( | ||||||
|         &self, |         &self, | ||||||
|         req: tonic::Request<RegisterAppNodeReq>, |         req: Request<RegisterAppNodeReq>, | ||||||
|     ) -> Result<tonic::Response<<Self as BrainAppDaemon>::RegisterAppNodeStream>, Status> { |     ) -> Result<Response<<Self as BrainAppDaemon>::RegisterAppNodeStream>, Status> { | ||||||
|         let req = check_sig_from_req(req)?; |         let req = check_sig_from_req(req)?; | ||||||
|         info!("Starting app_node registration process for {:?}", req); |         info!("Starting app_node registration process for {:?}", req); | ||||||
| 
 | 
 | ||||||
| @ -59,11 +59,11 @@ impl BrainAppDaemon for AppDaemonServer { | |||||||
|         app_node.register(&self.db).await?; |         app_node.register(&self.db).await?; | ||||||
|         info!("Sending existing contracts to {}", req.node_pubkey); |         info!("Sending existing contracts to {}", req.node_pubkey); | ||||||
| 
 | 
 | ||||||
|         let contracts = db::ActiveAppWithNode::list_by_node(&self.db, &req.node_pubkey).await?; |         let deleted_apps = db::DeletedApp::list_by_node(&self.db, &req.node_pubkey).await?; | ||||||
|         let (tx, rx) = mpsc::channel(6); |         let (tx, rx) = mpsc::channel(6); | ||||||
|         tokio::spawn(async move { |         tokio::spawn(async move { | ||||||
|             for contract in contracts { |             for deleted_app in deleted_apps { | ||||||
|                 let _ = tx.send(Ok(contract.into())).await; |                 let _ = tx.send(Ok(deleted_app.into())).await; | ||||||
|             } |             } | ||||||
|         }); |         }); | ||||||
| 
 | 
 | ||||||
| @ -73,8 +73,8 @@ impl BrainAppDaemon for AppDaemonServer { | |||||||
| 
 | 
 | ||||||
|     async fn brain_messages( |     async fn brain_messages( | ||||||
|         &self, |         &self, | ||||||
|         req: tonic::Request<DaemonAuth>, |         req: Request<DaemonAuth>, | ||||||
|     ) -> Result<tonic::Response<<Self as BrainAppDaemon>::BrainMessagesStream>, Status> { |     ) -> Result<Response<<Self as BrainAppDaemon>::BrainMessagesStream>, Status> { | ||||||
|         let auth = req.into_inner(); |         let auth = req.into_inner(); | ||||||
|         let pubkey = auth.pubkey.clone(); |         let pubkey = auth.pubkey.clone(); | ||||||
|         check_sig_from_parts( |         check_sig_from_parts( | ||||||
| @ -110,8 +110,8 @@ impl BrainAppDaemon for AppDaemonServer { | |||||||
| 
 | 
 | ||||||
|     async fn daemon_messages( |     async fn daemon_messages( | ||||||
|         &self, |         &self, | ||||||
|         req: tonic::Request<Streaming<DaemonMessageApp>>, |         req: Request<Streaming<DaemonMessageApp>>, | ||||||
|     ) -> Result<tonic::Response<Empty>, Status> { |     ) -> Result<Response<Empty>, Status> { | ||||||
|         let mut req_stream = req.into_inner(); |         let mut req_stream = req.into_inner(); | ||||||
|         let pubkey: String; |         let pubkey: String; | ||||||
|         if let Some(Ok(msg)) = req_stream.next().await { |         if let Some(Ok(msg)) = req_stream.next().await { | ||||||
| @ -145,7 +145,7 @@ impl BrainAppDaemon for AppDaemonServer { | |||||||
|                             ) |                             ) | ||||||
|                             .await?; |                             .await?; | ||||||
|                         } else { |                         } else { | ||||||
|                             db::ActiveApp::activate(&self.db, &new_app_resp.uuid).await?; |                             db::ActiveApp::activate(&self.db, new_app_resp).await?; | ||||||
|                         } |                         } | ||||||
|                     } |                     } | ||||||
|                     Some(daemon_message_app::Msg::AppNodeResources(app_node_resources)) => { |                     Some(daemon_message_app::Msg::AppNodeResources(app_node_resources)) => { | ||||||
| @ -180,33 +180,40 @@ impl BrainAppCli for AppCliServer { | |||||||
|     type ListAppContractsStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>; |     type ListAppContractsStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>; | ||||||
|     type ListAppNodesStream = Pin<Box<dyn Stream<Item = Result<AppNodeListResp, Status>> + Send>>; |     type ListAppNodesStream = Pin<Box<dyn Stream<Item = Result<AppNodeListResp, Status>> + Send>>; | ||||||
| 
 | 
 | ||||||
|     async fn new_app( |     async fn new_app(&self, req: Request<NewAppReq>) -> Result<Response<NewAppRes>, Status> { | ||||||
|         &self, |  | ||||||
|         req: tonic::Request<detee_shared::app_proto::NewAppReq>, |  | ||||||
|     ) -> Result<tonic::Response<detee_shared::app_proto::NewAppRes>, Status> { |  | ||||||
|         let req = check_sig_from_req(req)?; |         let req = check_sig_from_req(req)?; | ||||||
|         info!("deploy_app process starting for {:?}", req); |         // TODO: make it atleast 1 hour
 | ||||||
| 
 |         if req.locked_nano < 100 { | ||||||
|  |             log::error!("locking lessthan 100 nano lps: {}", req.locked_nano); | ||||||
|  |             return Err(Status::unknown("lock atleaset 100 nano lps")); | ||||||
|  |         } | ||||||
|  |         info!("new_app process starting for {:?}", req); | ||||||
|         if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { |         if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { | ||||||
|             return Err(Status::permission_denied("This operator banned you. What did you do?")); |             return Err(Status::permission_denied("This operator banned you. What did you do?")); | ||||||
|         } |         } | ||||||
|         let new_app_req: db::NewAppReq = req.into(); | 
 | ||||||
|         let id = new_app_req.id.to_string(); |         let db_req: db::NewAppReq = req.into(); | ||||||
|  |         let id = db_req.id.key().to_string(); | ||||||
| 
 | 
 | ||||||
|         let (tx, rx) = tokio::sync::oneshot::channel(); |         let (tx, rx) = tokio::sync::oneshot::channel(); | ||||||
|         let db = self.db.clone(); |         let db = self.db.clone(); | ||||||
|         tokio::spawn(async move { |         tokio::spawn(async move { | ||||||
|             let _ = tx.send(db::ActiveApp::listen(&db, &id).await); |             let _ = tx.send(db::WrappedAppResp::listen(&db, &id).await); | ||||||
|         }); |         }); | ||||||
| 
 |         db_req.submit(&self.db).await?; | ||||||
|         new_app_req.submit(&self.db).await?; |  | ||||||
| 
 | 
 | ||||||
|         match rx.await { |         match rx.await { | ||||||
|  |             Ok(Ok(db::WrappedAppResp::NewAppRes(new_app_resp))) => Ok(Response::new(new_app_resp)), | ||||||
|  |             Ok(Ok(db::WrappedAppResp::Error(err))) => Ok(Response::new(err)), | ||||||
|             Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded( |             Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded( | ||||||
|                 "Network timeout. Please try again later or contact the DeTEE devs team.", |                 "Network timeout. Please try again later or contact the DeTEE devs team.", | ||||||
|             )), |             )), | ||||||
|             Ok(Err(db::Error::NewAppDaemonResp(err))) => Err(Status::internal(err)), |             Ok(Err(e)) => { | ||||||
|             Ok(new_app_resp) => Ok(Response::new(new_app_resp?.into())), |                 log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}"); | ||||||
|  |                 Err(Status::unknown( | ||||||
|  |                     "Unknown error. Please try again or contact the DeTEE devs team.", | ||||||
|  |                 )) | ||||||
|  |             } | ||||||
|             Err(e) => { |             Err(e) => { | ||||||
|                 log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}"); |                 log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}"); | ||||||
|                 Err(Status::unknown( |                 Err(Status::unknown( | ||||||
| @ -216,22 +223,25 @@ impl BrainAppCli for AppCliServer { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     async fn delete_app( |     async fn delete_app(&self, req: Request<DelAppReq>) -> Result<Response<Empty>, Status> { | ||||||
|         &self, |  | ||||||
|         req: tonic::Request<DelAppReq>, |  | ||||||
|     ) -> Result<tonic::Response<Empty>, Status> { |  | ||||||
|         let req = check_sig_from_req(req)?; |         let req = check_sig_from_req(req)?; | ||||||
|         info!("delete_app process starting for {:?}", req); |         info!("delete_app process starting for {:?}", req); | ||||||
|         match ActiveApp::delete(&self.db, &req.uuid).await? { |         match ActiveApp::delete(&self.db, &req.admin_pubkey, &req.uuid).await { | ||||||
|             true => Ok(Response::new(Empty {})), |             Ok(()) => Ok(Response::new(Empty {})), | ||||||
|             false => Err(Status::not_found(format!("Could not find App contract {}", &req.uuid))), |             Err(db::Error::AccessDenied) => Err(Status::permission_denied("Unauthorized")), | ||||||
|  |             Err(e) => { | ||||||
|  |                 log::error!("Error deleting app contract {}: {e}", &req.uuid); | ||||||
|  |                 Err(Status::unknown( | ||||||
|  |                     "Unknown error. Please try again or contact the DeTEE devs team.", | ||||||
|  |                 )) | ||||||
|  |             } | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     async fn list_app_contracts( |     async fn list_app_contracts( | ||||||
|         &self, |         &self, | ||||||
|         req: tonic::Request<ListAppContractsReq>, |         req: Request<ListAppContractsReq>, | ||||||
|     ) -> Result<tonic::Response<<Self as BrainAppCli>::ListAppContractsStream>, Status> { |     ) -> Result<Response<<Self as BrainAppCli>::ListAppContractsStream>, Status> { | ||||||
|         let req = check_sig_from_req(req)?; |         let req = check_sig_from_req(req)?; | ||||||
|         info!("list_app_contracts process starting for {:?}", req); |         info!("list_app_contracts process starting for {:?}", req); | ||||||
| 
 | 
 | ||||||
| @ -270,8 +280,8 @@ impl BrainAppCli for AppCliServer { | |||||||
| 
 | 
 | ||||||
|     async fn list_app_nodes( |     async fn list_app_nodes( | ||||||
|         &self, |         &self, | ||||||
|         req: tonic::Request<AppNodeFilters>, |         req: Request<AppNodeFilters>, | ||||||
|     ) -> Result<tonic::Response<<Self as BrainAppCli>::ListAppNodesStream>, Status> { |     ) -> Result<Response<<Self as BrainAppCli>::ListAppNodesStream>, Status> { | ||||||
|         let req = check_sig_from_req(req)?; |         let req = check_sig_from_req(req)?; | ||||||
|         info!("list_app_nodes process starting for {:?}", req); |         info!("list_app_nodes process starting for {:?}", req); | ||||||
|         let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req, false).await?; |         let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req, false).await?; | ||||||
| @ -287,8 +297,8 @@ impl BrainAppCli for AppCliServer { | |||||||
| 
 | 
 | ||||||
|     async fn get_one_app_node( |     async fn get_one_app_node( | ||||||
|         &self, |         &self, | ||||||
|         req: tonic::Request<AppNodeFilters>, |         req: Request<AppNodeFilters>, | ||||||
|     ) -> Result<tonic::Response<AppNodeListResp>, Status> { |     ) -> Result<Response<AppNodeListResp>, Status> { | ||||||
|         let req = check_sig_from_req(req)?; |         let req = check_sig_from_req(req)?; | ||||||
|         info!("get_one_app_node process starting for {:?}", req); |         info!("get_one_app_node process starting for {:?}", req); | ||||||
|         let app_node = db::AppNodeWithReports::find_by_filters(&self.db, req, true) |         let app_node = db::AppNodeWithReports::find_by_filters(&self.db, req, true) | ||||||
|  | |||||||
| @ -274,8 +274,8 @@ impl From<db::ActiveAppWithNode> for AppContract { | |||||||
|             public_ipv4: value.host_ipv4, |             public_ipv4: value.host_ipv4, | ||||||
|             resource: Some(AppResource { |             resource: Some(AppResource { | ||||||
|                 memory_mb: value.memory_mb, |                 memory_mb: value.memory_mb, | ||||||
|                 disk_mb: value.disk_size_gb, |                 disk_size_gb: value.disk_size_gb, | ||||||
|                 vcpu: value.vcpus, |                 vcpus: value.vcpus, | ||||||
|                 ports: value.mapped_ports.iter().map(|(_, g)| *g).collect(), |                 ports: value.mapped_ports.iter().map(|(_, g)| *g).collect(), | ||||||
|             }), |             }), | ||||||
|             mapped_ports: value |             mapped_ports: value | ||||||
| @ -316,8 +316,8 @@ impl From<NewAppReq> for db::NewAppReq { | |||||||
|             hratls_pubkey: val.hratls_pubkey, |             hratls_pubkey: val.hratls_pubkey, | ||||||
|             ports: resource.ports, |             ports: resource.ports, | ||||||
|             memory_mb: resource.memory_mb, |             memory_mb: resource.memory_mb, | ||||||
|             vcpu: resource.vcpu, |             vcpus: resource.vcpus, | ||||||
|             disk_mb: resource.disk_mb, |             disk_size_gb: resource.disk_size_gb, | ||||||
|             locked_nano: val.locked_nano, |             locked_nano: val.locked_nano, | ||||||
|             price_per_unit: val.price_per_unit, |             price_per_unit: val.price_per_unit, | ||||||
|             error: String::new(), |             error: String::new(), | ||||||
| @ -329,9 +329,9 @@ impl From<NewAppReq> for db::NewAppReq { | |||||||
| impl From<db::NewAppReq> for NewAppReq { | impl From<db::NewAppReq> for NewAppReq { | ||||||
|     fn from(value: db::NewAppReq) -> Self { |     fn from(value: db::NewAppReq) -> Self { | ||||||
|         let resource = AppResource { |         let resource = AppResource { | ||||||
|             vcpu: value.vcpu, |             vcpus: value.vcpus, | ||||||
|             memory_mb: value.memory_mb, |             memory_mb: value.memory_mb, | ||||||
|             disk_mb: value.disk_mb, |             disk_size_gb: value.disk_size_gb, | ||||||
|             ports: value.ports, |             ports: value.ports, | ||||||
|         }; |         }; | ||||||
|         let mr_enclave = Some(hex::decode(value.mr_enclave).unwrap_or_default()); |         let mr_enclave = Some(hex::decode(value.mr_enclave).unwrap_or_default()); | ||||||
|  | |||||||
| @ -222,6 +222,11 @@ impl BrainVmCli for VmCliServer { | |||||||
| 
 | 
 | ||||||
|     async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> { |     async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> { | ||||||
|         let req = check_sig_from_req(req)?; |         let req = check_sig_from_req(req)?; | ||||||
|  |         // TODO: make it atleast 1 hour
 | ||||||
|  |         if req.locked_nano < 100 { | ||||||
|  |             log::error!("locking lessthan 100 nano lps: {}", req.locked_nano); | ||||||
|  |             return Err(Status::unknown("lock atleaset 100 nano lps")); | ||||||
|  |         } | ||||||
|         info!("New VM requested via CLI: {req:?}"); |         info!("New VM requested via CLI: {req:?}"); | ||||||
|         if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { |         if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { | ||||||
|             return Err(Status::permission_denied("This operator banned you. What did you do?")); |             return Err(Status::permission_denied("This operator banned you. What did you do?")); | ||||||
| @ -337,9 +342,15 @@ impl BrainVmCli for VmCliServer { | |||||||
| 
 | 
 | ||||||
|     async fn delete_vm(&self, req: Request<DeleteVmReq>) -> Result<Response<Empty>, Status> { |     async fn delete_vm(&self, req: Request<DeleteVmReq>) -> Result<Response<Empty>, Status> { | ||||||
|         let req = check_sig_from_req(req)?; |         let req = check_sig_from_req(req)?; | ||||||
|         match db::ActiveVm::delete(&self.db, &req.uuid).await? { |         match db::ActiveVm::delete(&self.db, &req.admin_pubkey, &req.uuid).await { | ||||||
|             true => Ok(Response::new(Empty {})), |             Ok(()) => Ok(Response::new(Empty {})), | ||||||
|             false => Err(Status::not_found(format!("Could not find VM contract {}", &req.uuid))), |             Err(db::Error::AccessDenied) => Err(Status::permission_denied("Unauthorized")), | ||||||
|  |             Err(e) => { | ||||||
|  |                 log::error!("Error deleting VM contract {}: {e}", &req.uuid); | ||||||
|  |                 Err(Status::unknown( | ||||||
|  |                     "Unknown error. Please try again or contact the DeTEE devs team.", | ||||||
|  |                 )) | ||||||
|  |             } | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -22,7 +22,7 @@ DEFINE FUNCTION OVERWRITE fn::delete_vm( | |||||||
|         UPDATE $account SET balance += $vm.locked_nano; |         UPDATE $account SET balance += $vm.locked_nano; | ||||||
|     }; |     }; | ||||||
|     INSERT RELATION INTO deleted_vm ( $deleted_vm ); |     INSERT RELATION INTO deleted_vm ( $deleted_vm ); | ||||||
|     DELETE $vm.id; |     RETURN DELETE $vm.id RETURN BEFORE; | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
| DEFINE FUNCTION OVERWRITE fn::app_price_per_minute( | DEFINE FUNCTION OVERWRITE fn::app_price_per_minute( | ||||||
| @ -35,3 +35,20 @@ DEFINE FUNCTION OVERWRITE fn::app_price_per_minute( | |||||||
|         ($app.disk_size_gb / 10)) |         ($app.disk_size_gb / 10)) | ||||||
|     * $app.price_per_unit; |     * $app.price_per_unit; | ||||||
| }; | }; | ||||||
|  | 
 | ||||||
|  | DEFINE FUNCTION OVERWRITE fn::delete_app( | ||||||
|  | 	$app_id: record | ||||||
|  | ) { | ||||||
|  |     LET $app = (select * from $app_id)[0]; | ||||||
|  |     LET $account = $app.in; | ||||||
|  |     LET $deleted_app = $app.patch([{ | ||||||
|  | 		'op': 'replace', | ||||||
|  | 		'path': 'id', | ||||||
|  | 		'value': type::record("deleted_app:" + record::id($app.id)) | ||||||
|  |     }]); | ||||||
|  |     IF $app.locked_nano >= 0 { | ||||||
|  |         UPDATE $account SET balance += $app.locked_nano; | ||||||
|  |     }; | ||||||
|  |     INSERT RELATION INTO deleted_app ( $deleted_app ); | ||||||
|  |     RETURN DELETE $app.id RETURN BEFORE; | ||||||
|  | }; | ||||||
| @ -99,8 +99,8 @@ DEFINE FIELD mr_enclave ON TABLE new_app_req TYPE string; | |||||||
| DEFINE FIELD hratls_pubkey ON TABLE new_app_req TYPE string; | DEFINE FIELD hratls_pubkey ON TABLE new_app_req TYPE string; | ||||||
| DEFINE FIELD ports ON TABLE new_app_req TYPE array<int>; | DEFINE FIELD ports ON TABLE new_app_req TYPE array<int>; | ||||||
| DEFINE FIELD memory_mb ON TABLE new_app_req TYPE int; | DEFINE FIELD memory_mb ON TABLE new_app_req TYPE int; | ||||||
| DEFINE FIELD vcpu ON TABLE new_app_req TYPE int; | DEFINE FIELD vcpus ON TABLE new_app_req TYPE int; | ||||||
| DEFINE FIELD disk_mb ON TABLE new_app_req TYPE int; | DEFINE FIELD disk_size_gb ON TABLE new_app_req TYPE int; | ||||||
| DEFINE FIELD locked_nano ON TABLE new_app_req TYPE int; | DEFINE FIELD locked_nano ON TABLE new_app_req TYPE int; | ||||||
| DEFINE FIELD price_per_unit ON TABLE new_app_req TYPE int; | DEFINE FIELD price_per_unit ON TABLE new_app_req TYPE int; | ||||||
| DEFINE FIELD error ON TABLE new_app_req TYPE string; | DEFINE FIELD error ON TABLE new_app_req TYPE string; | ||||||
|  | |||||||
| @ -27,3 +27,5 @@ FOR $contract IN (select * from active_vm fetch out) { | |||||||
|         fn::delete_vm($contract.id); |         fn::delete_vm($contract.id); | ||||||
|     }; |     }; | ||||||
| }; | }; | ||||||
|  | 
 | ||||||
|  | -- TODO: implement for active_app | ||||||
							
								
								
									
										31
									
								
								tests/common/app_cli_utils.rs
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										31
									
								
								tests/common/app_cli_utils.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,31 @@ | |||||||
|  | use anyhow::Result; | ||||||
|  | use detee_shared::app_proto::{ | ||||||
|  |     brain_app_cli_client::BrainAppCliClient, AppResource, NewAppReq, NewAppRes, | ||||||
|  | }; | ||||||
|  | use tonic::transport::Channel; | ||||||
|  | 
 | ||||||
|  | use crate::common::test_utils::Key; | ||||||
|  | 
 | ||||||
|  | pub async fn create_new_app( | ||||||
|  |     key: &Key, | ||||||
|  |     node_pubkey: &str, | ||||||
|  |     brain_channel: &Channel, | ||||||
|  | ) -> Result<NewAppRes> { | ||||||
|  |     let new_app_req = NewAppReq { | ||||||
|  |         admin_pubkey: key.pubkey.clone(), | ||||||
|  |         node_pubkey: node_pubkey.to_string(), | ||||||
|  |         price_per_unit: 1200, | ||||||
|  |         resource: Some(AppResource { ports: vec![8080, 8081], ..Default::default() }), | ||||||
|  |         locked_nano: 100, | ||||||
|  |         ..Default::default() | ||||||
|  |     }; | ||||||
|  | 
 | ||||||
|  |     let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone()); | ||||||
|  |     let new_app_resp = | ||||||
|  |         client_app_cli.new_app(key.sign_request(new_app_req.clone())?).await?.into_inner(); | ||||||
|  | 
 | ||||||
|  |     assert!(new_app_resp.error.is_empty()); | ||||||
|  |     assert!(new_app_resp.uuid.len() == 40); | ||||||
|  | 
 | ||||||
|  |     Ok(new_app_resp) | ||||||
|  | } | ||||||
							
								
								
									
										144
									
								
								tests/common/app_daemon_utils.rs
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										144
									
								
								tests/common/app_daemon_utils.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,144 @@ | |||||||
|  | use anyhow::Result; | ||||||
|  | use detee_shared::app_proto::brain_app_daemon_client::BrainAppDaemonClient; | ||||||
|  | use detee_shared::app_proto::{self, NewAppRes, RegisterAppNodeReq}; | ||||||
|  | use detee_shared::common_proto::MappedPort; | ||||||
|  | use futures::StreamExt; | ||||||
|  | use tokio::sync::mpsc; | ||||||
|  | use tokio_stream::wrappers::ReceiverStream; | ||||||
|  | use tonic::transport::Channel; | ||||||
|  | 
 | ||||||
|  | use super::test_utils::Key; | ||||||
|  | 
 | ||||||
|  | pub async fn mock_app_daemon( | ||||||
|  |     brain_channel: &Channel, | ||||||
|  |     daemon_error: Option<String>, | ||||||
|  | ) -> Result<String> { | ||||||
|  |     let mut daemon_client = BrainAppDaemonClient::new(brain_channel.clone()); | ||||||
|  |     let daemon_key = Key::new(); | ||||||
|  | 
 | ||||||
|  |     register_app_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?; | ||||||
|  | 
 | ||||||
|  |     let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); | ||||||
|  | 
 | ||||||
|  |     tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx)); | ||||||
|  | 
 | ||||||
|  |     let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); | ||||||
|  |     tokio::spawn(daemon_msg_sender( | ||||||
|  |         daemon_client.clone(), | ||||||
|  |         daemon_key.clone(), | ||||||
|  |         daemon_msg_tx.clone(), | ||||||
|  |         rx, | ||||||
|  |     )); | ||||||
|  | 
 | ||||||
|  |     tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx, daemon_error)); | ||||||
|  | 
 | ||||||
|  |     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; | ||||||
|  | 
 | ||||||
|  |     Ok(daemon_key.pubkey) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | pub async fn register_app_node( | ||||||
|  |     client: &mut BrainAppDaemonClient<Channel>, | ||||||
|  |     key: &Key, | ||||||
|  |     operator_wallet: &str, | ||||||
|  | ) -> Result<Vec<app_proto::DelAppReq>> { | ||||||
|  |     log::info!("Registering app_node: {}", key.pubkey); | ||||||
|  |     let node_pubkey = key.pubkey.clone(); | ||||||
|  | 
 | ||||||
|  |     let req = RegisterAppNodeReq { | ||||||
|  |         node_pubkey, | ||||||
|  |         operator_wallet: operator_wallet.to_string(), | ||||||
|  |         main_ip: String::from("185.243.218.213"), | ||||||
|  |         city: String::from("Oslo"), | ||||||
|  |         country: String::from("Norway"), | ||||||
|  |         region: String::from("EU"), | ||||||
|  |         price: 1200, | ||||||
|  |     }; | ||||||
|  | 
 | ||||||
|  |     let mut grpc_stream = client.register_app_node(key.sign_request(req)?).await?.into_inner(); | ||||||
|  | 
 | ||||||
|  |     let mut deleted_app_reqs = Vec::new(); | ||||||
|  |     while let Some(stream_update) = grpc_stream.next().await { | ||||||
|  |         match stream_update { | ||||||
|  |             Ok(del_app_req) => { | ||||||
|  |                 deleted_app_reqs.push(del_app_req); | ||||||
|  |             } | ||||||
|  |             Err(e) => { | ||||||
|  |                 panic!("Received error instead of deleted_app_reqs: {e:?}"); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |     Ok(deleted_app_reqs) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | pub async fn daemon_listener( | ||||||
|  |     mut client: BrainAppDaemonClient<Channel>, | ||||||
|  |     key: Key, | ||||||
|  |     tx: mpsc::Sender<app_proto::BrainMessageApp>, | ||||||
|  | ) -> Result<()> { | ||||||
|  |     log::info!("listening app_daemon"); | ||||||
|  |     let mut grpc_stream = | ||||||
|  |         client.brain_messages(key.sign_stream_auth_app(vec![])?).await?.into_inner(); | ||||||
|  | 
 | ||||||
|  |     while let Some(Ok(stream_update)) = grpc_stream.next().await { | ||||||
|  |         log::info!("app deamon got notified: {:?}", &stream_update); | ||||||
|  |         let _ = tx.send(stream_update).await; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     Ok(()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | pub async fn daemon_msg_sender( | ||||||
|  |     mut client: BrainAppDaemonClient<Channel>, | ||||||
|  |     key: Key, | ||||||
|  |     tx: mpsc::Sender<app_proto::DaemonMessageApp>, | ||||||
|  |     rx: mpsc::Receiver<app_proto::DaemonMessageApp>, | ||||||
|  | ) -> Result<()> { | ||||||
|  |     log::info!("sender app_daemon"); | ||||||
|  |     let rx_stream = ReceiverStream::new(rx); | ||||||
|  |     tx.send(app_proto::DaemonMessageApp { | ||||||
|  |         msg: Some(app_proto::daemon_message_app::Msg::Auth(key.sign_stream_auth_app(vec![])?)), | ||||||
|  |     }) | ||||||
|  |     .await?; | ||||||
|  |     client.daemon_messages(rx_stream).await?; | ||||||
|  |     Ok(()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | pub async fn daemon_engine( | ||||||
|  |     tx: mpsc::Sender<app_proto::DaemonMessageApp>, | ||||||
|  |     mut rx: mpsc::Receiver<app_proto::BrainMessageApp>, | ||||||
|  |     new_app_err: Option<String>, | ||||||
|  | ) -> Result<()> { | ||||||
|  |     log::info!("daemon engine app_daemon"); | ||||||
|  |     while let Some(brain_msg) = rx.recv().await { | ||||||
|  |         match brain_msg.msg { | ||||||
|  |             Some(app_proto::brain_message_app::Msg::NewAppReq(new_app_req)) => { | ||||||
|  |                 let exposed_ports = | ||||||
|  |                     [vec![34500], new_app_req.resource.unwrap_or_default().ports].concat(); | ||||||
|  | 
 | ||||||
|  |                 let mapped_ports = exposed_ports | ||||||
|  |                     .into_iter() | ||||||
|  |                     .map(|port| MappedPort { host_port: port, guest_port: port }) | ||||||
|  |                     .collect::<Vec<MappedPort>>(); | ||||||
|  | 
 | ||||||
|  |                 let res_data = NewAppRes { | ||||||
|  |                     uuid: new_app_req.uuid, | ||||||
|  |                     mapped_ports, | ||||||
|  |                     ip_address: "127.0.0.1".to_string(), | ||||||
|  |                     error: new_app_err.clone().unwrap_or_default(), | ||||||
|  |                 }; | ||||||
|  | 
 | ||||||
|  |                 let res = app_proto::DaemonMessageApp { | ||||||
|  |                     msg: Some(app_proto::daemon_message_app::Msg::NewAppRes(res_data)), | ||||||
|  |                 }; | ||||||
|  |                 tx.send(res).await?; | ||||||
|  |             } | ||||||
|  |             Some(app_proto::brain_message_app::Msg::DeleteAppReq(del_app_req)) => { | ||||||
|  |                 println!("MOCK_APP_DAEMON:: delete app request for {}", del_app_req.uuid); | ||||||
|  |             } | ||||||
|  |             None => todo!(), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     Ok(()) | ||||||
|  | } | ||||||
| @ -6,3 +6,9 @@ pub mod test_utils; | |||||||
| pub mod vm_cli_utils; | pub mod vm_cli_utils; | ||||||
| #[allow(dead_code)] | #[allow(dead_code)] | ||||||
| pub mod vm_daemon_utils; | pub mod vm_daemon_utils; | ||||||
|  | 
 | ||||||
|  | #[allow(dead_code)] | ||||||
|  | pub mod app_daemon_utils; | ||||||
|  | 
 | ||||||
|  | #[allow(dead_code)] | ||||||
|  | pub mod app_cli_utils; | ||||||
|  | |||||||
| @ -1,4 +1,6 @@ | |||||||
| use anyhow::Result; | use anyhow::Result; | ||||||
|  | use detee_shared::app_proto::brain_app_cli_server::BrainAppCliServer; | ||||||
|  | use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemonServer; | ||||||
| use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer; | use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer; | ||||||
| use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; | use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; | ||||||
| use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; | use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; | ||||||
| @ -7,6 +9,7 @@ use hyper_util::rt::TokioIo; | |||||||
| use std::net::SocketAddr; | use std::net::SocketAddr; | ||||||
| use std::sync::Arc; | use std::sync::Arc; | ||||||
| use surreal_brain::constants::DB_SCHEMA_FILES; | use surreal_brain::constants::DB_SCHEMA_FILES; | ||||||
|  | use surreal_brain::grpc::app::{AppCliServer, AppDaemonServer}; | ||||||
| use surreal_brain::grpc::general::GeneralCliServer; | use surreal_brain::grpc::general::GeneralCliServer; | ||||||
| use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer}; | use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer}; | ||||||
| use surrealdb::engine::remote::ws::Client; | use surrealdb::engine::remote::ws::Client; | ||||||
| @ -96,6 +99,8 @@ pub async fn run_service_for_stream_server() -> DuplexStream { | |||||||
|             .add_service(BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone()))) |             .add_service(BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone()))) | ||||||
|             .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) |             .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) | ||||||
|             .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) |             .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) | ||||||
|  |             .add_service(BrainAppCliServer::new(AppCliServer::new(db_arc.clone()))) | ||||||
|  |             .add_service(BrainAppDaemonServer::new(AppDaemonServer::new(db_arc.clone()))) | ||||||
|             .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) |             .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) | ||||||
|             .await?; |             .await?; | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -1,9 +1,14 @@ | |||||||
| use anyhow::Result; | use anyhow::Result; | ||||||
|  | use detee_shared::app_proto as sgx_proto; | ||||||
|  | use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient; | ||||||
|  | use detee_shared::general_proto::AirdropReq; | ||||||
| use detee_shared::vm_proto as snp_proto; | use detee_shared::vm_proto as snp_proto; | ||||||
| use ed25519_dalek::{Signer, SigningKey}; | use ed25519_dalek::{Signer, SigningKey}; | ||||||
| use itertools::Itertools; | use itertools::Itertools; | ||||||
| use std::sync::OnceLock; | use std::sync::OnceLock; | ||||||
|  | use surreal_brain::constants::TOKEN_DECIMAL; | ||||||
| use tonic::metadata::AsciiMetadataValue; | use tonic::metadata::AsciiMetadataValue; | ||||||
|  | use tonic::transport::Channel; | ||||||
| use tonic::Request; | use tonic::Request; | ||||||
| 
 | 
 | ||||||
| pub static ADMIN_KEYS: OnceLock<Vec<Key>> = OnceLock::new(); | pub static ADMIN_KEYS: OnceLock<Vec<Key>> = OnceLock::new(); | ||||||
| @ -63,11 +68,33 @@ impl Key { | |||||||
|         Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string()) |         Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string()) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub fn sign_stream_auth(&self, contracts: Vec<String>) -> Result<snp_proto::DaemonStreamAuth> { |     pub fn sign_stream_auth_vm( | ||||||
|  |         &self, | ||||||
|  |         contracts: Vec<String>, | ||||||
|  |     ) -> Result<snp_proto::DaemonStreamAuth> { | ||||||
|         let pubkey = self.pubkey.clone(); |         let pubkey = self.pubkey.clone(); | ||||||
|         let timestamp = chrono::Utc::now().to_rfc3339(); |         let timestamp = chrono::Utc::now().to_rfc3339(); | ||||||
|         let signature = |         let signature = | ||||||
|             self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?; |             self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?; | ||||||
|         Ok(snp_proto::DaemonStreamAuth { timestamp, pubkey, contracts, signature }) |         Ok(snp_proto::DaemonStreamAuth { timestamp, pubkey, contracts, signature }) | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|  |     pub fn sign_stream_auth_app(&self, contracts: Vec<String>) -> Result<sgx_proto::DaemonAuth> { | ||||||
|  |         let pubkey = self.pubkey.clone(); | ||||||
|  |         let timestamp = chrono::Utc::now().to_rfc3339(); | ||||||
|  |         let signature = | ||||||
|  |             self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?; | ||||||
|  |         Ok(sgx_proto::DaemonAuth { timestamp, pubkey, contracts, signature }) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | pub async fn airdrop(brain_channel: &Channel, wallet: &str, amount: u64) -> Result<()> { | ||||||
|  |     let mut client = BrainGeneralCliClient::new(brain_channel.clone()); | ||||||
|  |     let airdrop_req = AirdropReq { pubkey: wallet.to_string(), tokens: amount * TOKEN_DECIMAL }; | ||||||
|  | 
 | ||||||
|  |     let admin_key = admin_keys()[0].clone(); | ||||||
|  | 
 | ||||||
|  |     client.airdrop(admin_key.sign_request(airdrop_req.clone())?).await?; | ||||||
|  | 
 | ||||||
|  |     Ok(()) | ||||||
| } | } | ||||||
|  | |||||||
| @ -1,29 +1,18 @@ | |||||||
| use super::test_utils::{admin_keys, Key}; | use super::test_utils::Key; | ||||||
| use anyhow::{anyhow, Result}; | use anyhow::{anyhow, Result}; | ||||||
| use detee_shared::app_proto; | use detee_shared::app_proto; | ||||||
| use detee_shared::common_proto::Empty; | use detee_shared::common_proto::Empty; | ||||||
| use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient; | use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient; | ||||||
| use detee_shared::general_proto::{Account, AirdropReq, RegOperatorReq, ReportNodeReq}; | use detee_shared::general_proto::{Account, RegOperatorReq, ReportNodeReq}; | ||||||
| use detee_shared::vm_proto; | use detee_shared::vm_proto; | ||||||
| use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | ||||||
| use futures::StreamExt; | use futures::StreamExt; | ||||||
| use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ, TOKEN_DECIMAL}; | use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ}; | ||||||
| use surreal_brain::db::prelude as db; | use surreal_brain::db::prelude as db; | ||||||
| use surrealdb::engine::remote::ws::Client; | use surrealdb::engine::remote::ws::Client; | ||||||
| use surrealdb::Surreal; | use surrealdb::Surreal; | ||||||
| use tonic::transport::Channel; | use tonic::transport::Channel; | ||||||
| 
 | 
 | ||||||
| pub async fn airdrop(brain_channel: &Channel, wallet: &str, amount: u64) -> Result<()> { |  | ||||||
|     let mut client = BrainGeneralCliClient::new(brain_channel.clone()); |  | ||||||
|     let airdrop_req = AirdropReq { pubkey: wallet.to_string(), tokens: amount * TOKEN_DECIMAL }; |  | ||||||
| 
 |  | ||||||
|     let admin_key = admin_keys()[0].clone(); |  | ||||||
| 
 |  | ||||||
|     client.airdrop(admin_key.sign_request(airdrop_req.clone())?).await?; |  | ||||||
| 
 |  | ||||||
|     Ok(()) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| pub async fn create_new_vm( | pub async fn create_new_vm( | ||||||
|     db: &Surreal<Client>, |     db: &Surreal<Client>, | ||||||
|     key: &Key, |     key: &Key, | ||||||
| @ -35,7 +24,7 @@ pub async fn create_new_vm( | |||||||
|         node_pubkey: node_pubkey.to_string(), |         node_pubkey: node_pubkey.to_string(), | ||||||
|         price_per_unit: 1200, |         price_per_unit: 1200, | ||||||
|         extra_ports: vec![8080, 8081], |         extra_ports: vec![8080, 8081], | ||||||
|         locked_nano: 0, |         locked_nano: 100, | ||||||
|         ..Default::default() |         ..Default::default() | ||||||
|     }; |     }; | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -59,8 +59,8 @@ pub async fn register_vm_node( | |||||||
|     let mut deleted_vm_reqs = Vec::new(); |     let mut deleted_vm_reqs = Vec::new(); | ||||||
|     while let Some(stream_update) = grpc_stream.next().await { |     while let Some(stream_update) = grpc_stream.next().await { | ||||||
|         match stream_update { |         match stream_update { | ||||||
|             Ok(del_vm_rq) => { |             Ok(del_vm_req) => { | ||||||
|                 deleted_vm_reqs.push(del_vm_rq); |                 deleted_vm_reqs.push(del_vm_req); | ||||||
|             } |             } | ||||||
|             Err(e) => { |             Err(e) => { | ||||||
|                 panic!("Received error instead of deleted_vm_reqs: {e:?}"); |                 panic!("Received error instead of deleted_vm_reqs: {e:?}"); | ||||||
| @ -76,7 +76,8 @@ pub async fn daemon_listener( | |||||||
|     tx: mpsc::Sender<vm_proto::BrainVmMessage>, |     tx: mpsc::Sender<vm_proto::BrainVmMessage>, | ||||||
| ) -> Result<()> { | ) -> Result<()> { | ||||||
|     log::info!("listening vm_daemon"); |     log::info!("listening vm_daemon"); | ||||||
|     let mut grpc_stream = client.brain_messages(key.sign_stream_auth(vec![])?).await?.into_inner(); |     let mut grpc_stream = | ||||||
|  |         client.brain_messages(key.sign_stream_auth_vm(vec![])?).await?.into_inner(); | ||||||
| 
 | 
 | ||||||
|     while let Some(Ok(stream_update)) = grpc_stream.next().await { |     while let Some(Ok(stream_update)) = grpc_stream.next().await { | ||||||
|         log::info!("vm deamon got notified: {:?}", &stream_update); |         log::info!("vm deamon got notified: {:?}", &stream_update); | ||||||
| @ -95,7 +96,7 @@ pub async fn daemon_msg_sender( | |||||||
|     log::info!("sender vm_daemon"); |     log::info!("sender vm_daemon"); | ||||||
|     let rx_stream = ReceiverStream::new(rx); |     let rx_stream = ReceiverStream::new(rx); | ||||||
|     tx.send(vm_proto::VmDaemonMessage { |     tx.send(vm_proto::VmDaemonMessage { | ||||||
|         msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![])?)), |         msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth_vm(vec![])?)), | ||||||
|     }) |     }) | ||||||
|     .await?; |     .await?; | ||||||
|     client.daemon_messages(rx_stream).await?; |     client.daemon_messages(rx_stream).await?; | ||||||
| @ -135,8 +136,8 @@ pub async fn daemon_engine( | |||||||
|             Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { |             Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { | ||||||
|                 todo!() |                 todo!() | ||||||
|             } |             } | ||||||
|             Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => { |             Some(vm_proto::brain_vm_message::Msg::DeleteVm(del_vm_req)) => { | ||||||
|                 todo!() |                 println!("MOCK_VM_DAEMON:: delete vm request for {}", del_vm_req.uuid); | ||||||
|             } |             } | ||||||
|             None => todo!(), |             None => todo!(), | ||||||
|         } |         } | ||||||
|  | |||||||
							
								
								
									
										182
									
								
								tests/grpc_app_cli_test.rs
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										182
									
								
								tests/grpc_app_cli_test.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,182 @@ | |||||||
|  | use common::app_daemon_utils::mock_app_daemon; | ||||||
|  | use common::prepare_test_env::{prepare_test_db, run_service_for_stream}; | ||||||
|  | use common::test_utils::{airdrop, Key}; | ||||||
|  | use detee_shared::app_proto::brain_app_cli_client::BrainAppCliClient; | ||||||
|  | use detee_shared::app_proto::{self, DelAppReq}; | ||||||
|  | use std::vec; | ||||||
|  | use surreal_brain::constants::{ACCOUNT, ACTIVE_APP, DELETED_APP, NEW_APP_REQ, TOKEN_DECIMAL}; | ||||||
|  | use surreal_brain::db::prelude as db; | ||||||
|  | 
 | ||||||
|  | use crate::common::app_cli_utils::create_new_app; | ||||||
|  | 
 | ||||||
|  | mod common; | ||||||
|  | 
 | ||||||
|  | #[tokio::test] | ||||||
|  | async fn test_app_creation() { | ||||||
|  |     /* | ||||||
|  |     env_logger::builder() | ||||||
|  |         .filter_level(log::LevelFilter::Trace) | ||||||
|  |         .filter_module("tungstenite", log::LevelFilter::Debug) | ||||||
|  |         .filter_module("tokio_tungstenite", log::LevelFilter::Debug) | ||||||
|  |         .init(); | ||||||
|  |     */ | ||||||
|  |     let db = prepare_test_db().await.unwrap(); | ||||||
|  |     let brain_channel = run_service_for_stream().await.unwrap(); | ||||||
|  |     let daemon_key = mock_app_daemon(&brain_channel, None).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     let key = Key::new(); | ||||||
|  | 
 | ||||||
|  |     let mut new_app_req = app_proto::NewAppReq { | ||||||
|  |         admin_pubkey: key.pubkey.clone(), | ||||||
|  |         node_pubkey: daemon_key.clone(), | ||||||
|  |         price_per_unit: 1200, | ||||||
|  |         resource: Some(app_proto::AppResource { ports: vec![8080, 8081], ..Default::default() }), | ||||||
|  |         locked_nano: 100, | ||||||
|  |         ..Default::default() | ||||||
|  |     }; | ||||||
|  | 
 | ||||||
|  |     let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone()); | ||||||
|  |     let new_app_resp = client_app_cli.new_app(key.sign_request(new_app_req.clone()).unwrap()).await; | ||||||
|  |     assert!(new_app_resp.is_err()); | ||||||
|  |     let new_app_err = new_app_resp.err().unwrap(); | ||||||
|  |     assert!(new_app_err.to_string().contains("Insufficient funds")); | ||||||
|  | 
 | ||||||
|  |     let airdrop_amount = 10; | ||||||
|  |     airdrop(&brain_channel, &key.pubkey, airdrop_amount).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     let new_app_resp = client_app_cli | ||||||
|  |         .new_app(key.sign_request(new_app_req.clone()).unwrap()) | ||||||
|  |         .await | ||||||
|  |         .unwrap() | ||||||
|  |         .into_inner(); | ||||||
|  |     let active_app = | ||||||
|  |         db.select::<Option<db::ActiveApp>>((ACTIVE_APP, new_app_resp.uuid)).await.unwrap(); | ||||||
|  |     assert!(active_app.is_some()); | ||||||
|  | 
 | ||||||
|  |     let daemon_key_02 = | ||||||
|  |         mock_app_daemon(&brain_channel, Some("something went wrong 01".to_string())).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     new_app_req.node_pubkey = daemon_key_02.clone(); | ||||||
|  | 
 | ||||||
|  |     let new_app_resp = client_app_cli | ||||||
|  |         .new_app(key.sign_request(new_app_req.clone()).unwrap()) | ||||||
|  |         .await | ||||||
|  |         .unwrap() | ||||||
|  |         .into_inner(); | ||||||
|  |     assert!(!new_app_resp.error.is_empty()); | ||||||
|  | 
 | ||||||
|  |     let app_req_db = | ||||||
|  |         db.select::<Option<db::NewAppReq>>((NEW_APP_REQ, new_app_resp.uuid)).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     assert!(!app_req_db.unwrap().error.is_empty()); | ||||||
|  | 
 | ||||||
|  |     let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); | ||||||
|  |     assert_eq!(acc_db.balance, (airdrop_amount * TOKEN_DECIMAL - 100)); | ||||||
|  |     assert_eq!(acc_db.tmp_locked, 0); | ||||||
|  | 
 | ||||||
|  |     let locking_nano = 1288; | ||||||
|  |     new_app_req.node_pubkey = daemon_key; | ||||||
|  |     new_app_req.locked_nano = locking_nano; | ||||||
|  | 
 | ||||||
|  |     let new_app_resp = | ||||||
|  |         client_app_cli.new_app(key.sign_request(new_app_req).unwrap()).await.unwrap().into_inner(); | ||||||
|  |     assert!(new_app_resp.error.is_empty()); | ||||||
|  | 
 | ||||||
|  |     let active_app = | ||||||
|  |         db.select::<Option<db::ActiveApp>>((ACTIVE_APP, new_app_resp.uuid)).await.unwrap(); | ||||||
|  |     assert!(active_app.is_some()); | ||||||
|  | 
 | ||||||
|  |     let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); | ||||||
|  |     assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - (locking_nano + 100)); | ||||||
|  |     assert_eq!(acc_db.tmp_locked, 0); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | #[tokio::test] | ||||||
|  | async fn test_timeout_app_creation() { | ||||||
|  |     let _ = prepare_test_db().await.unwrap(); | ||||||
|  |     let brain_channel = run_service_for_stream().await.unwrap(); | ||||||
|  |     let daemon_key = Key::new().pubkey.clone(); | ||||||
|  | 
 | ||||||
|  |     let key = Key::new(); | ||||||
|  | 
 | ||||||
|  |     let new_app_req = app_proto::NewAppReq { | ||||||
|  |         admin_pubkey: key.pubkey.clone(), | ||||||
|  |         node_pubkey: daemon_key.clone(), | ||||||
|  |         price_per_unit: 1200, | ||||||
|  |         resource: Some(app_proto::AppResource { ports: vec![8080, 8081], ..Default::default() }), | ||||||
|  |         locked_nano: 100, | ||||||
|  |         ..Default::default() | ||||||
|  |     }; | ||||||
|  | 
 | ||||||
|  |     airdrop(&brain_channel, &key.pubkey, 10).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone()); | ||||||
|  |     let timeout_resp = client_app_cli.new_app(key.sign_request(new_app_req.clone()).unwrap()).await; | ||||||
|  |     assert!(timeout_resp.is_err()); | ||||||
|  |     let timeout_err = timeout_resp.err().unwrap(); | ||||||
|  |     assert_eq!( | ||||||
|  |         timeout_err.message(), | ||||||
|  |         "Network timeout. Please try again later or contact the DeTEE devs team." | ||||||
|  |     ); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | #[tokio::test] | ||||||
|  | async fn test_app_deletion() { | ||||||
|  |     let db = prepare_test_db().await.unwrap(); | ||||||
|  |     let brain_channel = run_service_for_stream().await.unwrap(); | ||||||
|  |     let daemon_key = mock_app_daemon(&brain_channel, None).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     let key = Key::new(); | ||||||
|  |     airdrop(&brain_channel, &key.pubkey, 10).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     let new_app_res = create_new_app(&key, &daemon_key, &brain_channel).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone()); | ||||||
|  | 
 | ||||||
|  |     let del_app_req = DelAppReq { admin_pubkey: key.pubkey.clone(), uuid: new_app_res.uuid }; | ||||||
|  |     let _ = client_app_cli.delete_app(key.sign_request(del_app_req).unwrap()).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     let key_02 = Key::new(); | ||||||
|  | 
 | ||||||
|  |     // delete random app
 | ||||||
|  |     let mut del_app_req = DelAppReq { | ||||||
|  |         admin_pubkey: key_02.pubkey.clone(), | ||||||
|  |         uuid: "9ae3VH8nJg2i8pqTQ6mJtvYuS2kd9n1XLLco8GUPfT95".to_string(), | ||||||
|  |     }; | ||||||
|  | 
 | ||||||
|  |     let del_err = client_app_cli | ||||||
|  |         .delete_app(key_02.sign_request(del_app_req.clone()).unwrap()) | ||||||
|  |         .await | ||||||
|  |         .err() | ||||||
|  |         .unwrap(); | ||||||
|  |     assert_eq!(del_err.message(), "Unauthorized"); | ||||||
|  | 
 | ||||||
|  |     let new_app_res_02 = create_new_app(&key, &daemon_key, &brain_channel).await.unwrap(); | ||||||
|  |     del_app_req.uuid = new_app_res_02.uuid; | ||||||
|  | 
 | ||||||
|  |     let del_err = client_app_cli | ||||||
|  |         .delete_app(key_02.sign_request(del_app_req.clone()).unwrap()) | ||||||
|  |         .await | ||||||
|  |         .err() | ||||||
|  |         .unwrap(); | ||||||
|  |     assert_eq!(del_err.message(), "Unauthorized"); | ||||||
|  | 
 | ||||||
|  |     // test refund
 | ||||||
|  |     let key_03 = Key::new(); | ||||||
|  |     airdrop(&brain_channel, &key_03.pubkey, 10).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     let new_app_res_03 = create_new_app(&key_03, &daemon_key, &brain_channel).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     let del_app_req = | ||||||
|  |         DelAppReq { admin_pubkey: key_03.pubkey.clone(), uuid: new_app_res_03.uuid.clone() }; | ||||||
|  |     let _ = client_app_cli.delete_app(key_03.sign_request(del_app_req).unwrap()).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     let acc: db::Account = db.select((ACCOUNT, key_03.pubkey.clone())).await.unwrap().unwrap(); | ||||||
|  |     assert_eq!(acc.balance, 10 * TOKEN_DECIMAL); | ||||||
|  | 
 | ||||||
|  |     let deleted_app = | ||||||
|  |         db.select::<Option<db::DeletedApp>>((DELETED_APP, new_app_res_03.uuid)).await.unwrap(); | ||||||
|  |     assert!(deleted_app.is_some()); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // TODO: test register app node, delete app contract while node offline, kick, etc..
 | ||||||
| @ -1,10 +1,10 @@ | |||||||
| use common::prepare_test_env::{ | use common::prepare_test_env::{ | ||||||
|     prepare_test_db, run_service_for_stream, run_service_in_background, |     prepare_test_db, run_service_for_stream, run_service_in_background, | ||||||
| }; | }; | ||||||
| use common::test_utils::{admin_keys, Key}; | use common::test_utils::{admin_keys, airdrop, Key}; | ||||||
| use common::vm_cli_utils::{ | use common::vm_cli_utils::{ | ||||||
|     airdrop, create_new_vm, list_accounts, list_all_app_contracts, list_all_vm_contracts, |     create_new_vm, list_accounts, list_all_app_contracts, list_all_vm_contracts, register_operator, | ||||||
|     register_operator, report_node, |     report_node, | ||||||
| }; | }; | ||||||
| use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; | use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; | ||||||
| use detee_shared::common_proto::{Empty, Pubkey}; | use detee_shared::common_proto::{Empty, Pubkey}; | ||||||
|  | |||||||
| @ -1,14 +1,14 @@ | |||||||
| use common::prepare_test_env::{prepare_test_db, run_service_for_stream}; | use common::prepare_test_env::{prepare_test_db, run_service_for_stream}; | ||||||
| use common::test_utils::Key; | use common::test_utils::{airdrop, Key}; | ||||||
| use common::vm_cli_utils::{airdrop, create_new_vm, user_list_vm_contracts}; | use common::vm_cli_utils::{create_new_vm, user_list_vm_contracts}; | ||||||
| use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; | use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; | ||||||
| use detee_shared::vm_proto; |  | ||||||
| use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | ||||||
| use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; | use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; | ||||||
|  | use detee_shared::vm_proto::{self, DeleteVmReq}; | ||||||
| use detee_shared::vm_proto::{ExtendVmReq, ListVmContractsReq, NewVmReq}; | use detee_shared::vm_proto::{ExtendVmReq, ListVmContractsReq, NewVmReq}; | ||||||
| use futures::StreamExt; | use futures::StreamExt; | ||||||
| use std::vec; | use std::vec; | ||||||
| use surreal_brain::constants::{ACCOUNT, ACTIVE_VM, NEW_VM_REQ, TOKEN_DECIMAL}; | use surreal_brain::constants::{ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, TOKEN_DECIMAL}; | ||||||
| use surreal_brain::db::prelude as db; | use surreal_brain::db::prelude as db; | ||||||
| 
 | 
 | ||||||
| mod common; | mod common; | ||||||
| @ -73,7 +73,7 @@ async fn test_vm_creation() { | |||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); |     let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); | ||||||
|     assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL); |     assert_eq!(acc_db.balance, (airdrop_amount * TOKEN_DECIMAL - 100)); | ||||||
|     assert_eq!(acc_db.tmp_locked, 0); |     assert_eq!(acc_db.tmp_locked, 0); | ||||||
| 
 | 
 | ||||||
|     new_vm_req.node_pubkey = daemon_key; |     new_vm_req.node_pubkey = daemon_key; | ||||||
| @ -87,7 +87,7 @@ async fn test_vm_creation() { | |||||||
|     assert!(active_vm.is_some()); |     assert!(active_vm.is_some()); | ||||||
| 
 | 
 | ||||||
|     let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); |     let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); | ||||||
|     assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - locking_nano); |     assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - (locking_nano + 100)); | ||||||
|     assert_eq!(acc_db.tmp_locked, 0); |     assert_eq!(acc_db.tmp_locked, 0); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -109,7 +109,7 @@ async fn test_timeout_vm_creation() { | |||||||
|         node_pubkey: daemon_key.pubkey, |         node_pubkey: daemon_key.pubkey, | ||||||
|         price_per_unit: 1200, |         price_per_unit: 1200, | ||||||
|         extra_ports: vec![8080, 8081], |         extra_ports: vec![8080, 8081], | ||||||
|         locked_nano: 0, |         locked_nano: 100, | ||||||
|         ..Default::default() |         ..Default::default() | ||||||
|     }; |     }; | ||||||
| 
 | 
 | ||||||
| @ -125,6 +125,65 @@ async fn test_timeout_vm_creation() { | |||||||
|     ) |     ) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | #[tokio::test] | ||||||
|  | async fn test_vm_deletion() { | ||||||
|  |     let db = prepare_test_db().await.unwrap(); | ||||||
|  |     let brain_channel = run_service_for_stream().await.unwrap(); | ||||||
|  |     let daemon_key = mock_vm_daemon(&brain_channel, None).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     let key = Key::new(); | ||||||
|  |     airdrop(&brain_channel, &key.pubkey, 10).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     let new_vm_uuid = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone()); | ||||||
|  | 
 | ||||||
|  |     let del_app_req = DeleteVmReq { admin_pubkey: key.pubkey.clone(), uuid: new_vm_uuid }; | ||||||
|  |     let _ = client_vm_cli.delete_vm(key.sign_request(del_app_req).unwrap()).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     let key_02 = Key::new(); | ||||||
|  | 
 | ||||||
|  |     // delete random vm
 | ||||||
|  |     let mut del_vm_req = DeleteVmReq { | ||||||
|  |         admin_pubkey: key_02.pubkey.clone(), | ||||||
|  |         uuid: "9ae3VH8nJg2i8pqTQ6mJtvYuS2kd9n1XLLco8GUPfT95".to_string(), | ||||||
|  |     }; | ||||||
|  | 
 | ||||||
|  |     let del_err = client_vm_cli | ||||||
|  |         .delete_vm(key_02.sign_request(del_vm_req.clone()).unwrap()) | ||||||
|  |         .await | ||||||
|  |         .err() | ||||||
|  |         .unwrap(); | ||||||
|  |     assert_eq!(del_err.message(), "Unauthorized"); | ||||||
|  | 
 | ||||||
|  |     let new_vm_uuid_02 = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap(); | ||||||
|  |     del_vm_req.uuid = new_vm_uuid_02; | ||||||
|  | 
 | ||||||
|  |     let del_err = client_vm_cli | ||||||
|  |         .delete_vm(key_02.sign_request(del_vm_req.clone()).unwrap()) | ||||||
|  |         .await | ||||||
|  |         .err() | ||||||
|  |         .unwrap(); | ||||||
|  |     assert_eq!(del_err.message(), "Unauthorized"); | ||||||
|  | 
 | ||||||
|  |     // test refund
 | ||||||
|  |     let key_03 = Key::new(); | ||||||
|  |     airdrop(&brain_channel, &key_03.pubkey, 10).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     let new_vm_uuid_03 = create_new_vm(&db, &key_03, &daemon_key, &brain_channel).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     let del_vm_req = | ||||||
|  |         DeleteVmReq { admin_pubkey: key_03.pubkey.clone(), uuid: new_vm_uuid_03.clone() }; | ||||||
|  |     let _ = client_vm_cli.delete_vm(key_03.sign_request(del_vm_req).unwrap()).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     let acc: db::Account = db.select((ACCOUNT, key_03.pubkey.clone())).await.unwrap().unwrap(); | ||||||
|  |     assert_eq!(acc.balance, 10 * TOKEN_DECIMAL); | ||||||
|  | 
 | ||||||
|  |     let deleted_vm = | ||||||
|  |         db.select::<Option<db::DeletedVm>>((DELETED_VM, new_vm_uuid_03)).await.unwrap(); | ||||||
|  |     assert!(deleted_vm.is_some()); | ||||||
|  | } | ||||||
|  | 
 | ||||||
| #[tokio::test] | #[tokio::test] | ||||||
| // TODO: create vm for this user before testing this
 | // TODO: create vm for this user before testing this
 | ||||||
| async fn test_list_vm_contracts() { | async fn test_list_vm_contracts() { | ||||||
| @ -240,3 +299,5 @@ async fn test_extend_vm() { | |||||||
|     let acc: db::Account = db_conn.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); |     let acc: db::Account = db_conn.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap(); | ||||||
|     assert_eq!(acc.balance, expected_bal_02); |     assert_eq!(acc.balance, expected_bal_02); | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | // TODO: test register vm node, delete vm contract while node offline, kick, etc..
 | ||||||
|  | |||||||
| @ -1,8 +1,7 @@ | |||||||
| use common::prepare_test_env::{ | use common::prepare_test_env::{ | ||||||
|     prepare_test_db, run_service_for_stream, run_service_in_background, |     prepare_test_db, run_service_for_stream, run_service_in_background, | ||||||
| }; | }; | ||||||
| use common::test_utils::Key; | use common::test_utils::{airdrop, Key}; | ||||||
| use common::vm_cli_utils::airdrop; |  | ||||||
| use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; | use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; | ||||||
| use detee_shared::vm_proto; | use detee_shared::vm_proto; | ||||||
| use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | ||||||
| @ -39,7 +38,7 @@ async fn test_brain_message() { | |||||||
|         node_pubkey: daemon_key, |         node_pubkey: daemon_key, | ||||||
|         price_per_unit: 1200, |         price_per_unit: 1200, | ||||||
|         extra_ports: vec![8080, 8081], |         extra_ports: vec![8080, 8081], | ||||||
|         locked_nano: 0, |         locked_nano: 100, | ||||||
|         ..Default::default() |         ..Default::default() | ||||||
|     }; |     }; | ||||||
|     airdrop(&brain_channel, &cli_key.pubkey, 10).await.unwrap(); |     airdrop(&brain_channel, &cli_key.pubkey, 10).await.unwrap(); | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user