Compare commits
	
		
			10 Commits
		
	
	
		
			32f6548eff
			...
			8fa8a64862
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 8fa8a64862 | |||
| a1664d61e4 | |||
| 28ff8dcd81 | |||
| 8abe8eebf4 | |||
| ea3169c024 | |||
| 8536c85fd5 | |||
| 5cd8317f91 | |||
| de5fba37c9 | |||
| ab39b12da9 | |||
| 92827161ad | 
							
								
								
									
										5
									
								
								.env
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										5
									
								
								.env
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,5 @@ | ||||
| DB_URL = "localhost:8000" | ||||
| DB_USER = "root" | ||||
| DB_PASS = "root" | ||||
| DB_NAMESPACE = "brain" | ||||
| DB_NAME = "migration" | ||||
							
								
								
									
										7
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										7
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @ -1078,6 +1078,12 @@ version = "0.3.3" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" | ||||
| 
 | ||||
| [[package]] | ||||
| name = "dotenv" | ||||
| version = "0.15.0" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" | ||||
| 
 | ||||
| [[package]] | ||||
| name = "earcutr" | ||||
| version = "0.4.3" | ||||
| @ -3777,6 +3783,7 @@ dependencies = [ | ||||
|  "chrono", | ||||
|  "dashmap 6.1.0", | ||||
|  "detee-shared", | ||||
|  "dotenv", | ||||
|  "ed25519-dalek", | ||||
|  "env_logger", | ||||
|  "futures", | ||||
|  | ||||
| @ -21,6 +21,7 @@ log = "0.4.27" | ||||
| env_logger = "0.11.8" | ||||
| thiserror = "2.0.12" | ||||
| nanoid = "0.4.0" | ||||
| dotenv = "0.15.0" | ||||
| 
 | ||||
| [profile.release] | ||||
| lto = true | ||||
|  | ||||
							
								
								
									
										1
									
								
								README.md
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										1
									
								
								README.md
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1 @@ | ||||
| # Brain Miration to SurrealDB | ||||
| @ -131,3 +131,4 @@ DEFINE FIELD contract ON TABLE kick TYPE record<deleted_vm|deleted_app>; | ||||
| DEFINE TABLE report TYPE RELATION FROM account TO vm_node|app_node; | ||||
| DEFINE FIELD created_at ON TABLE report TYPE datetime; | ||||
| DEFINE FIELD reason ON TABLE report TYPE string; | ||||
| DEFINE FIELD contract_id ON TABLE report TYPE string; | ||||
|  | ||||
| @ -1,9 +1,10 @@ | ||||
| use std::sync::Arc; | ||||
| 
 | ||||
| use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer; | ||||
| use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; | ||||
| use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; | ||||
| use surreal_brain::constants::{ | ||||
|     BRAIN_GRPC_ADDR, CERT_KEY_PATH, CERT_PATH, DB_ADDRESS, DB_NAME, DB_NS, | ||||
| }; | ||||
| use dotenv::dotenv; | ||||
| use surreal_brain::constants::{BRAIN_GRPC_ADDR, CERT_KEY_PATH, CERT_PATH}; | ||||
| use surreal_brain::db; | ||||
| use surreal_brain::grpc::BrainVmCliForReal; | ||||
| use surreal_brain::grpc::{BrainGeneralCliForReal, BrainVmDaemonForReal}; | ||||
| @ -11,13 +12,24 @@ use tonic::transport::{Identity, Server, ServerTlsConfig}; | ||||
| 
 | ||||
| #[tokio::main] | ||||
| async fn main() { | ||||
|     dotenv().ok(); | ||||
|     env_logger::builder().filter_level(log::LevelFilter::Debug).init(); | ||||
|     db::init(DB_ADDRESS, DB_NS, DB_NAME).await.unwrap(); | ||||
| 
 | ||||
|     let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); | ||||
|     let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env"); | ||||
|     let db_pass = std::env::var("DB_PASS").expect("DB_PASS not set in .env"); | ||||
|     let db_ns = std::env::var("DB_NAMESPACE").expect("DB_NAMESPACE not set in .env"); | ||||
|     let db_name = std::env::var("DB_NAME").expect("DB_NAME not set in .env"); | ||||
| 
 | ||||
|     let db = db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name).await.unwrap(); | ||||
|     let db_arc = Arc::new(db); | ||||
| 
 | ||||
|     let addr = BRAIN_GRPC_ADDR.parse().unwrap(); | ||||
| 
 | ||||
|     let snp_daemon_server = BrainVmDaemonServer::new(BrainVmDaemonForReal {}); | ||||
|     let snp_cli_server = BrainVmCliServer::new(BrainVmCliForReal {}); | ||||
|     let general_service_server = BrainGeneralCliServer::new(BrainGeneralCliForReal {}); | ||||
|     let snp_daemon_server = BrainVmDaemonServer::new(BrainVmDaemonForReal::new(db_arc.clone())); | ||||
|     let snp_cli_server = BrainVmCliServer::new(BrainVmCliForReal::new(db_arc.clone())); | ||||
|     let general_service_server = | ||||
|         BrainGeneralCliServer::new(BrainGeneralCliForReal::new(db_arc.clone())); | ||||
| 
 | ||||
|     let cert = std::fs::read_to_string(CERT_PATH).unwrap(); | ||||
|     let key = std::fs::read_to_string(CERT_KEY_PATH).unwrap(); | ||||
|  | ||||
| @ -1,20 +1,24 @@ | ||||
| // After deleting this migration, also delete old_brain structs
 | ||||
| // and dangling impls from the model
 | ||||
| use dotenv::dotenv; | ||||
| use std::error::Error; | ||||
| use surreal_brain::constants::{DB_ADDRESS, DB_NAME, DB_NS}; | ||||
| use surreal_brain::db::init; | ||||
| use surreal_brain::{db, old_brain}; | ||||
| 
 | ||||
| #[tokio::main] | ||||
| async fn main() -> Result<(), Box<dyn Error>> { | ||||
|     dotenv().ok(); | ||||
|     let old_brain_data = old_brain::BrainData::load_from_disk()?; | ||||
|     // println!("{}", serde_yaml::to_string(&old_brain_data)?);
 | ||||
| 
 | ||||
|     init(DB_ADDRESS, DB_NS, DB_NAME).await?; | ||||
|     let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); | ||||
|     let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env"); | ||||
|     let db_pass = std::env::var("DB_PASS").expect("DB_PASS not set in .env"); | ||||
|     let db_ns = std::env::var("DB_NAMESPACE").expect("DB_NAMESPACE not set in .env"); | ||||
|     let db_name = std::env::var("DB_NAME").expect("DB_NAME not set in .env"); | ||||
| 
 | ||||
|     let result = db::migration0(&old_brain_data).await?; | ||||
|     let db = db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name).await.unwrap(); | ||||
| 
 | ||||
|     println!("{result:?}"); | ||||
|     db::migration0(&db, &old_brain_data).await?; | ||||
| 
 | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| @ -2,13 +2,7 @@ pub const BRAIN_GRPC_ADDR: &str = "0.0.0.0:31337"; | ||||
| pub const CERT_PATH: &str = "./tmp/brain-crt.pem"; | ||||
| pub const CERT_KEY_PATH: &str = "./tmp/brain-key.pem"; | ||||
| 
 | ||||
| pub const DB_ADDRESS: &str = "localhost:8000"; | ||||
| pub const DB_NS: &str = "brain"; | ||||
| pub const DB_NAME: &str = "migration"; | ||||
| 
 | ||||
| // TODO: read from .env
 | ||||
| pub const DB_USER: &str = "root"; | ||||
| pub const DB_PASS: &str = "root"; | ||||
| pub const DB_SCHEMA_FILE: &str = "interim_tables.surql"; | ||||
| 
 | ||||
