Compare commits
	
		
			2 Commits
		
	
	
		
			186146a597
			...
			815701184f
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 815701184f | |||
| af18e4ee77 | 
							
								
								
									
										1
									
								
								.env
									
									
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										1
									
								
								.env
									
									
									
									
									
								
							| @ -5,3 +5,4 @@ DB_NAMESPACE = "brain" | ||||
| DB_NAME = "migration" | ||||
| CERT_PATH = "./tmp/brain-crt.pem" | ||||
| CERT_KEY_PATH = "./tmp/brain-key.pem" | ||||
| # ADMIN_PUB_KEYS = "admin_key01, admin_key02, admin_key03" | ||||
|  | ||||
							
								
								
									
										10
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										10
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @ -1972,6 +1972,15 @@ dependencies = [ | ||||
|  "either", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "itertools" | ||||
| version = "0.14.0" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" | ||||
| dependencies = [ | ||||
|  "either", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "itoa" | ||||
| version = "1.0.15" | ||||
| @ -3788,6 +3797,7 @@ dependencies = [ | ||||
|  "env_logger", | ||||
|  "futures", | ||||
|  "hyper-util", | ||||
|  "itertools 0.14.0", | ||||
|  "log", | ||||
|  "nanoid", | ||||
|  "rand 0.8.5", | ||||
|  | ||||
| @ -35,5 +35,6 @@ anyhow = "1.0.98" | ||||
| bs58 = "0.5.1" | ||||
| ed25519-dalek = { version = "2.1.1", features = ["rand_core"] } | ||||
| hyper-util = "0.1.11" | ||||
| itertools = "0.14.0" | ||||
| rand = "0.8" | ||||
| tower = "0.5.2" | ||||
|  | ||||
| @ -1,3 +1,5 @@ | ||||
| use std::sync::LazyLock; | ||||
| 
 | ||||
| pub const BRAIN_GRPC_ADDR: &str = "0.0.0.0:31337"; | ||||
| pub const CERT_PATH: &str = "/etc/detee/brain/brain-crt.pem"; | ||||
| pub const CERT_KEY_PATH: &str = "/etc/detee/brain/brain-key.pem"; | ||||
| @ -5,11 +7,18 @@ pub const CONFIG_PATH: &str = "/etc/detee/brain/config.ini"; | ||||
| 
 | ||||
| pub const DB_SCHEMA_FILE: &str = "interim_tables.surql"; | ||||
| 
 | ||||
| pub const ADMIN_ACCOUNTS: &[&str] = &[ | ||||
|     "x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK", | ||||
|     "FHuecMbeC1PfjkW2JKyoicJAuiU7khgQT16QUB3Q1XdL", | ||||
|     "H21Shi4iE7vgfjWEQNvzmpmBMJSaiZ17PYUcdNoAoKNc", | ||||
| ]; | ||||
| pub static ADMIN_ACCOUNTS: LazyLock<Vec<String>> = LazyLock::new(|| { | ||||
|     let default_admin_keys = vec![ | ||||
|         "x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK".to_string(), | ||||
|         "FHuecMbeC1PfjkW2JKyoicJAuiU7khgQT16QUB3Q1XdL".to_string(), | ||||
|         "H21Shi4iE7vgfjWEQNvzmpmBMJSaiZ17PYUcdNoAoKNc".to_string(), | ||||
|     ]; | ||||
| 
 | ||||
|     std::env::var("ADMIN_PUB_KEYS") | ||||
|         .ok() | ||||
|         .map(|keys| keys.split(',').map(|key| key.trim().to_string()).collect::<Vec<String>>()) | ||||
|         .unwrap_or(default_admin_keys) | ||||
| }); | ||||
| 
 | ||||
| pub const OLD_BRAIN_DATA_PATH: &str = "./saved_data.yaml"; | ||||
| 
 | ||||
|  | ||||
| @ -30,6 +30,18 @@ impl Account { | ||||
|         Ok(account) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn get_or_create(db: &Surreal<Client>, address: &str) -> Result<Self, Error> { | ||||
|         let id = (ACCOUNT, address); | ||||
| 
 | ||||
|         match db.select(id).await? { | ||||
|             Some(account) => Ok(account), | ||||
|             None => { | ||||
|                 let account: Option<Self> = db.create(id).await?; | ||||
|                 account.ok_or(Error::FailedToCreateDBEntry) | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     pub async fn airdrop(db: &Surreal<Client>, account: &str, tokens: u64) -> Result<(), Error> { | ||||
|         let tokens = tokens.saturating_mul(1_000_000_000); | ||||
|         let _ = db | ||||
|  | ||||
| @ -22,6 +22,8 @@ pub enum Error { | ||||
|     StdIo(#[from] std::io::Error), | ||||
|     #[error(transparent)] | ||||
|     TimeOut(#[from] tokio::time::error::Elapsed), | ||||
|     #[error("Failed to create account")] | ||||
|     FailedToCreateDBEntry, | ||||
| } | ||||
| 
 | ||||
| pub mod prelude { | ||||
| @ -29,7 +31,6 @@ pub mod prelude { | ||||
|     pub use super::general::*; | ||||
|     pub use super::vm::*; | ||||
|     pub use super::*; | ||||
|     pub use detee_shared::snp::pb::vm_proto::{MeasurementArgs, VmNodeFilters}; | ||||
| } | ||||
| 
 | ||||
| pub async fn db_connection( | ||||
|  | ||||
							
								
								
									
										63
									
								
								src/db/vm.rs
									
									
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										63
									
								
								src/db/vm.rs
									
									
									
									
									
								
							| @ -5,8 +5,9 @@ use super::Error; | ||||
| use crate::constants::{ | ||||
|     ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT, | ||||
| }; | ||||
| use crate::db::{MeasurementArgs, Report, VmNodeFilters}; | ||||
| use crate::db::{Account, Report}; | ||||
| use crate::old_brain; | ||||
| use detee_shared::vm_proto; | ||||
| use serde::{Deserialize, Serialize}; | ||||
| use surrealdb::engine::remote::ws::Client; | ||||
| use surrealdb::sql::Datetime; | ||||
| @ -52,6 +53,7 @@ impl VmNodeResources { | ||||
| 
 | ||||
| impl VmNode { | ||||
|     pub async fn register(self, db: &Surreal<Client>) -> Result<(), Error> { | ||||
|         Account::get_or_create(db, &self.operator.key().to_string()).await?; | ||||
|         let _: Option<VmNode> = db.upsert(self.id.clone()).content(self).await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| @ -82,7 +84,7 @@ impl VmNodeWithReports { | ||||
|     // https://en.wikipedia.org/wiki/Dependency_inversion_principle
 | ||||
|     pub async fn find_by_filters( | ||||
|         db: &Surreal<Client>, | ||||
|         filters: VmNodeFilters, | ||||
|         filters: vm_proto::VmNodeFilters, | ||||
|     ) -> Result<Vec<Self>, Error> { | ||||
|         let mut query = format!( | ||||
|             "select *, <-report.* as reports from {VM_NODE} where
 | ||||
| @ -189,14 +191,53 @@ impl NewVmReq { | ||||
|     } | ||||
| 
 | ||||
|     pub async fn submit(self, db: &Surreal<Client>) -> Result<(), Error> { | ||||
|         let _: Vec<Self> = db.insert(NEW_VM_REQ).relation(self).await?; | ||||
|         let locked_nano = self.locked_nano; | ||||
|         let account = self.admin.key().to_string(); | ||||
|         let vm_id = self.id.key().to_string(); | ||||
|         let vm_node = self.vm_node.key().to_string(); | ||||
|         // TODO: check for possible injection and maybe use .bind()
 | ||||
|         let query = format!( | ||||
|             " | ||||
|             BEGIN TRANSACTION; | ||||
|             UPDATE account:{account} SET balance -= {locked_nano}; | ||||
|             IF account:{account}.balance < 0 {{ | ||||
|                 THROW 'Insufficient funds.' | ||||
|             }}; | ||||
|             UPDATE account:{account} SET tmp_locked += {locked_nano}; | ||||
|             RELATE | ||||
|                 account:{account} | ||||
|                 ->new_vm_req:{vm_id} | ||||
|                 ->vm_node:{vm_node} | ||||
|             CONTENT {{ | ||||
|                 created_at: time::now(), hostname: '{}', vcpus: {}, memory_mb: {}, disk_size_gb: {}, | ||||
|                 extra_ports: {}, public_ipv4: {:?}, public_ipv6: {:?}, | ||||
|                 dtrfs_url: '{}', dtrfs_sha: '{}', kernel_url: '{}', kernel_sha: '{}', | ||||
|                 price_per_unit: {}, locked_nano: {locked_nano}, error: '' | ||||
|             }}; | ||||
|             COMMIT TRANSACTION; | ||||
|         ",
 | ||||
|             self.hostname, | ||||
|             self.vcpus, | ||||
|             self.memory_mb, | ||||
|             self.disk_size_gb, | ||||
|             format!("{:?}", self.extra_ports,), | ||||
|             self.public_ipv4, | ||||
|             self.public_ipv6, | ||||
|             self.dtrfs_url, | ||||
|             self.dtrfs_sha, | ||||
|             self.kernel_url, | ||||
|             self.kernel_sha, | ||||
|             self.price_per_unit | ||||
|         ); | ||||
|         //let _: Vec<Self> = db.insert(NEW_VM_REQ).relation(self).await?;
 | ||||
|         db.query(query).await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| /// first string is the vm_id
 | ||||
| pub enum WrappedMeasurement { | ||||
|     Args(String, MeasurementArgs), | ||||
|     Args(String, vm_proto::MeasurementArgs), | ||||
|     Error(String, String), | ||||
| } | ||||
| 
 | ||||
| @ -223,9 +264,10 @@ impl WrappedMeasurement { | ||||
|             )) | ||||
|             .await?; | ||||
|         let mut error_stream = resp.stream::<Notification<ErrorMessage>>(0)?; | ||||
|         let mut args_stream = resp.stream::<Notification<MeasurementArgs>>(1)?; | ||||
|         let mut args_stream = resp.stream::<Notification<vm_proto::MeasurementArgs>>(1)?; | ||||
| 
 | ||||
|         let args: Option<MeasurementArgs> = db.delete(("measurement_args", vm_id)).await?; | ||||
|         let args: Option<vm_proto::MeasurementArgs> = | ||||
|             db.delete(("measurement_args", vm_id)).await?; | ||||
|         if let Some(args) = args { | ||||
|             return Ok(Self::Args(vm_id.to_string(), args)); | ||||
|         } | ||||
| @ -254,7 +296,7 @@ impl WrappedMeasurement { | ||||
|                             match args_notif { | ||||
|                                 Ok(args_notif) => { | ||||
|                                     if args_notif.action == surrealdb::Action::Create { | ||||
|                                         let _: Option<MeasurementArgs> = db.delete(("measurement_args", vm_id)).await?; | ||||
|                                         let _: Option<vm_proto::MeasurementArgs> = db.delete(("measurement_args", vm_id)).await?; | ||||
|                                         return Ok(Self::Args(vm_id.to_string(), args_notif.data)); | ||||
|                                     }; | ||||
|                                 }, | ||||
| @ -317,7 +359,7 @@ impl ActiveVm { | ||||
|     pub async fn activate( | ||||
|         db: &Surreal<Client>, | ||||
|         id: &str, | ||||
|         args: MeasurementArgs, | ||||
|         args: vm_proto::MeasurementArgs, | ||||
|     ) -> Result<(), Error> { | ||||
|         let new_vm_req = match NewVmReq::get(db, id).await? { | ||||
|             Some(r) => r, | ||||
| @ -367,9 +409,12 @@ impl ActiveVm { | ||||
|             collected_at: new_vm_req.created_at, | ||||
|         }; | ||||
| 
 | ||||
|         let admin_account = active_vm.admin.key().to_string(); | ||||
|         let locked_nano = active_vm.locked_nano; | ||||
|         let _: Vec<ActiveVm> = db.insert(()).relation(active_vm).await?; | ||||
| 
 | ||||
|         NewVmReq::delete(db, id).await?; | ||||
|         db.query(format!("UPDATE {ACCOUNT}:{admin_account} SET tmp_locked -= {locked_nano};")) | ||||
|             .await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|  | ||||
| @ -166,7 +166,8 @@ pub fn check_admin_key<T>(req: &Request<T>) -> Result<(), Status> { | ||||
|     }; | ||||
|     let pubkey = pubkey | ||||
|         .to_str() | ||||
|         .map_err(|_| Status::unauthenticated("could not parse pubkey metadata to str"))?; | ||||
|         .map_err(|_| Status::unauthenticated("could not parse pubkey metadata to str"))? | ||||
|         .to_owned(); | ||||
| 
 | ||||
|     if !ADMIN_ACCOUNTS.contains(&pubkey) { | ||||
|         return Err(Status::unauthenticated("This operation is reserved to admin accounts")); | ||||
|  | ||||
| @ -1,3 +1,4 @@ | ||||
| use anyhow::{anyhow, Result}; | ||||
| use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer; | ||||
| use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; | ||||
| use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; | ||||
| @ -15,9 +16,9 @@ use tokio::sync::OnceCell; | ||||
| use tonic::transport::{Channel, Endpoint, Server, Uri}; | ||||
| use tower::service_fn; | ||||
| 
 | ||||
| pub static DB_STATE: OnceCell<()> = OnceCell::const_new(); | ||||
| pub static DB_STATE: OnceCell<Result<()>> = OnceCell::const_new(); | ||||
| 
 | ||||
| pub async fn prepare_test_db() -> Surreal<Client> { | ||||
| pub async fn prepare_test_db() -> Result<Surreal<Client>> { | ||||
|     dotenv().ok(); | ||||
| 
 | ||||
|     let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); | ||||
| @ -26,24 +27,25 @@ pub async fn prepare_test_db() -> Surreal<Client> { | ||||
|     let db_ns = "test_brain"; | ||||
|     let db_name = "test_migration_db"; | ||||
| 
 | ||||
|     let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) | ||||
|         .await | ||||
|         .unwrap(); | ||||
|     let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?; | ||||
|     DB_STATE | ||||
|         .get_or_init(|| async { | ||||
|             let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk().unwrap(); | ||||
|             db.query(format!("REMOVE DATABASE {db_name}")).await.unwrap(); | ||||
|             db.query(std::fs::read_to_string("interim_tables.surql").unwrap()).await.unwrap(); | ||||
|             surreal_brain::db::migration0(&db, &old_brain_data).await.unwrap(); | ||||
|             let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk() | ||||
|                 .map_err(|e| anyhow!(e.to_string()))?; | ||||
| 
 | ||||
|             db.query(format!("REMOVE DATABASE {db_name}")).await?; | ||||
|             db.query(std::fs::read_to_string("interim_tables.surql")?).await?; | ||||
|             surreal_brain::db::migration0(&db, &old_brain_data).await?; | ||||
|             Ok::<(), anyhow::Error>(()) | ||||
|         }) | ||||
|         .await; | ||||
|     db | ||||
|     Ok(db) | ||||
| } | ||||
| 
 | ||||
| pub async fn run_service_in_background() -> SocketAddr { | ||||
| pub async fn run_service_in_background() -> Result<SocketAddr> { | ||||
|     dotenv().ok(); | ||||
|     let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); | ||||
|     let addr = listener.local_addr().unwrap(); | ||||
|     let listener = TcpListener::bind("127.0.0.1:0").await?; | ||||
|     let addr = listener.local_addr()?; | ||||
| 
 | ||||
|     let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); | ||||
|     let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env"); | ||||
| @ -52,9 +54,8 @@ pub async fn run_service_in_background() -> SocketAddr { | ||||
|     let db_name = "test_migration_db"; | ||||
| 
 | ||||
|     tokio::spawn(async move { | ||||
|         let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) | ||||
|             .await | ||||
|             .unwrap(); | ||||
|         let db = | ||||
|             surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?; | ||||
|         let db_arc = Arc::new(db); | ||||
| 
 | ||||
|         Server::builder() | ||||
| @ -62,13 +63,14 @@ pub async fn run_service_in_background() -> SocketAddr { | ||||
|             .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) | ||||
|             .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) | ||||
|             .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) | ||||
|             .await | ||||
|             .unwrap(); | ||||
|             .await?; | ||||
| 
 | ||||
|         Ok::<(), anyhow::Error>(()) | ||||
|     }); | ||||
| 
 | ||||
|     tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; | ||||
| 
 | ||||
|     addr | ||||
|     Ok(addr) | ||||
| } | ||||
| 
 | ||||
| pub async fn run_service_for_stream_server() -> DuplexStream { | ||||
| @ -82,9 +84,8 @@ pub async fn run_service_for_stream_server() -> DuplexStream { | ||||
|     let db_name = "test_migration_db"; | ||||
| 
 | ||||
|     tokio::spawn(async move { | ||||
|         let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) | ||||
|             .await | ||||
|             .unwrap(); | ||||
|         let db = | ||||
|             surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?; | ||||
|         let db_arc = Arc::new(db); | ||||
| 
 | ||||
|         tonic::transport::Server::builder() | ||||
| @ -92,24 +93,26 @@ pub async fn run_service_for_stream_server() -> DuplexStream { | ||||
|             .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) | ||||
|             .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) | ||||
|             .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) | ||||
|             .await | ||||
|             .await?; | ||||
| 
 | ||||
|         Ok::<(), anyhow::Error>(()) | ||||
|     }); | ||||
|     client | ||||
| } | ||||
| 
 | ||||
| pub async fn connect_stream_client_channel(c_stream: DuplexStream) -> Channel { | ||||
| pub async fn connect_stream_client_channel(c_stream: DuplexStream) -> Result<Channel> { | ||||
|     let mut client = Some(c_stream); | ||||
| 
 | ||||
|     Endpoint::from_static("http://127.0.0.1:0") | ||||
|     Ok(Endpoint::from_static("http://127.0.0.1:0") | ||||
|         .connect_with_connector(service_fn(move |_: Uri| { | ||||
|             let client = client.take().unwrap(); | ||||
|             async move { Ok::<TokioIo<DuplexStream>, std::io::Error>(TokioIo::new(client)) } | ||||
|         })) | ||||
|         .await | ||||
|         .unwrap() | ||||
|         .await?) | ||||
| } | ||||
| 
 | ||||
| pub async fn run_service_for_stream() -> Channel { | ||||
| pub async fn run_service_for_stream() -> Result<Channel> { | ||||
|     let client = run_service_for_stream_server().await; | ||||
| 
 | ||||
|     connect_stream_client_channel(client).await | ||||
| } | ||||
|  | ||||
| @ -1,4 +1,8 @@ | ||||
| use super::test_utils::Key; | ||||
| use anyhow::{anyhow, Result}; | ||||
| use detee_shared::common_proto::Empty; | ||||
| use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient; | ||||
| use detee_shared::general_proto::{AirdropReq, ReportNodeReq}; | ||||
| use detee_shared::vm_proto; | ||||
| use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | ||||
| use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ}; | ||||
| @ -7,15 +11,27 @@ use surrealdb::engine::remote::ws::Client; | ||||
| use surrealdb::Surreal; | ||||
| use tonic::transport::Channel; | ||||
| 
 | ||||
| async fn airdrop(brain_channel: &Channel, wallet: &str, amount: u64) -> Result<()> { | ||||
|     let mut client = BrainGeneralCliClient::new(brain_channel.clone()); | ||||
|     let airdrop_req = AirdropReq { pubkey: wallet.to_string(), tokens: amount }; | ||||
| 
 | ||||
|     let admin_key = Key::new(); | ||||
|     std::env::set_var("ADMIN_PUB_KEYS", &admin_key.pubkey); | ||||
| 
 | ||||
|     client.airdrop(admin_key.sign_request(airdrop_req.clone())?).await?; | ||||
| 
 | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| pub async fn create_new_vm( | ||||
|     db: &Surreal<Client>, | ||||
|     key: Key, | ||||
|     node_pubkey: String, | ||||
|     brain_channel: Channel, | ||||
| ) -> String { | ||||
|     key: &Key, | ||||
|     node_pubkey: &str, | ||||
|     brain_channel: &Channel, | ||||
| ) -> Result<String> { | ||||
|     let new_vm_req = vm_proto::NewVmReq { | ||||
|         admin_pubkey: key.pubkey.clone(), | ||||
|         node_pubkey, | ||||
|         node_pubkey: node_pubkey.to_string(), | ||||
|         price_per_unit: 1200, | ||||
|         extra_ports: vec![8080, 8081], | ||||
|         locked_nano: 0, | ||||
| @ -23,8 +39,7 @@ pub async fn create_new_vm( | ||||
|     }; | ||||
| 
 | ||||
|     let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone()); | ||||
|     let new_vm_resp = | ||||
|         client_vm_cli.new_vm(key.sign_request(new_vm_req).unwrap()).await.unwrap().into_inner(); | ||||
|     let new_vm_resp = client_vm_cli.new_vm(key.sign_request(new_vm_req)?).await?.into_inner(); | ||||
| 
 | ||||
|     assert!(new_vm_resp.error.is_empty()); | ||||
|     assert!(new_vm_resp.uuid.len() == 40); | ||||
| @ -32,16 +47,34 @@ pub async fn create_new_vm( | ||||
|     // wait for update db
 | ||||
|     tokio::time::sleep(tokio::time::Duration::from_millis(700)).await; | ||||
| 
 | ||||
|     let vm_req_db: Option<db::NewVmReq> = | ||||
|         db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await.unwrap(); | ||||
|     let vm_req_db: Option<db::NewVmReq> = db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await?; | ||||
| 
 | ||||
|     if let Some(new_vm_req) = vm_req_db { | ||||
|         panic!("New VM request found in DB: {:?}", new_vm_req); | ||||
|     } | ||||
| 
 | ||||
|     let active_vm_op: Option<db::ActiveVm> = | ||||
|         db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await.unwrap(); | ||||
|     let active_vm = active_vm_op.unwrap(); | ||||
|         db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await?; | ||||
|     let active_vm = active_vm_op.ok_or(anyhow!("Not found active vm in db"))?; | ||||
| 
 | ||||
|     active_vm.id.key().to_string() | ||||
|     Ok(active_vm.id.key().to_string()) | ||||
| } | ||||
| 
 | ||||
| pub async fn report_node( | ||||
|     key: &Key, | ||||
|     brain_channel: &Channel, | ||||
|     node_pubkey: &str, | ||||
|     contract: &str, | ||||
|     reason: &str, | ||||
| ) -> Result<tonic::Response<Empty>> { | ||||
|     let mut client_gen_cli = BrainGeneralCliClient::new(brain_channel.clone()); | ||||
| 
 | ||||
|     let report_req = ReportNodeReq { | ||||
|         admin_pubkey: key.pubkey.clone(), | ||||
|         node_pubkey: node_pubkey.to_string(), | ||||
|         contract: contract.to_string(), | ||||
|         reason: reason.to_string(), | ||||
|     }; | ||||
| 
 | ||||
|     Ok(client_gen_cli.report_node(key.sign_request(report_req)?).await?) | ||||
| } | ||||
|  | ||||
| @ -1,4 +1,5 @@ | ||||
| use super::test_utils::Key; | ||||
| use anyhow::Result; | ||||
| use detee_shared::vm_proto; | ||||
| use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; | ||||
| use detee_shared::vm_proto::RegisterVmNodeReq; | ||||
| @ -7,11 +8,11 @@ use tokio::sync::mpsc; | ||||
| use tokio_stream::wrappers::ReceiverStream; | ||||
| use tonic::transport::Channel; | ||||
| 
 | ||||
| pub async fn mock_vm_daemon(brain_channel: Channel) -> String { | ||||
|     let daemon_client = BrainVmDaemonClient::new(brain_channel); | ||||
| pub async fn mock_vm_daemon(brain_channel: &Channel) -> Result<String> { | ||||
|     let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone()); | ||||
|     let daemon_key = Key::new(); | ||||
| 
 | ||||
|     register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await; | ||||
|     register_vm_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?; | ||||
| 
 | ||||
|     let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); | ||||
| 
 | ||||
| @ -29,20 +30,20 @@ pub async fn mock_vm_daemon(brain_channel: Channel) -> String { | ||||
| 
 | ||||
|     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; | ||||
| 
 | ||||
|     daemon_key.pubkey | ||||
|     Ok(daemon_key.pubkey) | ||||
| } | ||||
| 
 | ||||
| pub async fn register_vm_node( | ||||
|     mut client: BrainVmDaemonClient<Channel>, | ||||
|     key: Key, | ||||
|     operator_wallet: String, | ||||
| ) -> Vec<vm_proto::VmContract> { | ||||
|     client: &mut BrainVmDaemonClient<Channel>, | ||||
|     key: &Key, | ||||
|     operator_wallet: &str, | ||||
| ) -> Result<Vec<vm_proto::VmContract>> { | ||||
|     log::info!("Registering vm_node: {}", key.pubkey); | ||||
|     let node_pubkey = key.pubkey.clone(); | ||||
| 
 | ||||
|     let req = RegisterVmNodeReq { | ||||
|         node_pubkey, | ||||
|         operator_wallet, | ||||
|         operator_wallet: operator_wallet.to_string(), | ||||
|         main_ip: String::from("185.243.218.213"), | ||||
|         city: String::from("Oslo"), | ||||
|         country: String::from("Norway"), | ||||
| @ -50,8 +51,7 @@ pub async fn register_vm_node( | ||||
|         price: 1200, | ||||
|     }; | ||||
| 
 | ||||
|     let mut grpc_stream = | ||||
|         client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner(); | ||||
|     let mut grpc_stream = client.register_vm_node(key.sign_request(req)?).await?.into_inner(); | ||||
| 
 | ||||
|     let mut vm_contracts = Vec::new(); | ||||
|     while let Some(stream_update) = grpc_stream.next().await { | ||||
| @ -64,22 +64,23 @@ pub async fn register_vm_node( | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|     vm_contracts | ||||
|     Ok(vm_contracts) | ||||
| } | ||||
| 
 | ||||
| pub async fn daemon_listener( | ||||
|     mut client: BrainVmDaemonClient<Channel>, | ||||
|     key: Key, | ||||
|     tx: mpsc::Sender<vm_proto::BrainVmMessage>, | ||||
| ) { | ||||
| ) -> Result<()> { | ||||
|     log::info!("listening vm_daemon"); | ||||
|     let mut grpc_stream = | ||||
|         client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner(); | ||||
|     let mut grpc_stream = client.brain_messages(key.sign_stream_auth(vec![])?).await?.into_inner(); | ||||
| 
 | ||||
|     while let Some(Ok(stream_update)) = grpc_stream.next().await { | ||||
|         log::info!("vm deamon got notified: {:?}", &stream_update); | ||||
|         let _ = tx.send(stream_update).await; | ||||
|     } | ||||
| 
 | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| pub async fn daemon_msg_sender( | ||||
| @ -87,28 +88,29 @@ pub async fn daemon_msg_sender( | ||||
|     key: Key, | ||||
|     tx: mpsc::Sender<vm_proto::VmDaemonMessage>, | ||||
|     rx: mpsc::Receiver<vm_proto::VmDaemonMessage>, | ||||
| ) { | ||||
| ) -> Result<()> { | ||||
|     log::info!("sender vm_daemon"); | ||||
|     let rx_stream = ReceiverStream::new(rx); | ||||
|     tx.send(vm_proto::VmDaemonMessage { | ||||
|         msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())), | ||||
|         msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![])?)), | ||||
|     }) | ||||
|     .await | ||||
|     .unwrap(); | ||||
|     client.daemon_messages(rx_stream).await.unwrap(); | ||||
|     .await?; | ||||
|     client.daemon_messages(rx_stream).await?; | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| pub async fn daemon_engine( | ||||
|     tx: mpsc::Sender<vm_proto::VmDaemonMessage>, | ||||
|     mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>, | ||||
| ) { | ||||
| ) -> Result<()> { | ||||
|     log::info!("daemon engine vm_daemon"); | ||||
|     while let Some(brain_msg) = rx.recv().await { | ||||
|         match brain_msg.msg { | ||||
|             Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { | ||||
|                 let exposed_ports = [vec![22], new_vm_req.extra_ports].concat(); | ||||
|                 let args = Some(vm_proto::MeasurementArgs { | ||||
|                     dtrfs_api_endpoint: String::from("184.107.169.199:48865"), | ||||
|                     exposed_ports: new_vm_req.extra_ports, | ||||
|                     exposed_ports, | ||||
|                     ovmf_hash: String::from( | ||||
|                         "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", | ||||
|                     ), | ||||
| @ -124,7 +126,7 @@ pub async fn daemon_engine( | ||||
|                 let res_data = vm_proto::VmDaemonMessage { | ||||
|                     msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), | ||||
|                 }; | ||||
|                 tx.send(res_data).await.unwrap(); | ||||
|                 tx.send(res_data).await?; | ||||
|             } | ||||
|             Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { | ||||
|                 todo!() | ||||
| @ -135,4 +137,6 @@ pub async fn daemon_engine( | ||||
|             None => todo!(), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
							
								
								
									
										213
									
								
								tests/grpc_general_test.rs
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										213
									
								
								tests/grpc_general_test.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,213 @@ | ||||
| use common::prepare_test_env::{ | ||||
|     prepare_test_db, run_service_for_stream, run_service_in_background, | ||||
| }; | ||||
| use common::test_utils::Key; | ||||
| use common::vm_cli_utils::{create_new_vm, report_node}; | ||||
| use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; | ||||
| use detee_shared::common_proto::{Empty, Pubkey}; | ||||
| use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient; | ||||
| use detee_shared::general_proto::AirdropReq; | ||||
| use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; | ||||
| use futures::StreamExt; | ||||
| use itertools::Itertools; | ||||
| use std::vec; | ||||
| use surreal_brain::constants::VM_NODE; | ||||
| use surreal_brain::db::vm::VmNodeWithReports; | ||||
| 
 | ||||
| mod common; | ||||
| 
 | ||||
| #[tokio::test] | ||||
| async fn test_general_balance() { | ||||
|     // env_logger::builder().filter_level(log::LevelFilter::Trace).init();
 | ||||
|     prepare_test_db().await.unwrap(); | ||||
| 
 | ||||
|     let addr = run_service_in_background().await.unwrap(); | ||||
|     let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap(); | ||||
| 
 | ||||
|     let key = Key::new(); | ||||
|     let pubkey = key.pubkey.clone(); | ||||
|     let req_data = Pubkey { pubkey }; | ||||
| 
 | ||||
|     let req = key.sign_request(req_data).unwrap(); | ||||
| 
 | ||||
|     let acc_bal = client.get_balance(req).await.unwrap().into_inner(); | ||||
| 
 | ||||
|     assert_eq!(acc_bal.balance, 0); | ||||
|     assert_eq!(acc_bal.tmp_locked, 0); | ||||
| } | ||||
| 
 | ||||
| #[tokio::test] | ||||
| async fn test_general_airdrop() { | ||||
|     // env_logger::builder().filter_level(log::LevelFilter::Trace).init();
 | ||||
|     prepare_test_db().await.unwrap(); | ||||
| 
 | ||||
|     const AIRDROP_MULTIPLE: u64 = 1_000_000_000; | ||||
|     let airdrop_amount = 10; | ||||
| 
 | ||||
|     let addr = run_service_in_background().await.unwrap(); | ||||
|     let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap(); | ||||
| 
 | ||||
|     let admin_keys = vec![Key::new(), Key::new(), Key::new()]; | ||||
|     let admin_pub_keys = admin_keys.iter().map(|k| k.pubkey.clone()).join(", "); | ||||
|     std::env::set_var("ADMIN_PUB_KEYS", admin_pub_keys); | ||||
| 
 | ||||
|     let user_01_key = Key::new(); | ||||
|     let user_01_pubkey = user_01_key.pubkey.clone(); | ||||
| 
 | ||||
|     let airdrop_req = AirdropReq { pubkey: user_01_pubkey.clone(), tokens: airdrop_amount }; | ||||
| 
 | ||||
|     // user airdroping himself
 | ||||
|     let err = | ||||
|         client.airdrop(user_01_key.sign_request(airdrop_req.clone()).unwrap()).await.err().unwrap(); | ||||
|     assert_eq!(err.message(), "This operation is reserved to admin accounts"); | ||||
| 
 | ||||
|     // other user airdroping
 | ||||
|     let err = | ||||
|         client.airdrop(Key::new().sign_request(airdrop_req.clone()).unwrap()).await.err().unwrap(); | ||||
|     assert_eq!(err.message(), "This operation is reserved to admin accounts"); | ||||
| 
 | ||||
|     let _ = client.airdrop(admin_keys[0].sign_request(airdrop_req.clone()).unwrap()).await.unwrap(); | ||||
| 
 | ||||
|     let bal_req_data = Pubkey { pubkey: user_01_pubkey }; | ||||
|     let bal_req = user_01_key.sign_request(bal_req_data.clone()).unwrap(); | ||||
|     let acc_bal_user_01 = client.get_balance(bal_req).await.unwrap().into_inner(); | ||||
| 
 | ||||
|     assert_eq!(acc_bal_user_01.balance, airdrop_amount * AIRDROP_MULTIPLE); | ||||
|     assert_eq!(acc_bal_user_01.tmp_locked, 0); | ||||
| 
 | ||||
|     // second airdrop from same admin
 | ||||
|     let _ = client.airdrop(admin_keys[0].sign_request(airdrop_req.clone()).unwrap()).await.unwrap(); | ||||
| 
 | ||||
|     let acc_bal_user_01 = client | ||||
|         .get_balance(user_01_key.sign_request(bal_req_data.clone()).unwrap()) | ||||
|         .await | ||||
|         .unwrap() | ||||
|         .into_inner(); | ||||
| 
 | ||||
|     assert_eq!(acc_bal_user_01.balance, 2 * airdrop_amount * AIRDROP_MULTIPLE); | ||||
| 
 | ||||
|     // third airdrop from another admin
 | ||||
|     let _ = client.airdrop(admin_keys[1].sign_request(airdrop_req.clone()).unwrap()).await.unwrap(); | ||||
| 
 | ||||
|     let acc_bal_user_01 = client | ||||
|         .get_balance(user_01_key.sign_request(bal_req_data).unwrap()) | ||||
|         .await | ||||
|         .unwrap() | ||||
|         .into_inner(); | ||||
| 
 | ||||
|     assert_eq!(acc_bal_user_01.balance, 3 * airdrop_amount * AIRDROP_MULTIPLE); | ||||
| 
 | ||||
|     // self airdrop
 | ||||
|     let airdrop_req = AirdropReq { pubkey: admin_keys[2].pubkey.clone(), tokens: airdrop_amount }; | ||||
| 
 | ||||
|     let _ = client.airdrop(admin_keys[2].sign_request(airdrop_req.clone()).unwrap()).await.unwrap(); | ||||
| 
 | ||||
|     let bal_req_data = Pubkey { pubkey: admin_keys[2].pubkey.clone() }; | ||||
|     let acc_bal_admin_3 = client | ||||
|         .get_balance(admin_keys[2].sign_request(bal_req_data.clone()).unwrap()) | ||||
|         .await | ||||
|         .unwrap() | ||||
|         .into_inner(); | ||||
| 
 | ||||
|     assert_eq!(acc_bal_admin_3.balance, airdrop_amount * AIRDROP_MULTIPLE); | ||||
| } | ||||
| 
 | ||||
| #[tokio::test] | ||||
| async fn test_report_node() { | ||||
|     let db = prepare_test_db().await.unwrap(); | ||||
| 
 | ||||
|     let brain_channel = run_service_for_stream().await.unwrap(); | ||||
|     let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap(); | ||||
| 
 | ||||
|     let key = Key::new(); | ||||
| 
 | ||||
|     let report_error = | ||||
|         report_node(&key, &brain_channel, &daemon_key, "uuid", "reason").await.err().unwrap(); | ||||
| 
 | ||||
|     log::info!("Report error: {:?}", report_error); | ||||
|     assert!(report_error.to_string().contains("No contract found by this ID.")); | ||||
| 
 | ||||
|     let active_vm_id = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap(); | ||||
| 
 | ||||
|     let reason = String::from("something went wrong on vm"); | ||||
| 
 | ||||
|     let _ = report_node(&key, &brain_channel, &daemon_key, &active_vm_id, &reason) | ||||
|         .await | ||||
|         .unwrap() | ||||
|         .into_inner(); | ||||
| 
 | ||||
|     let vm_nodes: Vec<VmNodeWithReports> = db | ||||
|         .query(format!( | ||||
|             "SELECT *, <-report.* as reports FROM {VM_NODE} WHERE id = {VM_NODE}:{daemon_key};" | ||||
|         )) | ||||
|         .await | ||||
|         .unwrap() | ||||
|         .take(0) | ||||
|         .unwrap(); | ||||
| 
 | ||||
|     let vm_node_with_report = &vm_nodes[0]; | ||||
| 
 | ||||
|     assert!(vm_node_with_report.reports[0].reason == reason); | ||||
| } | ||||
| 
 | ||||
| #[tokio::test] | ||||
| // TODO: register some operators before testing this
 | ||||
| async fn test_list_operators() { | ||||
|     prepare_test_db().await.unwrap(); | ||||
| 
 | ||||
|     let channel = run_service_for_stream().await.unwrap(); | ||||
| 
 | ||||
|     let mut client = BrainGeneralCliClient::new(channel); | ||||
| 
 | ||||
|     let key = Key::new(); | ||||
| 
 | ||||
|     let mut grpc_stream = | ||||
|         client.list_operators(key.sign_request(Empty {}).unwrap()).await.unwrap().into_inner(); | ||||
| 
 | ||||
|     let mut operators = Vec::new(); | ||||
|     while let Some(stream_update) = grpc_stream.next().await { | ||||
|         match stream_update { | ||||
|             Ok(op) => { | ||||
|                 operators.push(op); | ||||
|             } | ||||
|             Err(e) => { | ||||
|                 panic!("Received error instead of operators: {e:?}"); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     assert!(!operators.is_empty()) | ||||
| } | ||||
| 
 | ||||
| #[tokio::test] | ||||
| async fn test_inspect_operator() { | ||||
|     prepare_test_db().await.unwrap(); | ||||
| 
 | ||||
|     let brain_channel = run_service_for_stream().await.unwrap(); | ||||
|     let mut cli_client = BrainGeneralCliClient::new(brain_channel.clone()); | ||||
|     let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone()); | ||||
|     let key = Key::new(); | ||||
|     let daemon_key = Key::new(); | ||||
|     let operator_key = Key::new(); | ||||
| 
 | ||||
|     let err = cli_client | ||||
|         .inspect_operator(key.sign_request(Pubkey { pubkey: operator_key.pubkey.clone() }).unwrap()) | ||||
|         .await | ||||
|         .err() | ||||
|         .unwrap(); | ||||
| 
 | ||||
|     assert_eq!(err.message(), "The wallet you specified is not an operator"); | ||||
| 
 | ||||
|     // TODO: test with app node also
 | ||||
|     register_vm_node(&mut daemon_client, &daemon_key, &operator_key.pubkey).await.unwrap(); | ||||
| 
 | ||||
|     let inspect_response = cli_client | ||||
|         .inspect_operator(key.sign_request(Pubkey { pubkey: operator_key.pubkey.clone() }).unwrap()) | ||||
|         .await | ||||
|         .unwrap() | ||||
|         .into_inner(); | ||||
| 
 | ||||
|     assert!(inspect_response.app_nodes.is_empty()); | ||||
|     assert!(!inspect_response.vm_nodes.is_empty()); | ||||
|     assert_eq!(&inspect_response.vm_nodes[0].operator, &operator_key.pubkey); | ||||
| } | ||||
| @ -1,166 +0,0 @@ | ||||
| use common::prepare_test_env::{ | ||||
|     prepare_test_db, run_service_for_stream, run_service_in_background, | ||||
| }; | ||||
| use common::test_utils::Key; | ||||
| use common::vm_cli_utils::create_new_vm; | ||||
| use common::vm_daemon_utils::mock_vm_daemon; | ||||
| use detee_shared::common_proto::{Empty, Pubkey}; | ||||
| use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient; | ||||
| use detee_shared::general_proto::ReportNodeReq; | ||||
| use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | ||||
| use detee_shared::vm_proto::ListVmContractsReq; | ||||
| use futures::StreamExt; | ||||
| use surreal_brain::constants::VM_NODE; | ||||
| use surreal_brain::db::vm::VmNodeWithReports; | ||||
| 
 | ||||
| mod common; | ||||
| 
 | ||||
| #[tokio::test] | ||||
| async fn test_general_balance() { | ||||
|     // env_logger::builder().filter_level(log::LevelFilter::Trace).init();
 | ||||
|     let _ = prepare_test_db().await; | ||||
| 
 | ||||
|     let addr = run_service_in_background().await; | ||||
|     let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap(); | ||||
| 
 | ||||
|     let key = Key::new(); | ||||
|     let pubkey = key.pubkey.clone(); | ||||
|     let req_data = Pubkey { pubkey }; | ||||
| 
 | ||||
|     let req = key.sign_request(req_data).unwrap(); | ||||
| 
 | ||||
|     let acc_bal = client.get_balance(req).await.unwrap().into_inner(); | ||||
| 
 | ||||
|     assert_eq!(acc_bal.balance, 0); | ||||
|     assert_eq!(acc_bal.tmp_locked, 0); | ||||
| } | ||||
| 
 | ||||
| #[tokio::test] | ||||
| async fn test_vm_creation() { | ||||
|     let db = prepare_test_db().await; | ||||
| 
 | ||||
|     let brain_channel = run_service_for_stream().await; | ||||
|     let daemon_key = mock_vm_daemon(brain_channel.clone()).await; | ||||
| 
 | ||||
|     let key = Key::new(); | ||||
| 
 | ||||
|     let _ = create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await; | ||||
| } | ||||
| 
 | ||||
| #[tokio::test] | ||||
| async fn test_report_node() { | ||||
|     let db = prepare_test_db().await; | ||||
| 
 | ||||
|     let brain_channel = run_service_for_stream().await; | ||||
|     let daemon_key = mock_vm_daemon(brain_channel.clone()).await; | ||||
|     let mut client_gen_cli = BrainGeneralCliClient::new(brain_channel.clone()); | ||||
| 
 | ||||
|     let key = Key::new(); | ||||
|     let pubkey = key.pubkey.clone(); | ||||
| 
 | ||||
|     let report_req = ReportNodeReq { | ||||
|         admin_pubkey: pubkey.clone(), | ||||
|         node_pubkey: daemon_key.clone(), | ||||
|         contract: String::from("uuid"), | ||||
|         reason: String::from("reason"), | ||||
|     }; | ||||
| 
 | ||||
|     let report_error = | ||||
|         client_gen_cli.report_node(key.sign_request(report_req).unwrap()).await.err().unwrap(); | ||||
| 
 | ||||
|     println!("Report error: {:?}", report_error); | ||||
|     assert_eq!(report_error.message(), "No contract found by this ID."); | ||||
| 
 | ||||
|     let active_vm_id = | ||||
|         create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await; | ||||
| 
 | ||||
|     let reason = String::from("something went wrong on vm"); | ||||
|     let report_req = ReportNodeReq { | ||||
|         admin_pubkey: pubkey, | ||||
|         node_pubkey: daemon_key.clone(), | ||||
|         contract: active_vm_id, | ||||
|         reason: reason.clone(), | ||||
|     }; | ||||
| 
 | ||||
|     let _ = client_gen_cli | ||||
|         .report_node(key.sign_request(report_req).unwrap()) | ||||
|         .await | ||||
|         .unwrap() | ||||
|         .into_inner(); | ||||
| 
 | ||||
|     let vm_nodes: Vec<VmNodeWithReports> = db | ||||
|         .query(format!( | ||||
|             "SELECT *, <-report.* as reports FROM {VM_NODE} WHERE id = {VM_NODE}:{daemon_key};" | ||||
|         )) | ||||
|         .await | ||||
|         .unwrap() | ||||
|         .take(0) | ||||
|         .unwrap(); | ||||
| 
 | ||||
|     let vm_node_with_report = vm_nodes.get(0).unwrap(); | ||||
| 
 | ||||
|     assert!(vm_node_with_report.reports[0].reason == reason); | ||||
| } | ||||
| 
 | ||||
| #[tokio::test] | ||||
| // TODO: register some operators before testing this
 | ||||
| async fn test_list_operators() { | ||||
|     prepare_test_db().await; | ||||
| 
 | ||||
|     let channel = run_service_for_stream().await; | ||||
| 
 | ||||
|     let mut client = BrainGeneralCliClient::new(channel); | ||||
| 
 | ||||
|     let key = Key::new(); | ||||
| 
 | ||||
|     let mut grpc_stream = | ||||
|         client.list_operators(key.sign_request(Empty {}).unwrap()).await.unwrap().into_inner(); | ||||
| 
 | ||||
|     let mut operators = Vec::new(); | ||||
|     while let Some(stream_update) = grpc_stream.next().await { | ||||
|         match stream_update { | ||||
|             Ok(op) => { | ||||
|                 operators.push(op); | ||||
|             } | ||||
|             Err(e) => { | ||||
|                 panic!("Received error instead of operators: {e:?}"); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     assert!(!operators.is_empty()) | ||||
| } | ||||
| 
 | ||||
| #[tokio::test] | ||||
| // TODO: create vm for this user before testing this
 | ||||
| async fn test_list_vm_contracts() { | ||||
|     prepare_test_db().await; | ||||
| 
 | ||||
|     let channel = run_service_for_stream().await; | ||||
|     let mut client = BrainVmCliClient::new(channel); | ||||
| 
 | ||||
|     let key = Key::new(); | ||||
|     let pubkey = key.pubkey.clone(); | ||||
| 
 | ||||
|     let req_data = | ||||
|         ListVmContractsReq { wallet: pubkey, uuid: String::from("uuid"), as_operator: false }; | ||||
| 
 | ||||
|     let mut grpc_stream = | ||||
|         client.list_vm_contracts(key.sign_request(req_data).unwrap()).await.unwrap().into_inner(); | ||||
| 
 | ||||
|     let mut vm_contracts = Vec::new(); | ||||
|     while let Some(stream_update) = grpc_stream.next().await { | ||||
|         match stream_update { | ||||
|             Ok(vm_c) => { | ||||
|                 vm_contracts.push(vm_c); | ||||
|             } | ||||
|             Err(e) => { | ||||
|                 panic!("Received error instead of vm_contracts: {e:?}"); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     assert!(vm_contracts.is_empty()) | ||||
| 
 | ||||
|     // verify report in db
 | ||||
| } | ||||
							
								
								
									
										90
									
								
								tests/grpc_vm_cli_test.rs
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										90
									
								
								tests/grpc_vm_cli_test.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,90 @@ | ||||
| use common::prepare_test_env::{prepare_test_db, run_service_for_stream}; | ||||
| use common::test_utils::Key; | ||||
| use common::vm_cli_utils::create_new_vm; | ||||
| use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; | ||||
| use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | ||||
| use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; | ||||
| use detee_shared::vm_proto::{ListVmContractsReq, NewVmReq}; | ||||
| use futures::StreamExt; | ||||
| use std::vec; | ||||
| 
 | ||||
| mod common; | ||||
| 
 | ||||
| #[tokio::test] | ||||
| async fn test_vm_creation() { | ||||
|     let db = prepare_test_db().await.unwrap(); | ||||
|     // env_logger::builder().filter_level(log::LevelFilter::Error).init();
 | ||||
| 
 | ||||
|     let brain_channel = run_service_for_stream().await.unwrap(); | ||||
|     let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap(); | ||||
| 
 | ||||
|     let key = Key::new(); | ||||
| 
 | ||||
|     let _ = create_new_vm(&db, &key, &daemon_key, &brain_channel).await; | ||||
| } | ||||
| 
 | ||||
| #[tokio::test] | ||||
| async fn test_vm_creation_timeout() { | ||||
|     prepare_test_db().await.unwrap(); | ||||
|     // env_logger::builder().filter_level(log::LevelFilter::Error).init();
 | ||||
| 
 | ||||
|     let brain_channel = run_service_for_stream().await.unwrap(); | ||||
|     let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone()); | ||||
|     let daemon_key = Key::new(); | ||||
| 
 | ||||
|     register_vm_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await.unwrap(); | ||||
| 
 | ||||
|     let key = Key::new(); | ||||
| 
 | ||||
|     let new_vm_req = NewVmReq { | ||||
|         admin_pubkey: key.pubkey.clone(), | ||||
|         node_pubkey: daemon_key.pubkey, | ||||
|         price_per_unit: 1200, | ||||
|         extra_ports: vec![8080, 8081], | ||||
|         locked_nano: 0, | ||||
|         ..Default::default() | ||||
|     }; | ||||
| 
 | ||||
|     let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone()); | ||||
|     let timeout_error = | ||||
|         client_vm_cli.new_vm(key.sign_request(new_vm_req).unwrap()).await.err().unwrap(); | ||||
| 
 | ||||
|     assert_eq!( | ||||
|         timeout_error.message(), | ||||
|         "Network timeout. Please try again later or contact the DeTEE devs team.", | ||||
|     ) | ||||
| } | ||||
| 
 | ||||
| #[tokio::test] | ||||
| // TODO: create vm for this user before testing this
 | ||||
| async fn test_list_vm_contracts() { | ||||
|     prepare_test_db().await.unwrap(); | ||||
| 
 | ||||
|     let channel = run_service_for_stream().await.unwrap(); | ||||
|     let mut client = BrainVmCliClient::new(channel); | ||||
| 
 | ||||
|     let key = Key::new(); | ||||
|     let pubkey = key.pubkey.clone(); | ||||
| 
 | ||||
|     let req_data = | ||||
|         ListVmContractsReq { wallet: pubkey, uuid: String::from("uuid"), as_operator: false }; | ||||
| 
 | ||||
|     let mut grpc_stream = | ||||
|         client.list_vm_contracts(key.sign_request(req_data).unwrap()).await.unwrap().into_inner(); | ||||
| 
 | ||||
|     let mut vm_contracts = Vec::new(); | ||||
|     while let Some(stream_update) = grpc_stream.next().await { | ||||
|         match stream_update { | ||||
|             Ok(vm_c) => { | ||||
|                 vm_contracts.push(vm_c); | ||||
|             } | ||||
|             Err(e) => { | ||||
|                 panic!("Received error instead of vm_contracts: {e:?}"); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     assert!(vm_contracts.is_empty()) | ||||
| 
 | ||||
|     // verify report in db
 | ||||
| } | ||||
| @ -11,27 +11,24 @@ mod common; | ||||
| 
 | ||||
| #[tokio::test] | ||||
| async fn test_reg_vm_node() { | ||||
|     prepare_test_db().await; | ||||
|     prepare_test_db().await.unwrap(); | ||||
| 
 | ||||
|     let addr = run_service_in_background().await; | ||||
|     let client = BrainVmDaemonClient::connect(format!("http://{}", addr)).await.unwrap(); | ||||
|     let addr = run_service_in_background().await.unwrap(); | ||||
|     let mut client = BrainVmDaemonClient::connect(format!("http://{}", addr)).await.unwrap(); | ||||
| 
 | ||||
|     let operator_wallet = Key::new().pubkey; | ||||
| 
 | ||||
|     let key = Key::new(); | ||||
| 
 | ||||
|     let vm_contracts = register_vm_node(client, key, operator_wallet).await; | ||||
|     let vm_contracts = | ||||
|         register_vm_node(&mut client, &Key::new(), &Key::new().pubkey).await.unwrap(); | ||||
| 
 | ||||
|     assert!(vm_contracts.is_empty()) | ||||
| } | ||||
| 
 | ||||
| #[tokio::test] | ||||
| async fn test_brain_message() { | ||||
|     env_logger::builder().filter_level(log::LevelFilter::Info).init(); | ||||
|     let db = prepare_test_db().await; | ||||
|     env_logger::builder().filter_level(log::LevelFilter::Error).init(); | ||||
|     prepare_test_db().await.unwrap(); | ||||
| 
 | ||||
|     let brain_channel = run_service_for_stream().await; | ||||
|     let daemon_key = mock_vm_daemon(brain_channel.clone()).await; | ||||
|     let brain_channel = run_service_for_stream().await.unwrap(); | ||||
|     let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap(); | ||||
|     let mut cli_client = BrainVmCliClient::new(brain_channel); | ||||
| 
 | ||||
|     let cli_key = Key::new(); | ||||
| @ -49,9 +46,6 @@ async fn test_brain_message() { | ||||
| 
 | ||||
|     assert!(new_vm_resp.error.is_empty()); | ||||
|     assert!(new_vm_resp.uuid.len() == 40); | ||||
| 
 | ||||
|     let id = ("measurement_args", new_vm_resp.uuid); | ||||
|     let data_in_db: detee_shared::vm_proto::MeasurementArgs = db.select(id).await.unwrap().unwrap(); | ||||
| 
 | ||||
|     assert_eq!(data_in_db, new_vm_resp.args.unwrap()); | ||||
|     assert!(new_vm_resp.args.is_some()); | ||||
|     assert!(new_vm_resp.args.unwrap().exposed_ports.len() == 3); | ||||
| } | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user