| pub const ADMIN_ACCOUNTS: &[&str] = &[ | ||||
|     "x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK", | ||||
| @ -26,6 +20,8 @@ pub const UPDATE_VM_REQ: &str = "update_vm_req"; | ||||
| pub const DELETED_VM: &str = "deleted_vm"; | ||||
| pub const VM_CONTRACT: &str = "vm_contract"; | ||||
| 
 | ||||
| pub const ACTIVE_APP: &str = "active_app"; | ||||
| 
 | ||||
| pub const ID_ALPHABET: [char; 62] = [ | ||||
|     '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', | ||||
|     'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', | ||||
|  | ||||
							
								
								
									
										236
									
								
								src/db.rs
									
									
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										236
									
								
								src/db.rs
									
									
									
									
									
								
							| @ -1,11 +1,12 @@ | ||||
| use std::{str::FromStr, time::Duration}; | ||||
| 
 | ||||
| pub use crate::constants::{ | ||||
|     ACCOUNT, ACTIVE_VM, DB_ADDRESS, DB_NAME, DB_NS, DB_PASS, DB_USER, DELETED_VM, ID_ALPHABET, | ||||
|     NEW_VM_REQ, UPDATE_VM_REQ, VM_CONTRACT, VM_NODE, | ||||
|     ACCOUNT, ACTIVE_APP, ACTIVE_VM, DB_SCHEMA_FILE, DELETED_VM, ID_ALPHABET, NEW_VM_REQ, | ||||
|     UPDATE_VM_REQ, VM_CONTRACT, VM_NODE, | ||||
| }; | ||||
| 
 | ||||
| use crate::old_brain; | ||||
| use serde::{Deserialize, Serialize}; | ||||
| use std::{str::FromStr, sync::LazyLock}; | ||||
| use surrealdb::{ | ||||
|     engine::remote::ws::{Client, Ws}, | ||||
|     opt::auth::Root, | ||||
| @ -15,49 +16,65 @@ use surrealdb::{ | ||||
| use tokio::sync::mpsc::Sender; | ||||
| use tokio_stream::StreamExt as _; | ||||
| 
 | ||||
| pub static DB: LazyLock<Surreal<Client>> = LazyLock::new(Surreal::init); | ||||
| 
 | ||||
| #[derive(thiserror::Error, Debug)] | ||||
| pub enum Error { | ||||
|     #[error("Internal DB error: {0}")] | ||||
|     DataBase(#[from] surrealdb::Error), | ||||
|     #[error("Daemon channel got closed: {0}")] | ||||
|     DaemonConnection(#[from] tokio::sync::mpsc::error::SendError<DaemonNotification>), | ||||
|     #[error(transparent)] | ||||
|     StdIo(#[from] std::io::Error), | ||||
|     #[error(transparent)] | ||||
|     TimeOut(#[from] tokio::time::error::Elapsed), | ||||
| } | ||||
| 
 | ||||
| pub async fn init(db_address: &str, ns: &str, db: &str) -> surrealdb::Result<()> { | ||||
|     DB.connect::<Ws>(db_address).await?; | ||||
| pub async fn db_connection( | ||||
|     db_address: &str, | ||||
|     username: &str, | ||||
|     password: &str, | ||||
|     ns: &str, | ||||
|     db: &str, | ||||
| ) -> Result<Surreal<Client>, Error> { | ||||
|     let db_connection: Surreal<Client> = Surreal::init(); | ||||
|     db_connection.connect::<Ws>(db_address).await?; | ||||
|     // Sign in to the server
 | ||||
|     DB.signin(Root { username: DB_USER, password: DB_PASS }).await?; | ||||
|     DB.use_ns(ns).use_db(db).await?; | ||||
|     Ok(()) | ||||
|     db_connection.signin(Root { username, password }).await?; | ||||
|     db_connection.use_ns(ns).use_db(db).await?; | ||||
|     Ok(db_connection) | ||||
| } | ||||
| 
 | ||||
| pub async fn upsert_record<SomeRecord: Serialize + 'static>( | ||||
|     db: &Surreal<Client>, | ||||
|     table: &str, | ||||
|     id: &str, | ||||
|     my_record: SomeRecord, | ||||
| ) -> Result<(), Error> { | ||||
|     #[derive(Deserialize)] | ||||
|     struct Wrapper {} | ||||
|     let _: Option<Wrapper> = DB.create((table, id)).content(my_record).await?; | ||||
|     let _: Option<Wrapper> = db.create((table, id)).content(my_record).await?; | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| pub async fn migration0(old_data: &old_brain::BrainData) -> surrealdb::Result<()> { | ||||
| pub async fn migration0( | ||||
|     db: &Surreal<Client>, | ||||
|     old_data: &old_brain::BrainData, | ||||
| ) -> Result<(), Error> { | ||||
|     let accounts: Vec<Account> = old_data.into(); | ||||
|     let vm_nodes: Vec<VmNode> = old_data.into(); | ||||
|     let app_nodes: Vec<AppNode> = old_data.into(); | ||||
|     let vm_contracts: Vec<ActiveVm> = old_data.into(); | ||||
| 
 | ||||
|     let schema = std::fs::read_to_string(DB_SCHEMA_FILE)?; | ||||
|     db.query(schema).await?; | ||||
| 
 | ||||
|     println!("Inserting accounts..."); | ||||
|     let _: Vec<Account> = DB.insert(()).content(accounts).await?; | ||||
|     let _: Vec<Account> = db.insert(()).content(accounts).await?; | ||||
|     println!("Inserting vm nodes..."); | ||||
|     let _: Vec<VmNode> = DB.insert(()).content(vm_nodes).await?; | ||||
|     let _: Vec<VmNode> = db.insert(()).content(vm_nodes).await?; | ||||
|     println!("Inserting app nodes..."); | ||||
|     let _: Vec<AppNode> = DB.insert(()).content(app_nodes).await?; | ||||
|     let _: Vec<AppNode> = db.insert(()).content(app_nodes).await?; | ||||
|     println!("Inserting vm contracts..."); | ||||
|     let _: Vec<ActiveVm> = DB.insert("vm_contract").relation(vm_contracts).await?; | ||||
|     let _: Vec<ActiveVm> = db.insert("vm_contract").relation(vm_contracts).await?; | ||||
| 
 | ||||
|     Ok(()) | ||||
| } | ||||
| @ -72,9 +89,9 @@ pub struct Account { | ||||
| } | ||||
| 
 | ||||
| impl Account { | ||||
|     pub async fn get(address: &str) -> Result<Self, Error> { | ||||
|     pub async fn get(db: &Surreal<Client>, address: &str) -> Result<Self, Error> { | ||||
|         let id = (ACCOUNT, address); | ||||
|         let account: Option<Self> = DB.select(id).await?; | ||||
|         let account: Option<Self> = db.select(id).await?; | ||||
|         let account = match account { | ||||
|             Some(account) => account, | ||||
|             None => { | ||||
| @ -84,9 +101,9 @@ impl Account { | ||||
|         Ok(account) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn airdrop(account: &str, tokens: u64) -> Result<(), Error> { | ||||
|     pub async fn airdrop(db: &Surreal<Client>, account: &str, tokens: u64) -> Result<(), Error> { | ||||
|         let tokens = tokens.saturating_mul(1_000_000_000); | ||||
|         let _ = DB | ||||
|         let _ = db | ||||
|             .query(format!("upsert account:{account} SET balance = (balance || 0) + {tokens};")) | ||||
|             .await?; | ||||
|         Ok(()) | ||||
| @ -94,8 +111,12 @@ impl Account { | ||||
| } | ||||
| 
 | ||||
| impl Account { | ||||
|     pub async fn is_banned_by_node(user: &str, node: &str) -> Result<bool, Error> { | ||||
|         let ban: Option<Self> = DB | ||||
|     pub async fn is_banned_by_node( | ||||
|         db: &Surreal<Client>, | ||||
|         user: &str, | ||||
|         node: &str, | ||||
|     ) -> Result<bool, Error> { | ||||
|         let ban: Option<Self> = db | ||||
|             .query(format!( | ||||
|                 "(select operator->ban[0] as ban
 | ||||
|                     from vm_node:{node} | ||||
| @ -139,15 +160,15 @@ pub struct VmNodeResources { | ||||
| } | ||||
| 
 | ||||
| impl VmNodeResources { | ||||
|     pub async fn merge(self, node_id: &str) -> Result<(), Error> { | ||||
|         let _: Option<VmNode> = DB.update((VM_NODE, node_id)).merge(self).await?; | ||||
|     pub async fn merge(self, db: &Surreal<Client>, node_id: &str) -> Result<(), Error> { | ||||
|         let _: Option<VmNode> = db.update((VM_NODE, node_id)).merge(self).await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl VmNode { | ||||
|     pub async fn register(self) -> Result<(), Error> { | ||||
|         let _: Option<VmNode> = DB.upsert(self.id.clone()).content(self).await?; | ||||
|     pub async fn register(self, db: &Surreal<Client>) -> Result<(), Error> { | ||||
|         let _: Option<VmNode> = db.upsert(self.id.clone()).content(self).await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
| @ -176,6 +197,7 @@ impl VmNodeWithReports { | ||||
|     // TODO: find a more elegant way to do this than importing gRPC in the DB module
 | ||||
|     // https://en.wikipedia.org/wiki/Dependency_inversion_principle
 | ||||
|     pub async fn find_by_filters( | ||||
|         db: &Surreal<Client>, | ||||
|         filters: detee_shared::snp::pb::vm_proto::VmNodeFilters, | ||||
|     ) -> Result<Vec<Self>, Error> { | ||||
|         let mut query = format!( | ||||
| @ -208,7 +230,7 @@ impl VmNodeWithReports { | ||||
|             query += &format!("&& ip = '{}' ", filters.ip); | ||||
|         } | ||||
|         query += ";"; | ||||
|         let mut result = DB.query(query).await?; | ||||
|         let mut result = db.query(query).await?; | ||||
|         let vm_nodes: Vec<Self> = result.take(0)?; | ||||
|         Ok(vm_nodes) | ||||
|     } | ||||
| @ -263,27 +285,27 @@ pub struct NewVmReq { | ||||
| } | ||||
| 
 | ||||
| impl NewVmReq { | ||||
|     pub async fn get(id: &str) -> Result<Option<Self>, Error> { | ||||
|         let new_vm_req: Option<Self> = DB.select((NEW_VM_REQ, id)).await?; | ||||
|     pub async fn get(db: &Surreal<Client>, id: &str) -> Result<Option<Self>, Error> { | ||||
|         let new_vm_req: Option<Self> = db.select((NEW_VM_REQ, id)).await?; | ||||
|         Ok(new_vm_req) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn delete(id: &str) -> Result<(), Error> { | ||||
|         let _: Option<Self> = DB.delete((NEW_VM_REQ, id)).await?; | ||||
|     pub async fn delete(db: &Surreal<Client>, id: &str) -> Result<(), Error> { | ||||
|         let _: Option<Self> = db.delete((NEW_VM_REQ, id)).await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn submit_error(id: &str, error: String) -> Result<(), Error> { | ||||
|     pub async fn submit_error(db: &Surreal<Client>, id: &str, error: String) -> Result<(), Error> { | ||||
|         #[derive(Serialize)] | ||||
|         struct NewVmError { | ||||
|             error: String, | ||||
|         } | ||||
|         let _: Option<Self> = DB.update((NEW_VM_REQ, id)).merge(NewVmError { error }).await?; | ||||
|         let _: Option<Self> = db.update((NEW_VM_REQ, id)).merge(NewVmError { error }).await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn submit(self) -> Result<(), Error> { | ||||
|         let _: Vec<Self> = DB.insert(NEW_VM_REQ).relation(self).await?; | ||||
|     pub async fn submit(self, db: &Surreal<Client>) -> Result<(), Error> { | ||||
|         let _: Vec<Self> = db.insert(NEW_VM_REQ).relation(self).await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
| @ -297,8 +319,8 @@ pub enum NewVmResp { | ||||
| } | ||||
| 
 | ||||
| impl NewVmResp { | ||||
|     pub async fn listen(vm_id: &str) -> Result<NewVmResp, Error> { | ||||
|         let mut resp = DB | ||||
|     pub async fn listen(db: &Surreal<Client>, vm_id: &str) -> Result<NewVmResp, Error> { | ||||
|         let mut resp = db | ||||
|             .query(format!("live select * from {NEW_VM_REQ} where id = {NEW_VM_REQ}:{vm_id};")) | ||||
|             .query(format!( | ||||
|                 "live select * from measurement_args where id = measurement_args:{vm_id};" | ||||
| @ -308,6 +330,7 @@ impl NewVmResp { | ||||
|         let mut args_stream = | ||||
|             resp.stream::<Notification<detee_shared::snp::pb::vm_proto::MeasurementArgs>>(1)?; | ||||
| 
 | ||||
|         tokio::time::timeout(Duration::from_secs(10), async { | ||||
|             loop { | ||||
|                 tokio::select! { | ||||
|                     new_vm_req_notif = new_vm_stream.next() => { | ||||
| @ -315,13 +338,8 @@ impl NewVmResp { | ||||
|                         if let Some(new_vm_req_notif) = new_vm_req_notif { | ||||
|                             match new_vm_req_notif { | ||||
|                                 Ok(new_vm_req_notif) => { | ||||
|                                 match new_vm_req_notif.action { | ||||
|                                     surrealdb::Action::Update => { | ||||
|                                         if !new_vm_req_notif.data.error.is_empty() { | ||||
|                                             return Ok(Self::Error(vm_id.to_string(), new_vm_req_notif.data.error)); | ||||
|                                         } | ||||
|                                     }, | ||||
|                                     _ => {} | ||||
|                                     if new_vm_req_notif.action == surrealdb::Action::Update && !new_vm_req_notif.data.error.is_empty() { | ||||
|                                         return Ok::<NewVmResp, Error>(Self::Error(vm_id.to_string(), new_vm_req_notif.data.error)); | ||||
|                                     }; | ||||
|                                 }, | ||||
|                                 Err(e) => return Err(e.into()), | ||||
| @ -332,11 +350,8 @@ impl NewVmResp { | ||||
|                         if let Some(args_notif) = args_notif { | ||||
|                             match args_notif { | ||||
|                                 Ok(args_notif) => { | ||||
|                                 match args_notif.action { | ||||
|                                     surrealdb::Action::Create => { | ||||
|                                     if args_notif.action == surrealdb::Action::Create { | ||||
|                                         return Ok(Self::Args(vm_id.to_string(), args_notif.data)); | ||||
|                                     }, | ||||
|                                     _ => {} | ||||
|                                     }; | ||||
|                                 }, | ||||
|                                 Err(e) => return Err(e.into()), | ||||
| @ -345,6 +360,7 @@ impl NewVmResp { | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         }).await? | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| @ -372,10 +388,11 @@ pub struct ActiveVm { | ||||
| 
 | ||||
| impl ActiveVm { | ||||
|     pub async fn activate( | ||||
|         db: &Surreal<Client>, | ||||
|         id: &str, | ||||
|         args: detee_shared::vm_proto::MeasurementArgs, | ||||
|     ) -> Result<(), Error> { | ||||
|         let new_vm_req = match NewVmReq::get(id).await? { | ||||
|         let new_vm_req = match NewVmReq::get(db, id).await? { | ||||
|             Some(r) => r, | ||||
|             None => return Ok(()), | ||||
|         }; | ||||
| @ -423,9 +440,9 @@ impl ActiveVm { | ||||
|             collected_at: new_vm_req.created_at, | ||||
|         }; | ||||
| 
 | ||||
|         let _: Vec<ActiveVm> = DB.insert(()).relation(active_vm).await?; | ||||
|         let _: Vec<ActiveVm> = db.insert(()).relation(active_vm).await?; | ||||
| 
 | ||||
|         NewVmReq::delete(id).await?; | ||||
|         NewVmReq::delete(db, id).await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
| @ -452,6 +469,7 @@ pub struct UpdateVmReq { | ||||
| pub async fn listen_for_node< | ||||
|     T: Into<DaemonNotification> + std::marker::Unpin + for<'de> Deserialize<'de>, | ||||
| >( | ||||
|     db: &Surreal<Client>, | ||||
|     node: &str, | ||||
|     tx: Sender<DaemonNotification>, | ||||
| ) -> Result<(), Error> { | ||||
| @ -465,14 +483,15 @@ pub async fn listen_for_node< | ||||
|         } | ||||
|     }; | ||||
|     let mut resp = | ||||
|         DB.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?; | ||||
|         db.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?; | ||||
|     let mut live_stream = resp.stream::<Notification<T>>(0)?; | ||||
|     while let Some(result) = live_stream.next().await { | ||||
|         match result { | ||||
|             Ok(notification) => match notification.action { | ||||
|                 surrealdb::Action::Create => tx.send(notification.data.into()).await?, | ||||
|                 _ => {} | ||||
|             }, | ||||
|             Ok(notification) => { | ||||
|                 if notification.action == surrealdb::Action::Create { | ||||
|                     tx.send(notification.data.into()).await? | ||||
|                 } | ||||
|             } | ||||
|             Err(e) => { | ||||
|                 log::warn!("listen_for_deletion DB stream failed for {node}: {e}"); | ||||
|                 return Err(Error::from(e)); | ||||
| @ -504,28 +523,31 @@ pub struct DeletedVm { | ||||
| } | ||||
| 
 | ||||
| impl DeletedVm { | ||||
|     pub async fn get_by_uuid(uuid: &str) -> Result<Option<Self>, Error> { | ||||
|     pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> { | ||||
|         let contract: Option<Self> = | ||||
|             DB.query(format!("select * from {DELETED_VM}:{uuid};")).await?.take(0)?; | ||||
|             db.query(format!("select * from {DELETED_VM}:{uuid};")).await?.take(0)?; | ||||
|         Ok(contract) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn list_by_admin(admin: &str) -> Result<Vec<Self>, Error> { | ||||
|     pub async fn list_by_admin(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> { | ||||
|         let mut result = | ||||
|             DB.query(format!("select * from {DELETED_VM} where in = {ACCOUNT}:{admin};")).await?; | ||||
|             db.query(format!("select * from {DELETED_VM} where in = {ACCOUNT}:{admin};")).await?; | ||||
|         let contracts: Vec<Self> = result.take(0)?; | ||||
|         Ok(contracts) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn list_by_node(admin: &str) -> Result<Vec<Self>, Error> { | ||||
|     pub async fn list_by_node(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> { | ||||
|         let mut result = | ||||
|             DB.query(format!("select * from {DELETED_VM} where out = {VM_NODE}:{admin};")).await?; | ||||
|             db.query(format!("select * from {DELETED_VM} where out = {VM_NODE}:{admin};")).await?; | ||||
|         let contracts: Vec<Self> = result.take(0)?; | ||||
|         Ok(contracts) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn list_by_operator(operator: &str) -> Result<Vec<Self>, Error> { | ||||
|         let mut result = DB | ||||
|     pub async fn list_by_operator( | ||||
|         db: &Surreal<Client>, | ||||
|         operator: &str, | ||||
|     ) -> Result<Vec<Self>, Error> { | ||||
|         let mut result = db | ||||
|             .query(format!( | ||||
|                 "select
 | ||||
|                 (select * from ->operator->vm_node<-{DELETED_VM}) as contracts | ||||
| @ -603,30 +625,33 @@ pub struct ActiveVmWithNode { | ||||
| } | ||||
| 
 | ||||
| impl ActiveVmWithNode { | ||||
|     pub async fn get_by_uuid(uuid: &str) -> Result<Option<Self>, Error> { | ||||
|     pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> { | ||||
|         let contract: Option<Self> = | ||||
|             DB.query(format!("select * from {ACTIVE_VM}:{uuid} fetch out;")).await?.take(0)?; | ||||
|             db.query(format!("select * from {ACTIVE_VM}:{uuid} fetch out;")).await?.take(0)?; | ||||
|         Ok(contract) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn list_by_admin(admin: &str) -> Result<Vec<Self>, Error> { | ||||
|         let mut result = DB | ||||
|     pub async fn list_by_admin(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> { | ||||
|         let mut result = db | ||||
|             .query(format!("select * from {ACTIVE_VM} where in = {ACCOUNT}:{admin} fetch out;")) | ||||
|             .await?; | ||||
|         let contracts: Vec<Self> = result.take(0)?; | ||||
|         Ok(contracts) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn list_by_node(admin: &str) -> Result<Vec<Self>, Error> { | ||||
|         let mut result = DB | ||||
|     pub async fn list_by_node(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> { | ||||
|         let mut result = db | ||||
|             .query(format!("select * from {ACTIVE_VM} where out = {VM_NODE}:{admin} fetch out;")) | ||||
|             .await?; | ||||
|         let contracts: Vec<Self> = result.take(0)?; | ||||
|         Ok(contracts) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn list_by_operator(operator: &str) -> Result<Vec<Self>, Error> { | ||||
|         let mut result = DB | ||||
|     pub async fn list_by_operator( | ||||
|         db: &Surreal<Client>, | ||||
|         operator: &str, | ||||
|     ) -> Result<Vec<Self>, Error> { | ||||
|         let mut result = db | ||||
|             .query(format!( | ||||
|                 "select
 | ||||
|                 (select * from ->operator->vm_node<-{ACTIVE_VM} fetch out) as contracts | ||||
| @ -699,7 +724,7 @@ pub struct AppNodeWithReports { | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug, Serialize, Deserialize)] | ||||
| pub struct AppContract { | ||||
| pub struct ActiveApp { | ||||
|     id: RecordId, | ||||
|     #[serde(rename = "in")] | ||||
|     admin: RecordId, | ||||
| @ -720,6 +745,36 @@ pub struct AppContract { | ||||
|     hratls_pubkey: String, | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug, Serialize, Deserialize)] | ||||
| pub struct ActiveAppWithNode { | ||||
|     pub id: RecordId, | ||||
|     #[serde(rename = "in")] | ||||
|     pub admin: RecordId, | ||||
|     #[serde(rename = "out")] | ||||
|     pub app_node: AppNode, | ||||
|     pub app_name: String, | ||||
|     pub mapped_ports: Vec<(u64, u64)>, | ||||
|     pub host_ipv4: String, | ||||
|     pub vcpus: u64, | ||||
|     pub memory_mb: u64, | ||||
|     pub disk_size_gb: u64, | ||||
|     pub created_at: Datetime, | ||||
|     pub price_per_unit: u64, | ||||
|     pub locked_nano: u64, | ||||
|     pub collected_at: Datetime, | ||||
|     pub mr_enclave: String, | ||||
|     pub package_url: String, | ||||
|     pub hratls_pubkey: String, | ||||
| } | ||||
| 
 | ||||
| impl ActiveAppWithNode { | ||||
|     pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> { | ||||
|         let contract: Option<Self> = | ||||
|             db.query(format!("select * from {ACTIVE_APP}:{uuid} fetch out;")).await?.take(0)?; | ||||
|         Ok(contract) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug, Serialize, Deserialize)] | ||||
| pub struct Ban { | ||||
|     id: RecordId, | ||||
| @ -750,18 +805,27 @@ pub struct Report { | ||||
|     to_node: RecordId, | ||||
|     created_at: Datetime, | ||||
|     pub reason: String, | ||||
|     pub contract_id: String, | ||||
| } | ||||
| 
 | ||||
| impl Report { | ||||
|     // TODO: test this functionality and remove this comment
 | ||||
|     pub async fn create( | ||||
|         db: &Surreal<Client>, | ||||
|         from_account: RecordId, | ||||
|         to_node: RecordId, | ||||
|         reason: String, | ||||
|         contract_id: String, | ||||
|     ) -> Result<(), Error> { | ||||
|         let _: Vec<Self> = DB | ||||
|         let _: Vec<Self> = db | ||||
|             .insert("report") | ||||
|             .relation(Report { from_account, to_node, created_at: Datetime::default(), reason }) | ||||
|             .relation(Report { | ||||
|                 from_account, | ||||
|                 to_node, | ||||
|                 created_at: Datetime::default(), | ||||
|                 reason, | ||||
|                 contract_id, | ||||
|             }) | ||||
|             .await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| @ -780,27 +844,28 @@ pub struct Operator { | ||||
| } | ||||
| 
 | ||||
| impl Operator { | ||||
|     pub async fn list() -> Result<Vec<Self>, Error> { | ||||
|         let mut result = DB | ||||
|             .query(format!( | ||||
|     pub async fn list(db: &Surreal<Client>) -> Result<Vec<Self>, Error> { | ||||
|         let mut result = db | ||||
|             .query( | ||||
|                 "array::distinct(array::flatten( [
 | ||||
|                     (select operator from vm_node group by operator).operator, | ||||
|                     (select operator from app_node group by operator).operator | ||||
|                 ]));" | ||||
|             )) | ||||
|                     .to_string(), | ||||
|             ) | ||||
|             .await?; | ||||
|         let operator_accounts: Vec<RecordId> = result.take(0)?; | ||||
|         let mut operators: Vec<Self> = Vec::new(); | ||||
|         for account in operator_accounts.iter() { | ||||
|             if let Some(operator) = Self::inspect(&account.key().to_string()).await? { | ||||
|             if let Some(operator) = Self::inspect(db, &account.key().to_string()).await? { | ||||
|                 operators.push(operator); | ||||
|             } | ||||
|         } | ||||
|         Ok(operators) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn inspect(account: &str) -> Result<Option<Self>, Error> { | ||||
|         let mut result = DB | ||||
|     pub async fn inspect(db: &Surreal<Client>, account: &str) -> Result<Option<Self>, Error> { | ||||
|         let mut result = db | ||||
|             .query(format!( | ||||
|                 "$vm_nodes = (select id from vm_node where operator = account:{account}).id;
 | ||||
|                 $app_nodes = (select id from app_node where operator = account:{account}).id; | ||||
| @ -821,10 +886,11 @@ impl Operator { | ||||
|     } | ||||
| 
 | ||||
|     pub async fn inspect_nodes( | ||||
|         db: &Surreal<Client>, | ||||
|         account: &str, | ||||
|     ) -> Result<(Option<Self>, Vec<VmNodeWithReports>, Vec<AppNodeWithReports>), Error> { | ||||
|         let operator = Self::inspect(account).await?; | ||||
|         let mut result = DB | ||||
|         let operator = Self::inspect(db, account).await?; | ||||
|         let mut result = db | ||||
|             .query(format!( | ||||
|                 "select *, operator, <-report.* as reports from vm_node
 | ||||
|                  where operator = account:{account};" | ||||
| @ -875,7 +941,7 @@ impl From<&old_brain::BrainData> for Vec<ActiveVm> { | ||||
|         for old_c in old_data.vm_contracts.iter() { | ||||
|             let mut mapped_ports = Vec::new(); | ||||
|             for port in old_c.exposed_ports.iter() { | ||||
|                 mapped_ports.push((*port, 8080 as u32)); | ||||
|                 mapped_ports.push((*port, 8080u32)); | ||||
|             } | ||||
|             contracts.push(ActiveVm { | ||||
|                 id: RecordId::from((ACTIVE_VM, old_c.uuid.replace("-", ""))), | ||||
|  | ||||
							
								
								
									
										116
									
								
								src/grpc.rs
									
									
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										116
									
								
								src/grpc.rs
									
									
									
									
									
								
							| @ -15,16 +15,26 @@ use detee_shared::{ | ||||
|     }, | ||||
| }; | ||||
| use nanoid::nanoid; | ||||
| use surrealdb::{engine::remote::ws::Client, Surreal}; | ||||
| 
 | ||||
| use log::info; | ||||
| use std::pin::Pin; | ||||
| use std::sync::Arc; | ||||
| use surrealdb::RecordId; | ||||
| use tokio::sync::mpsc; | ||||
| use tokio_stream::wrappers::ReceiverStream; | ||||
| use tokio_stream::{Stream, StreamExt}; | ||||
| use tonic::{Request, Response, Status, Streaming}; | ||||
| 
 | ||||
| pub struct BrainGeneralCliForReal {} | ||||
| pub struct BrainGeneralCliForReal { | ||||
|     pub db: Arc<Surreal<Client>>, | ||||
| } | ||||
| 
 | ||||
| impl BrainGeneralCliForReal { | ||||
|     pub fn new(db: Arc<Surreal<Client>>) -> Self { | ||||
|         Self { db } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<db::Account> for AccountBalance { | ||||
|     fn from(account: db::Account) -> Self { | ||||
| @ -239,7 +249,15 @@ impl From<VmNodeResources> for db::VmNodeResources { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| pub struct BrainVmDaemonForReal {} | ||||
| pub struct BrainVmDaemonForReal { | ||||
|     pub db: Arc<Surreal<Client>>, | ||||
| } | ||||
| 
 | ||||
| impl BrainVmDaemonForReal { | ||||
|     pub fn new(db: Arc<Surreal<Client>>) -> Self { | ||||
|         Self { db } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[tonic::async_trait] | ||||
| impl BrainVmDaemon for BrainVmDaemonForReal { | ||||
| @ -267,11 +285,11 @@ impl BrainVmDaemon for BrainVmDaemonForReal { | ||||
|             max_ports_per_vm: 0, | ||||
|             offline_minutes: 0, | ||||
|         } | ||||
|         .register() | ||||
|         .register(&self.db) | ||||
|         .await?; | ||||
| 
 | ||||
|         info!("Sending existing contracts to {}", req.node_pubkey); | ||||
|         let contracts = db::ActiveVmWithNode::list_by_node(&req.node_pubkey).await?; | ||||
|         let contracts = db::ActiveVmWithNode::list_by_node(&self.db, &req.node_pubkey).await?; | ||||
|         let (tx, rx) = mpsc::channel(6); | ||||
|         tokio::spawn(async move { | ||||
|             for contract in contracts { | ||||
| @ -299,10 +317,11 @@ impl BrainVmDaemon for BrainVmDaemonForReal { | ||||
| 
 | ||||
|         let (tx, rx) = mpsc::channel(6); | ||||
|         { | ||||
|             let db = self.db.clone(); | ||||
|             let pubkey = pubkey.clone(); | ||||
|             let tx = tx.clone(); | ||||
|             tokio::spawn(async move { | ||||
|                 match db::listen_for_node::<db::DeletedVm>(&pubkey, tx).await { | ||||
|                 match db::listen_for_node::<db::DeletedVm>(&db, &pubkey, tx).await { | ||||
|                     Ok(()) => log::info!("db::VmContract::listen_for_node ended for {pubkey}"), | ||||
|                     Err(e) => { | ||||
|                         log::warn!("db::VmContract::listen_for_node errored for {pubkey}: {e}") | ||||
| @ -311,17 +330,19 @@ impl BrainVmDaemon for BrainVmDaemonForReal { | ||||
|             }); | ||||
|         } | ||||
|         { | ||||
|             let db = self.db.clone(); | ||||
|             let pubkey = pubkey.clone(); | ||||
|             let tx = tx.clone(); | ||||
|             tokio::spawn(async move { | ||||
|                 let _ = db::listen_for_node::<db::NewVmReq>(&pubkey, tx.clone()).await; | ||||
|                 let _ = db::listen_for_node::<db::NewVmReq>(&db, &pubkey, tx.clone()).await; | ||||
|             }); | ||||
|         } | ||||
|         { | ||||
|             let db = self.db.clone(); | ||||
|             let pubkey = pubkey.clone(); | ||||
|             let tx = tx.clone(); | ||||
|             tokio::spawn(async move { | ||||
|                 let _ = db::listen_for_node::<db::UpdateVmReq>(&pubkey, tx.clone()).await; | ||||
|                 let _ = db::listen_for_node::<db::UpdateVmReq>(&db, &pubkey, tx.clone()).await; | ||||
|             }); | ||||
|         } | ||||
| 
 | ||||
| @ -361,27 +382,32 @@ impl BrainVmDaemon for BrainVmDaemonForReal { | ||||
|                         // TODO: move new_vm_req to active_vm
 | ||||
|                         // also handle failure properly
 | ||||
|                         if !new_vm_resp.error.is_empty() { | ||||
|                             db::NewVmReq::submit_error(&new_vm_resp.uuid, new_vm_resp.error) | ||||
|                             db::NewVmReq::submit_error( | ||||
|                                 &self.db, | ||||
|                                 &new_vm_resp.uuid, | ||||
|                                 new_vm_resp.error, | ||||
|                             ) | ||||
|                             .await?; | ||||
|                         } else { | ||||
|                             db::upsert_record( | ||||
|                                 &self.db, | ||||
|                                 "measurement_args", | ||||
|                                 &new_vm_resp.uuid, | ||||
|                                 new_vm_resp.args.clone(), | ||||
|                             ) | ||||
|                             .await?; | ||||
|                             if let Some(args) = new_vm_resp.args { | ||||
|                                 db::ActiveVm::activate(&new_vm_resp.uuid, args).await?; | ||||
|                                 db::ActiveVm::activate(&self.db, &new_vm_resp.uuid, args).await?; | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|                     Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => { | ||||
|                     Some(vm_daemon_message::Msg::UpdateVmResp(_update_vm_resp)) => { | ||||
|                         todo!(); | ||||
|                         // self.data.submit_updatevm_resp(update_vm_resp).await;
 | ||||
|                     } | ||||
|                     Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => { | ||||
|                         let node_resources: db::VmNodeResources = node_resources.into(); | ||||
|                         node_resources.merge(&pubkey).await?; | ||||
|                         node_resources.merge(&self.db, &pubkey).await?; | ||||
|                     } | ||||
|                     _ => {} | ||||
|                 }, | ||||
| @ -405,24 +431,32 @@ impl BrainGeneralCli for BrainGeneralCliForReal { | ||||
| 
 | ||||
|     async fn get_balance(&self, req: Request<Pubkey>) -> Result<Response<AccountBalance>, Status> { | ||||
|         let req = check_sig_from_req(req)?; | ||||
|         Ok(Response::new(db::Account::get(&req.pubkey).await?.into())) | ||||
|         Ok(Response::new(db::Account::get(&self.db, &req.pubkey).await?.into())) | ||||
|     } | ||||
| 
 | ||||
|     async fn report_node(&self, req: Request<ReportNodeReq>) -> Result<Response<Empty>, Status> { | ||||
|         let req = check_sig_from_req(req)?; | ||||
|         let (account, node) = match db::ActiveVmWithNode::get_by_uuid(&req.contract).await? { | ||||
|         let (account, node, contract_id) = | ||||
|             match db::ActiveVmWithNode::get_by_uuid(&self.db, &req.contract).await? { | ||||
|                 Some(vm_contract) | ||||
|                     if vm_contract.admin.key().to_string() == req.admin_pubkey | ||||
|                         && vm_contract.vm_node.id.key().to_string() == req.node_pubkey => | ||||
|                 { | ||||
|                 (vm_contract.admin, vm_contract.vm_node.id) | ||||
|                     (vm_contract.admin, vm_contract.vm_node.id, vm_contract.id.to_string()) | ||||
|                 } | ||||
|                 _ => match db::ActiveAppWithNode::get_by_uuid(&self.db, &req.contract).await? { | ||||
|                     Some(app_contract) | ||||
|                         if app_contract.admin.key().to_string() == req.admin_pubkey | ||||
|                             && app_contract.app_node.id.key().to_string() == req.node_pubkey => | ||||
|                     { | ||||
|                         (app_contract.admin, app_contract.app_node.id, app_contract.id.to_string()) | ||||
|                     } | ||||
|                     _ => { | ||||
|                 // TODO: Hey, Noor! Please add app contract here.
 | ||||
|                         return Err(Status::unauthenticated("No contract found by this ID.")); | ||||
|                     } | ||||
|                 }, | ||||
|             }; | ||||
|         db::Report::create(account, node, req.reason).await?; | ||||
|         db::Report::create(&self.db, account, node, req.reason, contract_id).await?; | ||||
|         Ok(Response::new(Empty {})) | ||||
|     } | ||||
| 
 | ||||
| @ -431,7 +465,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal { | ||||
|         req: Request<Empty>, | ||||
|     ) -> Result<Response<Self::ListOperatorsStream>, Status> { | ||||
|         let _ = check_sig_from_req(req)?; | ||||
|         let operators = db::Operator::list().await?; | ||||
|         let operators = db::Operator::list(&self.db).await?; | ||||
|         let (tx, rx) = mpsc::channel(6); | ||||
|         tokio::spawn(async move { | ||||
|             for op in operators { | ||||
| @ -446,7 +480,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal { | ||||
|         &self, | ||||
|         req: Request<Pubkey>, | ||||
|     ) -> Result<Response<InspectOperatorResp>, Status> { | ||||
|         match db::Operator::inspect_nodes(&req.into_inner().pubkey).await? { | ||||
|         match db::Operator::inspect_nodes(&self.db, &req.into_inner().pubkey).await? { | ||||
|             (Some(op), vm_nodes, app_nodes) => Ok(Response::new(InspectOperatorResp { | ||||
|                 operator: Some(op.into()), | ||||
|                 vm_nodes: vm_nodes.into_iter().map(|n| n.into()).collect(), | ||||
| @ -490,7 +524,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal { | ||||
|     async fn airdrop(&self, req: Request<AirdropReq>) -> Result<Response<Empty>, Status> { | ||||
|         check_admin_key(&req)?; | ||||
|         let req = check_sig_from_req(req)?; | ||||
|         db::Account::airdrop(&req.pubkey, req.tokens).await?; | ||||
|         db::Account::airdrop(&self.db, &req.pubkey, req.tokens).await?; | ||||
|         Ok(Response::new(Empty {})) | ||||
|     } | ||||
| 
 | ||||
| @ -557,7 +591,15 @@ impl BrainGeneralCli for BrainGeneralCliForReal { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| pub struct BrainVmCliForReal {} | ||||
| pub struct BrainVmCliForReal { | ||||
|     pub db: Arc<Surreal<Client>>, | ||||
| } | ||||
| 
 | ||||
| impl BrainVmCliForReal { | ||||
|     pub fn new(db: Arc<Surreal<Client>>) -> Self { | ||||
|         Self { db } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[tonic::async_trait] | ||||
| impl BrainVmCli for BrainVmCliForReal { | ||||
| @ -567,19 +609,23 @@ impl BrainVmCli for BrainVmCliForReal { | ||||
|     async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> { | ||||
|         let req = check_sig_from_req(req)?; | ||||
|         info!("New VM requested via CLI: {req:?}"); | ||||
|         if db::Account::is_banned_by_node(&req.admin_pubkey, &req.node_pubkey).await? { | ||||
|         if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { | ||||
|             return Err(Status::permission_denied("This operator banned you. What did you do?")); | ||||
|         } | ||||
| 
 | ||||
|         let new_vm_req: db::NewVmReq = req.into(); | ||||
|         let id = new_vm_req.id.key().to_string(); | ||||
| 
 | ||||
|         let db = self.db.clone(); | ||||
| 
 | ||||
|         let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); | ||||
|         tokio::spawn(async move { | ||||
|             let _ = oneshot_tx.send(db::NewVmResp::listen(&id).await); | ||||
|             let _ = oneshot_tx.send(db::NewVmResp::listen(&db, &id).await); | ||||
|         }); | ||||
|         new_vm_req.submit().await?; | ||||
|         new_vm_req.submit(&self.db).await?; | ||||
| 
 | ||||
|         match oneshot_rx.await { | ||||
|             Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded("Request failed due to timeout. Please try again later or contact the DeTEE devs team.")), | ||||
|             Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())), | ||||
|             Err(e) => { | ||||
|                 log::error!("Something weird happened. Reached error {e:?}"); | ||||
| @ -639,20 +685,20 @@ impl BrainVmCli for BrainVmCliForReal { | ||||
|         ); | ||||
|         let mut contracts = Vec::new(); | ||||
|         if !req.uuid.is_empty() { | ||||
|             if let Some(specific_contract) = db::ActiveVmWithNode::get_by_uuid(&req.uuid).await? { | ||||
|             if let Some(specific_contract) = | ||||
|                 db::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await? | ||||
|             { | ||||
|                 if specific_contract.admin.key().to_string() == req.wallet { | ||||
|                     contracts.push(specific_contract.into()); | ||||
|                     contracts.push(specific_contract); | ||||
|                 } | ||||
|                 // TODO: allow operator to inspect contracts
 | ||||
|             } | ||||
|         } else { | ||||
|             if req.as_operator { | ||||
|         } else if req.as_operator { | ||||
|             contracts | ||||
|                     .append(&mut db::ActiveVmWithNode::list_by_operator(&req.wallet).await?.into()); | ||||
|                 .append(&mut db::ActiveVmWithNode::list_by_operator(&self.db, &req.wallet).await?); | ||||
|         } else { | ||||
|             contracts | ||||
|                     .append(&mut db::ActiveVmWithNode::list_by_admin(&req.wallet).await?.into()); | ||||
|             } | ||||
|                 .append(&mut db::ActiveVmWithNode::list_by_admin(&self.db, &req.wallet).await?); | ||||
|         } | ||||
|         let (tx, rx) = mpsc::channel(6); | ||||
|         tokio::spawn(async move { | ||||
| @ -670,7 +716,7 @@ impl BrainVmCli for BrainVmCliForReal { | ||||
|     ) -> Result<Response<Self::ListVmNodesStream>, tonic::Status> { | ||||
|         let req = check_sig_from_req(req)?; | ||||
|         info!("CLI requested ListVmNodesStream: {req:?}"); | ||||
|         let nodes = db::VmNodeWithReports::find_by_filters(req).await?; | ||||
|         let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?; | ||||
|         let (tx, rx) = mpsc::channel(6); | ||||
|         tokio::spawn(async move { | ||||
|             for node in nodes { | ||||
| @ -688,7 +734,7 @@ impl BrainVmCli for BrainVmCliForReal { | ||||
|         let req = check_sig_from_req(req)?; | ||||
|         info!("Unknown CLI requested ListVmNodesStream: {req:?}"); | ||||
|         // TODO: optimize this query so that it gets only one node
 | ||||
|         let nodes = db::VmNodeWithReports::find_by_filters(req).await?; | ||||
|         let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?; | ||||
|         if let Some(node) = nodes.into_iter().next() { | ||||
|             return Ok(Response::new(node.into())); | ||||
|         } | ||||
| @ -754,7 +800,7 @@ fn check_sig_from_req<T: std::fmt::Debug + PubkeyGetter>(req: Request<T>) -> Res | ||||
|     let parsed_time = chrono::DateTime::parse_from_rfc3339(time) | ||||
|         .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; | ||||
|     let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); | ||||
|     if seconds_elapsed > 4 || seconds_elapsed < -4 { | ||||
|     if !(-4..=4).contains(&seconds_elapsed) { | ||||
|         return Err(Status::unauthenticated(format!( | ||||
|             "Date is not within 4 sec of the time of the server: CLI {} vs Server {}", | ||||
|             parsed_time, now | ||||
| @ -795,7 +841,7 @@ fn check_sig_from_req<T: std::fmt::Debug + PubkeyGetter>(req: Request<T>) -> Res | ||||
|         .verify(message.as_bytes(), &signature) | ||||
|         .map_err(|_| Status::unauthenticated("the signature is not valid"))?; | ||||
|     if let Some(req_pubkey) = req.get_pubkey() { | ||||
|         if pubkey_value.to_str().unwrap().to_string() != req_pubkey { | ||||
|         if *pubkey_value.to_str().unwrap() != req_pubkey { | ||||
|             return Err(Status::unauthenticated( | ||||
|                 "pubkey of signature does not match pubkey of request", | ||||
|             )); | ||||
| @ -809,7 +855,7 @@ fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Resul | ||||
|     let parsed_time = chrono::DateTime::parse_from_rfc3339(time) | ||||
|         .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; | ||||
|     let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); | ||||
|     if seconds_elapsed > 4 || seconds_elapsed < -4 { | ||||
|     if !(-4..=4).contains(&seconds_elapsed) { | ||||
|         return Err(Status::unauthenticated(format!( | ||||
|             "Date is not within 4 sec of the time of the server: CLI {} vs Server {}", | ||||
|             parsed_time, now | ||||
|  | ||||
| @ -1,2 +1,8 @@ | ||||
| #[allow(dead_code)] | ||||
| pub mod prepare_test_env; | ||||
| #[allow(dead_code)] | ||||
| pub mod test_utils; | ||||
| #[allow(dead_code)] | ||||
| pub mod vm_cli_utils; | ||||
| #[allow(dead_code)] | ||||
| pub mod vm_daemon_utils; | ||||
|  | ||||
| @ -3,47 +3,64 @@ use detee_shared::{ | ||||
|     general_proto::brain_general_cli_server::BrainGeneralCliServer, | ||||
|     vm_proto::brain_vm_daemon_server::BrainVmDaemonServer, | ||||
| }; | ||||
| use dotenv::dotenv; | ||||
| use hyper_util::rt::TokioIo; | ||||
| use std::net::SocketAddr; | ||||
| use std::sync::Arc; | ||||
| use surreal_brain::grpc::{BrainGeneralCliForReal, BrainVmCliForReal, BrainVmDaemonForReal}; | ||||
| use surrealdb::engine::remote::ws::Client; | ||||
| use surrealdb::Surreal; | ||||
| use tokio::io::DuplexStream; | ||||
| use tokio::{net::TcpListener, sync::OnceCell}; | ||||
| use tonic::transport::{Channel, Endpoint, Server, Uri}; | ||||
| use tower::service_fn; | ||||
| 
 | ||||
| pub const DB_URL: &str = "localhost:8000"; | ||||
| pub const DB_NS: &str = "test_brain"; | ||||
| pub const DB_NAME: &str = "test_migration_db"; | ||||
| 
 | ||||
| pub static DB_STATE: OnceCell<()> = OnceCell::const_new(); | ||||
| 
 | ||||
| pub async fn prepare_test_db() { | ||||
|     DB_STATE | ||||
|         .get_or_init(|| async { | ||||
|             surreal_brain::db::init(DB_URL, DB_NS, DB_NAME) | ||||
|                 .await | ||||
|                 .expect("Failed to initialize the database"); | ||||
| pub async fn prepare_test_db() -> Surreal<Client> { | ||||
|     dotenv().ok(); | ||||
| 
 | ||||
|             let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk().unwrap(); | ||||
|             surreal_brain::db::DB.query(format!("REMOVE DATABASE {DB_NAME}")).await.unwrap(); | ||||
|             surreal_brain::db::DB | ||||
|                 .query(std::fs::read_to_string("interim_tables.surql").unwrap()) | ||||
|     let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); | ||||
|     let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env"); | ||||
|     let db_pass = std::env::var("DB_PASS").expect("DB_PASS not set in .env"); | ||||
|     let db_ns = "test_brain"; | ||||
|     let db_name = "test_migration_db"; | ||||
| 
 | ||||
|     let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) | ||||
|         .await | ||||
|         .unwrap(); | ||||
|             surreal_brain::db::migration0(&old_brain_data).await.unwrap(); | ||||
|     DB_STATE | ||||
|         .get_or_init(|| async { | ||||
|             let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk().unwrap(); | ||||
|             db.query(format!("REMOVE DATABASE {db_name}")).await.unwrap(); | ||||
|             db.query(std::fs::read_to_string("interim_tables.surql").unwrap()).await.unwrap(); | ||||
|             surreal_brain::db::migration0(&db, &old_brain_data).await.unwrap(); | ||||
|         }) | ||||
|         .await; | ||||
|     db | ||||
| } | ||||
| 
 | ||||
| pub async fn run_service_in_background() -> SocketAddr { | ||||
|     dotenv().ok(); | ||||
|     let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); | ||||
|     let addr = listener.local_addr().unwrap(); | ||||
| 
 | ||||
|     let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); | ||||
|     let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env"); | ||||
|     let db_pass = std::env::var("DB_PASS").expect("DB_PASS not set in .env"); | ||||
|     let db_ns = "test_brain"; | ||||
|     let db_name = "test_migration_db"; | ||||
| 
 | ||||
|     tokio::spawn(async move { | ||||
|         let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) | ||||
|             .await | ||||
|             .unwrap(); | ||||
|         let db_arc = Arc::new(db); | ||||
| 
 | ||||
|         Server::builder() | ||||
|             .add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal {})) | ||||
|             .add_service(BrainVmCliServer::new(BrainVmCliForReal {})) | ||||
|             .add_service(BrainVmDaemonServer::new(BrainVmDaemonForReal {})) | ||||
|             .add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal::new(db_arc.clone()))) | ||||
|             .add_service(BrainVmCliServer::new(BrainVmCliForReal::new(db_arc.clone()))) | ||||
|             .add_service(BrainVmDaemonServer::new(BrainVmDaemonForReal::new(db_arc.clone()))) | ||||
|             .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) | ||||
|             .await | ||||
|             .unwrap(); | ||||
| @ -54,46 +71,45 @@ pub async fn run_service_in_background() -> SocketAddr { | ||||
|     addr | ||||
| } | ||||
| 
 | ||||
| pub async fn run_service_for_stream_server() -> (DuplexStream, SocketAddr) { | ||||
| pub async fn run_service_for_stream_server() -> DuplexStream { | ||||
|     dotenv().ok(); | ||||
|     let (client, server) = tokio::io::duplex(1024); | ||||
| 
 | ||||
|     let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); | ||||
|     let addr = listener.local_addr().unwrap(); | ||||
|     let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); | ||||
|     let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env"); | ||||
|     let db_pass = std::env::var("DB_PASS").expect("DB_PASS not set in .env"); | ||||
|     let db_ns = "test_brain"; | ||||
|     let db_name = "test_migration_db"; | ||||
| 
 | ||||
|     tokio::spawn(async move { | ||||
|         let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) | ||||
|             .await | ||||
|             .unwrap(); | ||||
|         let db_arc = Arc::new(db); | ||||
| 
 | ||||
|         tonic::transport::Server::builder() | ||||
|             .add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal {})) | ||||
|             .add_service(BrainVmCliServer::new(BrainVmCliForReal {})) | ||||
|             .add_service(BrainVmDaemonServer::new(BrainVmDaemonForReal {})) | ||||
|             .add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal::new(db_arc.clone()))) | ||||
|             .add_service(BrainVmCliServer::new(BrainVmCliForReal::new(db_arc.clone()))) | ||||
|             .add_service(BrainVmDaemonServer::new(BrainVmDaemonForReal::new(db_arc.clone()))) | ||||
|             .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) | ||||
|             .await | ||||
|     }); | ||||
|     (client, addr) | ||||
|     client | ||||
| } | ||||
| 
 | ||||
| pub async fn connect_stream_client_channel(c_stream: DuplexStream, addr: SocketAddr) -> Channel { | ||||
|     let url = format!("http://{}", addr); | ||||
| pub async fn connect_stream_client_channel(c_stream: DuplexStream) -> Channel { | ||||
|     let mut client = Some(c_stream); | ||||
| 
 | ||||
|     Endpoint::try_from(url) | ||||
|         .unwrap() | ||||
|     Endpoint::from_static("http://127.0.0.1:0") | ||||
|         .connect_with_connector(service_fn(move |_: Uri| { | ||||
|             let client = client.take(); | ||||
| 
 | ||||
|             async move { | ||||
|                 if let Some(client) = client { | ||||
|                     Ok(TokioIo::new(client)) | ||||
|                 } else { | ||||
|                     Err(std::io::Error::new(std::io::ErrorKind::Other, "Client already taken")) | ||||
|                 } | ||||
|             } | ||||
|             let client = client.take().unwrap(); | ||||
|             async move { Ok::<TokioIo<DuplexStream>, std::io::Error>(TokioIo::new(client)) } | ||||
|         })) | ||||
|         .await | ||||
|         .unwrap() | ||||
| } | ||||
| 
 | ||||
| #[allow(dead_code)] | ||||
| pub async fn run_service_for_stream() -> Channel { | ||||
|     let (client, addr) = run_service_for_stream_server().await; | ||||
|     connect_stream_client_channel(client, addr).await | ||||
|     let client = run_service_for_stream_server().await; | ||||
|     connect_stream_client_channel(client).await | ||||
| } | ||||
|  | ||||
| @ -38,7 +38,6 @@ impl Key { | ||||
|         Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string()) | ||||
|     } | ||||
| 
 | ||||
|     #[allow(dead_code)] | ||||
|     pub fn sign_stream_auth(&self, contracts: Vec<String>) -> Result<snp_proto::DaemonStreamAuth> { | ||||
|         let pubkey = self.pubkey.clone(); | ||||
|         let timestamp = chrono::Utc::now().to_rfc3339(); | ||||
|  | ||||
							
								
								
									
										47
									
								
								tests/common/vm_cli_utils.rs
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										47
									
								
								tests/common/vm_cli_utils.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,47 @@ | ||||
| use super::test_utils::Key; | ||||
| use detee_shared::vm_proto; | ||||
| use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | ||||
| use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ}; | ||||
| use surreal_brain::db; | ||||
| use surrealdb::engine::remote::ws::Client; | ||||
| use surrealdb::Surreal; | ||||
| use tonic::transport::Channel; | ||||
| 
 | ||||
| pub async fn create_new_vm( | ||||
|     db: &Surreal<Client>, | ||||
|     key: Key, | ||||
|     node_pubkey: String, | ||||
|     brain_channel: Channel, | ||||
| ) -> String { | ||||
|     let new_vm_req = vm_proto::NewVmReq { | ||||
|         admin_pubkey: key.pubkey.clone(), | ||||
|         node_pubkey, | ||||
|         price_per_unit: 1200, | ||||
|         extra_ports: vec![8080, 8081], | ||||
|         locked_nano: 0, | ||||
|         ..Default::default() | ||||
|     }; | ||||
| 
 | ||||
|     let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone()); | ||||
|     let new_vm_resp = | ||||
|         client_vm_cli.new_vm(key.sign_request(new_vm_req).unwrap()).await.unwrap().into_inner(); | ||||
| 
 | ||||
|     assert!(new_vm_resp.error.is_empty()); | ||||
|     assert!(new_vm_resp.uuid.len() == 40); | ||||
| 
 | ||||
|     // wait for update db
 | ||||
|     tokio::time::sleep(tokio::time::Duration::from_millis(700)).await; | ||||
| 
 | ||||
|     let vm_req_db: Option<db::NewVmReq> = | ||||
|         db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await.unwrap(); | ||||
| 
 | ||||
|     if let Some(new_vm_req) = vm_req_db { | ||||
|         panic!("New VM request found in DB: {:?}", new_vm_req); | ||||
|     } | ||||
| 
 | ||||
|     let active_vm_op: Option<db::ActiveVm> = | ||||
|         db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await.unwrap(); | ||||
|     let active_vm = active_vm_op.unwrap(); | ||||
| 
 | ||||
|     active_vm.id.key().to_string() | ||||
| } | ||||
							
								
								
									
										134
									
								
								tests/common/vm_daemon_utils.rs
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										134
									
								
								tests/common/vm_daemon_utils.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,134 @@ | ||||
| use super::test_utils::Key; | ||||
| use detee_shared::vm_proto; | ||||
| use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; | ||||
| use detee_shared::vm_proto::RegisterVmNodeReq; | ||||
| use futures::StreamExt; | ||||
| use tokio::sync::mpsc; | ||||
| use tokio_stream::wrappers::ReceiverStream; | ||||
| use tonic::transport::Channel; | ||||
| 
 | ||||
| pub async fn mock_vm_daemon(brain_channel: Channel) -> String { | ||||
|     let daemon_client = BrainVmDaemonClient::new(brain_channel); | ||||
|     let daemon_key = Key::new(); | ||||
| 
 | ||||
|     register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await; | ||||
| 
 | ||||
|     let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); | ||||
| 
 | ||||
|     tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx)); | ||||
| 
 | ||||
|     let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); | ||||
|     tokio::spawn(daemon_msg_sender( | ||||
|         daemon_client.clone(), | ||||
|         daemon_key.clone(), | ||||
|         daemon_msg_tx.clone(), | ||||
|         rx, | ||||
|     )); | ||||
| 
 | ||||
|     tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx)); | ||||
| 
 | ||||
|     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; | ||||
| 
 | ||||
|     daemon_key.pubkey | ||||
| } | ||||
| 
 | ||||
| pub async fn register_vm_node( | ||||
|     mut client: BrainVmDaemonClient<Channel>, | ||||
|     key: Key, | ||||
|     operator_wallet: String, | ||||
| ) -> Vec<vm_proto::VmContract> { | ||||
|     let node_pubkey = key.pubkey.clone(); | ||||
| 
 | ||||
|     let req = RegisterVmNodeReq { | ||||
|         node_pubkey, | ||||
|         operator_wallet, | ||||
|         main_ip: String::from("185.243.218.213"), | ||||
|         city: String::from("Oslo"), | ||||
|         country: String::from("Norway"), | ||||
|         region: String::from("EU"), | ||||
|         price: 1200, | ||||
|     }; | ||||
| 
 | ||||
|     let mut grpc_stream = | ||||
|         client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner(); | ||||
| 
 | ||||
|     let mut vm_contracts = Vec::new(); | ||||
|     while let Some(stream_update) = grpc_stream.next().await { | ||||
|         match stream_update { | ||||
|             Ok(vm_c) => { | ||||
|                 vm_contracts.push(vm_c); | ||||
|             } | ||||
|             Err(e) => { | ||||
|                 panic!("Received error instead of vm_contracts: {e:?}"); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|     vm_contracts | ||||
| } | ||||
| 
 | ||||
| pub async fn daemon_listener( | ||||
|     mut client: BrainVmDaemonClient<Channel>, | ||||
|     key: Key, | ||||
|     tx: mpsc::Sender<vm_proto::BrainVmMessage>, | ||||
| ) { | ||||
|     let mut grpc_stream = | ||||
|         client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner(); | ||||
| 
 | ||||
|     while let Some(Ok(stream_update)) = grpc_stream.next().await { | ||||
|         log::info!("vm deamon got notified: {:?}", &stream_update); | ||||
|         let _ = tx.send(stream_update).await; | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| pub async fn daemon_msg_sender( | ||||
|     mut client: BrainVmDaemonClient<Channel>, | ||||
|     key: Key, | ||||
|     tx: mpsc::Sender<vm_proto::VmDaemonMessage>, | ||||
|     rx: mpsc::Receiver<vm_proto::VmDaemonMessage>, | ||||
| ) { | ||||
|     let rx_stream = ReceiverStream::new(rx); | ||||
|     tx.send(vm_proto::VmDaemonMessage { | ||||
|         msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())), | ||||
|     }) | ||||
|     .await | ||||
|     .unwrap(); | ||||
|     client.daemon_messages(rx_stream).await.unwrap(); | ||||
| } | ||||
| 
 | ||||
| pub async fn daemon_engine( | ||||
|     tx: mpsc::Sender<vm_proto::VmDaemonMessage>, | ||||
|     mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>, | ||||
| ) { | ||||
|     while let Some(brain_msg) = rx.recv().await { | ||||
|         match brain_msg.msg { | ||||
|             Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { | ||||
|                 let args = Some(vm_proto::MeasurementArgs { | ||||
|                     dtrfs_api_endpoint: String::from("184.107.169.199:48865"), | ||||
|                     exposed_ports: new_vm_req.extra_ports, | ||||
|                     ovmf_hash: String::from( | ||||
|                         "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", | ||||
|                     ), | ||||
|                     ips: vec![], | ||||
|                 }); | ||||
| 
 | ||||
|                 let new_vm_resp = vm_proto::NewVmResp { | ||||
|                     uuid: new_vm_req.uuid.clone(), | ||||
|                     args, | ||||
|                     error: String::new(), | ||||
|                 }; | ||||
| 
 | ||||
|                 let res_data = vm_proto::VmDaemonMessage { | ||||
|                     msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), | ||||
|                 }; | ||||
|                 tx.send(res_data).await.unwrap(); | ||||
|             } | ||||
|             Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { | ||||
|                 todo!() | ||||
|             } | ||||
|             Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => { | ||||
|                 todo!() | ||||
|             } | ||||
|             None => todo!(), | ||||
|         } | ||||
|     } | ||||
| } | ||||
| @ -1,6 +1,8 @@ | ||||
| use common::{ | ||||
|     prepare_test_env::{prepare_test_db, run_service_for_stream, run_service_in_background}, | ||||
|     test_utils::Key, | ||||
|     vm_cli_utils::create_new_vm, | ||||
|     vm_daemon_utils::mock_vm_daemon, | ||||
| }; | ||||
| use detee_shared::common_proto::Empty; | ||||
| use detee_shared::general_proto::ReportNodeReq; | ||||
| @ -10,6 +12,8 @@ use detee_shared::{ | ||||
|     common_proto::Pubkey, general_proto::brain_general_cli_client::BrainGeneralCliClient, | ||||
| }; | ||||
| use futures::StreamExt; | ||||
| use surreal_brain::constants::VM_NODE; | ||||
| use surreal_brain::db::VmNodeWithReports; | ||||
| 
 | ||||
| mod common; | ||||
| 
 | ||||
| @ -34,29 +38,70 @@ async fn test_general_balance() { | ||||
| } | ||||
| 
 | ||||
| #[tokio::test] | ||||
| async fn test_report_node() { | ||||
|     prepare_test_db().await; | ||||
| async fn test_vm_creation() { | ||||
|     let db = prepare_test_db().await; | ||||
| 
 | ||||
|     let addr = run_service_in_background().await; | ||||
|     let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap(); | ||||
|     let brain_channel = run_service_for_stream().await; | ||||
|     let daemon_key = mock_vm_daemon(brain_channel.clone()).await; | ||||
| 
 | ||||
|     let key = Key::new(); | ||||
| 
 | ||||
|     let _ = create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await; | ||||
| } | ||||
| 
 | ||||
| #[tokio::test] | ||||
| async fn test_report_node() { | ||||
|     let db = prepare_test_db().await; | ||||
| 
 | ||||
|     let brain_channel = run_service_for_stream().await; | ||||
|     let daemon_key = mock_vm_daemon(brain_channel.clone()).await; | ||||
|     let mut client_gen_cli = BrainGeneralCliClient::new(brain_channel.clone()); | ||||
| 
 | ||||
|     let key = Key::new(); | ||||
|     let pubkey = key.pubkey.clone(); | ||||
| 
 | ||||
|     // TODO: create contract, node and operator in db and use it here
 | ||||
|     let req_data = ReportNodeReq { | ||||
|         admin_pubkey: pubkey, | ||||
|         node_pubkey: String::from("node_pubkey"), | ||||
|     let report_req = ReportNodeReq { | ||||
|         admin_pubkey: pubkey.clone(), | ||||
|         node_pubkey: daemon_key.clone(), | ||||
|         contract: String::from("uuid"), | ||||
|         reason: String::from("reason"), | ||||
|     }; | ||||
| 
 | ||||
|     let report_error = client.report_node(key.sign_request(req_data).unwrap()).await.err().unwrap(); | ||||
|     let report_error = | ||||
|         client_gen_cli.report_node(key.sign_request(report_req).unwrap()).await.err().unwrap(); | ||||
| 
 | ||||
|     println!("Report error: {:?}", report_error); | ||||
|     assert_eq!(report_error.message(), "No contract found by this ID."); | ||||
| 
 | ||||
|     // verify report in db
 | ||||
|     let active_vm_id = | ||||
|         create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await; | ||||
| 
 | ||||
|     let reason = String::from("something went wrong on vm"); | ||||
|     let report_req = ReportNodeReq { | ||||
|         admin_pubkey: pubkey, | ||||
|         node_pubkey: daemon_key.clone(), | ||||
|         contract: active_vm_id, | ||||
|         reason: reason.clone(), | ||||
|     }; | ||||
| 
 | ||||
|     let _ = client_gen_cli | ||||
|         .report_node(key.sign_request(report_req).unwrap()) | ||||
|         .await | ||||
|         .unwrap() | ||||
|         .into_inner(); | ||||
| 
 | ||||
|     let vm_nodes: Vec<VmNodeWithReports> = db | ||||
|         .query(format!( | ||||
|             "SELECT *, <-report.* as reports FROM {VM_NODE} WHERE id = {VM_NODE}:{daemon_key};" | ||||
|         )) | ||||
|         .await | ||||
|         .unwrap() | ||||
|         .take(0) | ||||
|         .unwrap(); | ||||
| 
 | ||||
|     let vm_node_with_report = vm_nodes.get(0).unwrap(); | ||||
| 
 | ||||
|     assert!(vm_node_with_report.reports[0].reason == reason); | ||||
| } | ||||
| 
 | ||||
| #[tokio::test] | ||||
|  | ||||
| @ -1,17 +1,11 @@ | ||||
| use common::{ | ||||
|     prepare_test_env::{ | ||||
|         connect_stream_client_channel, prepare_test_db, run_service_for_stream_server, | ||||
|         run_service_in_background, | ||||
|     }, | ||||
|     prepare_test_env::{prepare_test_db, run_service_for_stream, run_service_in_background}, | ||||
|     test_utils::Key, | ||||
|     vm_daemon_utils::{mock_vm_daemon, register_vm_node}, | ||||
| }; | ||||
| use detee_shared::vm_proto; | ||||
| use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | ||||
| use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; | ||||
| use detee_shared::vm_proto::RegisterVmNodeReq; | ||||
| use futures::StreamExt; | ||||
| use tokio::task::JoinSet; | ||||
| use tokio_stream::wrappers::ReceiverStream; | ||||
| 
 | ||||
| mod common; | ||||
| 
 | ||||
| @ -31,144 +25,20 @@ async fn test_reg_vm_node() { | ||||
|     assert!(vm_contracts.is_empty()) | ||||
| } | ||||
| 
 | ||||
| async fn register_vm_node( | ||||
|     mut client: BrainVmDaemonClient<tonic::transport::Channel>, | ||||
|     key: Key, | ||||
|     operator_wallet: String, | ||||
| ) -> Vec<vm_proto::VmContract> { | ||||
|     let node_pubkey = key.pubkey.clone(); | ||||
| 
 | ||||
|     let req = RegisterVmNodeReq { | ||||
|         node_pubkey, | ||||
|         operator_wallet, | ||||
|         main_ip: String::from("185.243.218.213"), | ||||
|         city: String::from("Oslo"), | ||||
|         country: String::from("Norway"), | ||||
|         region: String::from("EU"), | ||||
|         price: 1200, | ||||
|     }; | ||||
| 
 | ||||
|     let mut grpc_stream = | ||||
|         client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner(); | ||||
| 
 | ||||
|     let mut vm_contracts = Vec::new(); | ||||
|     while let Some(stream_update) = grpc_stream.next().await { | ||||
|         match stream_update { | ||||
|             Ok(vm_c) => { | ||||
|                 vm_contracts.push(vm_c); | ||||
|             } | ||||
|             Err(e) => { | ||||
|                 panic!("Received error instead of vm_contracts: {e:?}"); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|     vm_contracts | ||||
| } | ||||
| 
 | ||||
| #[tokio::test] | ||||
| async fn test_brain_message() { | ||||
|     // spawn grpc stream server
 | ||||
|     // mock a daemon
 | ||||
|     // mock a cli client to interact with brain
 | ||||
| 
 | ||||
|     // validate if something happening in "surreal_brain::db::NewVmReq", "surreal_brain::db::UpdateVmReq", "surreal_brain::db::DeletedVm" these table
 | ||||
|     // mock daemon will responde to brain
 | ||||
| 
 | ||||
|     env_logger::builder().filter_level(log::LevelFilter::Info).init(); | ||||
|     let _ = prepare_test_db().await; | ||||
|     let db = prepare_test_db().await; | ||||
| 
 | ||||
|     let (tokio_duplex, addr) = run_service_for_stream_server().await; | ||||
| 
 | ||||
|     let channel = connect_stream_client_channel(tokio_duplex, addr).await; | ||||
| 
 | ||||
|     let mut daemon_client = BrainVmDaemonClient::new(channel.clone()); | ||||
| 
 | ||||
|     let daemon_key = Key::new(); | ||||
| 
 | ||||
|     register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await; | ||||
| 
 | ||||
|     let mut daemon_join_set = JoinSet::new(); | ||||
| 
 | ||||
|     let (tx, mut brain_msg_rx) = tokio::sync::mpsc::channel(1); | ||||
| 
 | ||||
|     // listen to brain
 | ||||
|     let mut daemon_client_01 = daemon_client.clone(); | ||||
|     let daemon_key_01 = daemon_key.clone(); | ||||
|     daemon_join_set.spawn(async move { | ||||
|         let mut grpc_stream = daemon_client_01 | ||||
|             .brain_messages(daemon_key_01.sign_stream_auth(vec![]).unwrap()) | ||||
|             .await | ||||
|             .unwrap() | ||||
|             .into_inner(); | ||||
| 
 | ||||
|         while let Some(Ok(stream_update)) = grpc_stream.next().await { | ||||
|             log::info!("vm deamon got notified: {:?}", &stream_update); | ||||
|             let _ = tx.send(stream_update).await; | ||||
|         } | ||||
|     }); | ||||
| 
 | ||||
|     // send to brain
 | ||||
|     let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); | ||||
|     let daemon_msg_tx_01 = daemon_msg_tx.clone(); | ||||
|     let daemon_key_02 = daemon_key.clone(); | ||||
|     daemon_join_set.spawn(async move { | ||||
|         let rx_stream = ReceiverStream::new(rx); | ||||
|         daemon_msg_tx_01 | ||||
|             .send(vm_proto::VmDaemonMessage { | ||||
|                 msg: Some(vm_proto::vm_daemon_message::Msg::Auth( | ||||
|                     daemon_key_02.sign_stream_auth(vec![]).unwrap(), | ||||
|                 )), | ||||
|             }) | ||||
|             .await | ||||
|             .unwrap(); | ||||
|         daemon_client.daemon_messages(rx_stream).await.unwrap(); | ||||
|     }); | ||||
| 
 | ||||
|     // daemon engine
 | ||||
|     daemon_join_set.spawn(async move { | ||||
|         while let Some(brain_msg) = brain_msg_rx.recv().await { | ||||
|             match brain_msg.msg { | ||||
|                 Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { | ||||
|                     let args = Some(vm_proto::MeasurementArgs { | ||||
|                         dtrfs_api_endpoint: String::from("184.107.169.199:48865"), | ||||
|                         exposed_ports: new_vm_req.extra_ports, | ||||
|                         ovmf_hash: String::from( | ||||
|                             "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", | ||||
|                         ), | ||||
|                         ips: vec![], | ||||
|                     }); | ||||
| 
 | ||||
|                     let new_vm_resp = vm_proto::NewVmResp { | ||||
|                         uuid: new_vm_req.uuid.clone(), | ||||
|                         args, | ||||
|                         error: String::new(), | ||||
|                     }; | ||||
| 
 | ||||
|                     let res_data = vm_proto::VmDaemonMessage { | ||||
|                         msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), | ||||
|                     }; | ||||
|                     daemon_msg_tx.send(res_data).await.unwrap(); | ||||
|                 } | ||||
|                 Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { | ||||
|                     todo!() | ||||
|                 } | ||||
|                 Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => { | ||||
|                     todo!() | ||||
|                 } | ||||
|                 None => todo!(), | ||||
|             } | ||||
|         } | ||||
|     }); | ||||
| 
 | ||||
|     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; | ||||
| 
 | ||||
|     let mut cli_client = BrainVmCliClient::new(channel); | ||||
|     let brain_channel = run_service_for_stream().await; | ||||
|     let daemon_key = mock_vm_daemon(brain_channel.clone()).await; | ||||
|     let mut cli_client = BrainVmCliClient::new(brain_channel); | ||||
| 
 | ||||
|     let cli_key = Key::new(); | ||||
| 
 | ||||
|     let req = vm_proto::NewVmReq { | ||||
|         admin_pubkey: cli_key.pubkey.clone(), | ||||
|         node_pubkey: daemon_key.pubkey.clone(), | ||||
|         node_pubkey: daemon_key, | ||||
|         price_per_unit: 1200, | ||||
|         extra_ports: vec![8080, 8081], | ||||
|         locked_nano: 0, | ||||
| @ -181,8 +51,7 @@ async fn test_brain_message() { | ||||
|     assert!(new_vm_resp.uuid.len() == 40); | ||||
| 
 | ||||
|     let id = ("measurement_args", new_vm_resp.uuid); | ||||
|     let data_in_db: detee_shared::vm_proto::MeasurementArgs = | ||||
|         surreal_brain::db::DB.select(id).await.unwrap().unwrap(); | ||||
|     let data_in_db: detee_shared::vm_proto::MeasurementArgs = db.select(id).await.unwrap().unwrap(); | ||||
| 
 | ||||
|     assert_eq!(data_in_db, new_vm_resp.args.unwrap()); | ||||
| } | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user