Compare commits
	
		
			10 Commits
		
	
	
		
			32f6548eff
			...
			8fa8a64862
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 8fa8a64862 | |||
| a1664d61e4 | |||
| 28ff8dcd81 | |||
| 8abe8eebf4 | |||
| ea3169c024 | |||
| 8536c85fd5 | |||
| 5cd8317f91 | |||
| de5fba37c9 | |||
| ab39b12da9 | |||
| 92827161ad | 
							
								
								
									
										5
									
								
								.env
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										5
									
								
								.env
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,5 @@ | |||||||
|  | DB_URL = "localhost:8000" | ||||||
|  | DB_USER = "root" | ||||||
|  | DB_PASS = "root" | ||||||
|  | DB_NAMESPACE = "brain" | ||||||
|  | DB_NAME = "migration" | ||||||
							
								
								
									
										7
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										7
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @ -1078,6 +1078,12 @@ version = "0.3.3" | |||||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
| checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" | checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" | ||||||
| 
 | 
 | ||||||
|  | [[package]] | ||||||
|  | name = "dotenv" | ||||||
|  | version = "0.15.0" | ||||||
|  | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
|  | checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" | ||||||
|  | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "earcutr" | name = "earcutr" | ||||||
| version = "0.4.3" | version = "0.4.3" | ||||||
| @ -3777,6 +3783,7 @@ dependencies = [ | |||||||
|  "chrono", |  "chrono", | ||||||
|  "dashmap 6.1.0", |  "dashmap 6.1.0", | ||||||
|  "detee-shared", |  "detee-shared", | ||||||
|  |  "dotenv", | ||||||
|  "ed25519-dalek", |  "ed25519-dalek", | ||||||
|  "env_logger", |  "env_logger", | ||||||
|  "futures", |  "futures", | ||||||
|  | |||||||
| @ -21,6 +21,7 @@ log = "0.4.27" | |||||||
| env_logger = "0.11.8" | env_logger = "0.11.8" | ||||||
| thiserror = "2.0.12" | thiserror = "2.0.12" | ||||||
| nanoid = "0.4.0" | nanoid = "0.4.0" | ||||||
|  | dotenv = "0.15.0" | ||||||
| 
 | 
 | ||||||
| [profile.release] | [profile.release] | ||||||
| lto = true | lto = true | ||||||
|  | |||||||
							
								
								
									
										1
									
								
								README.md
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										1
									
								
								README.md
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1 @@ | |||||||
|  | # Brain Miration to SurrealDB | ||||||
| @ -131,3 +131,4 @@ DEFINE FIELD contract ON TABLE kick TYPE record<deleted_vm|deleted_app>; | |||||||
| DEFINE TABLE report TYPE RELATION FROM account TO vm_node|app_node; | DEFINE TABLE report TYPE RELATION FROM account TO vm_node|app_node; | ||||||
| DEFINE FIELD created_at ON TABLE report TYPE datetime; | DEFINE FIELD created_at ON TABLE report TYPE datetime; | ||||||
| DEFINE FIELD reason ON TABLE report TYPE string; | DEFINE FIELD reason ON TABLE report TYPE string; | ||||||
|  | DEFINE FIELD contract_id ON TABLE report TYPE string; | ||||||
|  | |||||||
| @ -1,9 +1,10 @@ | |||||||
|  | use std::sync::Arc; | ||||||
|  | 
 | ||||||
| use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer; | use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer; | ||||||
| use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; | use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; | ||||||
| use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; | use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; | ||||||
| use surreal_brain::constants::{ | use dotenv::dotenv; | ||||||
|     BRAIN_GRPC_ADDR, CERT_KEY_PATH, CERT_PATH, DB_ADDRESS, DB_NAME, DB_NS, | use surreal_brain::constants::{BRAIN_GRPC_ADDR, CERT_KEY_PATH, CERT_PATH}; | ||||||
| }; |  | ||||||
| use surreal_brain::db; | use surreal_brain::db; | ||||||
| use surreal_brain::grpc::BrainVmCliForReal; | use surreal_brain::grpc::BrainVmCliForReal; | ||||||
| use surreal_brain::grpc::{BrainGeneralCliForReal, BrainVmDaemonForReal}; | use surreal_brain::grpc::{BrainGeneralCliForReal, BrainVmDaemonForReal}; | ||||||
| @ -11,13 +12,24 @@ use tonic::transport::{Identity, Server, ServerTlsConfig}; | |||||||
| 
 | 
 | ||||||
| #[tokio::main] | #[tokio::main] | ||||||
| async fn main() { | async fn main() { | ||||||
|  |     dotenv().ok(); | ||||||
|     env_logger::builder().filter_level(log::LevelFilter::Debug).init(); |     env_logger::builder().filter_level(log::LevelFilter::Debug).init(); | ||||||
|     db::init(DB_ADDRESS, DB_NS, DB_NAME).await.unwrap(); | 
 | ||||||
|  |     let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); | ||||||
|  |     let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env"); | ||||||
|  |     let db_pass = std::env::var("DB_PASS").expect("DB_PASS not set in .env"); | ||||||
|  |     let db_ns = std::env::var("DB_NAMESPACE").expect("DB_NAMESPACE not set in .env"); | ||||||
|  |     let db_name = std::env::var("DB_NAME").expect("DB_NAME not set in .env"); | ||||||
|  | 
 | ||||||
|  |     let db = db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name).await.unwrap(); | ||||||
|  |     let db_arc = Arc::new(db); | ||||||
|  | 
 | ||||||
|     let addr = BRAIN_GRPC_ADDR.parse().unwrap(); |     let addr = BRAIN_GRPC_ADDR.parse().unwrap(); | ||||||
| 
 | 
 | ||||||
|     let snp_daemon_server = BrainVmDaemonServer::new(BrainVmDaemonForReal {}); |     let snp_daemon_server = BrainVmDaemonServer::new(BrainVmDaemonForReal::new(db_arc.clone())); | ||||||
|     let snp_cli_server = BrainVmCliServer::new(BrainVmCliForReal {}); |     let snp_cli_server = BrainVmCliServer::new(BrainVmCliForReal::new(db_arc.clone())); | ||||||
|     let general_service_server = BrainGeneralCliServer::new(BrainGeneralCliForReal {}); |     let general_service_server = | ||||||
|  |         BrainGeneralCliServer::new(BrainGeneralCliForReal::new(db_arc.clone())); | ||||||
| 
 | 
 | ||||||
|     let cert = std::fs::read_to_string(CERT_PATH).unwrap(); |     let cert = std::fs::read_to_string(CERT_PATH).unwrap(); | ||||||
|     let key = std::fs::read_to_string(CERT_KEY_PATH).unwrap(); |     let key = std::fs::read_to_string(CERT_KEY_PATH).unwrap(); | ||||||
|  | |||||||
| @ -1,20 +1,24 @@ | |||||||
| // After deleting this migration, also delete old_brain structs
 | // After deleting this migration, also delete old_brain structs
 | ||||||
| // and dangling impls from the model
 | // and dangling impls from the model
 | ||||||
|  | use dotenv::dotenv; | ||||||
| use std::error::Error; | use std::error::Error; | ||||||
| use surreal_brain::constants::{DB_ADDRESS, DB_NAME, DB_NS}; |  | ||||||
| use surreal_brain::db::init; |  | ||||||
| use surreal_brain::{db, old_brain}; | use surreal_brain::{db, old_brain}; | ||||||
| 
 | 
 | ||||||
| #[tokio::main] | #[tokio::main] | ||||||
| async fn main() -> Result<(), Box<dyn Error>> { | async fn main() -> Result<(), Box<dyn Error>> { | ||||||
|  |     dotenv().ok(); | ||||||
|     let old_brain_data = old_brain::BrainData::load_from_disk()?; |     let old_brain_data = old_brain::BrainData::load_from_disk()?; | ||||||
|     // println!("{}", serde_yaml::to_string(&old_brain_data)?);
 |     // println!("{}", serde_yaml::to_string(&old_brain_data)?);
 | ||||||
| 
 | 
 | ||||||
|     init(DB_ADDRESS, DB_NS, DB_NAME).await?; |     let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); | ||||||
|  |     let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env"); | ||||||
|  |     let db_pass = std::env::var("DB_PASS").expect("DB_PASS not set in .env"); | ||||||
|  |     let db_ns = std::env::var("DB_NAMESPACE").expect("DB_NAMESPACE not set in .env"); | ||||||
|  |     let db_name = std::env::var("DB_NAME").expect("DB_NAME not set in .env"); | ||||||
| 
 | 
 | ||||||
|     let result = db::migration0(&old_brain_data).await?; |     let db = db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name).await.unwrap(); | ||||||
| 
 | 
 | ||||||
|     println!("{result:?}"); |     db::migration0(&db, &old_brain_data).await?; | ||||||
| 
 | 
 | ||||||
|     Ok(()) |     Ok(()) | ||||||
| } | } | ||||||
|  | |||||||
| @ -2,13 +2,7 @@ pub const BRAIN_GRPC_ADDR: &str = "0.0.0.0:31337"; | |||||||
| pub const CERT_PATH: &str = "./tmp/brain-crt.pem"; | pub const CERT_PATH: &str = "./tmp/brain-crt.pem"; | ||||||
| pub const CERT_KEY_PATH: &str = "./tmp/brain-key.pem"; | pub const CERT_KEY_PATH: &str = "./tmp/brain-key.pem"; | ||||||
| 
 | 
 | ||||||
| pub const DB_ADDRESS: &str = "localhost:8000"; | pub const DB_SCHEMA_FILE: &str = "interim_tables.surql"; | ||||||
| pub const DB_NS: &str = "brain"; |  | ||||||
| pub const DB_NAME: &str = "migration"; |  | ||||||
| 
 |  | ||||||
| // TODO: read from .env
 |  | ||||||
| pub const DB_USER: &str = "root"; |  | ||||||
| pub const DB_PASS: &str = "root"; |  | ||||||
| 
 | 
 | ||||||
| pub const ADMIN_ACCOUNTS: &[&str] = &[ | pub const ADMIN_ACCOUNTS: &[&str] = &[ | ||||||
|     "x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK", |     "x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK", | ||||||
| @ -26,6 +20,8 @@ pub const UPDATE_VM_REQ: &str = "update_vm_req"; | |||||||
| pub const DELETED_VM: &str = "deleted_vm"; | pub const DELETED_VM: &str = "deleted_vm"; | ||||||
| pub const VM_CONTRACT: &str = "vm_contract"; | pub const VM_CONTRACT: &str = "vm_contract"; | ||||||
| 
 | 
 | ||||||
|  | pub const ACTIVE_APP: &str = "active_app"; | ||||||
|  | 
 | ||||||
| pub const ID_ALPHABET: [char; 62] = [ | pub const ID_ALPHABET: [char; 62] = [ | ||||||
|     '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', |     '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', | ||||||
|     'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', |     'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', | ||||||
|  | |||||||
							
								
								
									
										274
									
								
								src/db.rs
									
									
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										274
									
								
								src/db.rs
									
									
									
									
									
								
							| @ -1,11 +1,12 @@ | |||||||
|  | use std::{str::FromStr, time::Duration}; | ||||||
|  | 
 | ||||||
| pub use crate::constants::{ | pub use crate::constants::{ | ||||||
|     ACCOUNT, ACTIVE_VM, DB_ADDRESS, DB_NAME, DB_NS, DB_PASS, DB_USER, DELETED_VM, ID_ALPHABET, |     ACCOUNT, ACTIVE_APP, ACTIVE_VM, DB_SCHEMA_FILE, DELETED_VM, ID_ALPHABET, NEW_VM_REQ, | ||||||
|     NEW_VM_REQ, UPDATE_VM_REQ, VM_CONTRACT, VM_NODE, |     UPDATE_VM_REQ, VM_CONTRACT, VM_NODE, | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
| use crate::old_brain; | use crate::old_brain; | ||||||
| use serde::{Deserialize, Serialize}; | use serde::{Deserialize, Serialize}; | ||||||
| use std::{str::FromStr, sync::LazyLock}; |  | ||||||
| use surrealdb::{ | use surrealdb::{ | ||||||
|     engine::remote::ws::{Client, Ws}, |     engine::remote::ws::{Client, Ws}, | ||||||
|     opt::auth::Root, |     opt::auth::Root, | ||||||
| @ -15,49 +16,65 @@ use surrealdb::{ | |||||||
| use tokio::sync::mpsc::Sender; | use tokio::sync::mpsc::Sender; | ||||||
| use tokio_stream::StreamExt as _; | use tokio_stream::StreamExt as _; | ||||||
| 
 | 
 | ||||||
| pub static DB: LazyLock<Surreal<Client>> = LazyLock::new(Surreal::init); |  | ||||||
| 
 |  | ||||||
| #[derive(thiserror::Error, Debug)] | #[derive(thiserror::Error, Debug)] | ||||||
| pub enum Error { | pub enum Error { | ||||||
|     #[error("Internal DB error: {0}")] |     #[error("Internal DB error: {0}")] | ||||||
|     DataBase(#[from] surrealdb::Error), |     DataBase(#[from] surrealdb::Error), | ||||||
|     #[error("Daemon channel got closed: {0}")] |     #[error("Daemon channel got closed: {0}")] | ||||||
|     DaemonConnection(#[from] tokio::sync::mpsc::error::SendError<DaemonNotification>), |     DaemonConnection(#[from] tokio::sync::mpsc::error::SendError<DaemonNotification>), | ||||||
|  |     #[error(transparent)] | ||||||
|  |     StdIo(#[from] std::io::Error), | ||||||
|  |     #[error(transparent)] | ||||||
|  |     TimeOut(#[from] tokio::time::error::Elapsed), | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub async fn init(db_address: &str, ns: &str, db: &str) -> surrealdb::Result<()> { | pub async fn db_connection( | ||||||
|     DB.connect::<Ws>(db_address).await?; |     db_address: &str, | ||||||
|  |     username: &str, | ||||||
|  |     password: &str, | ||||||
|  |     ns: &str, | ||||||
|  |     db: &str, | ||||||
|  | ) -> Result<Surreal<Client>, Error> { | ||||||
|  |     let db_connection: Surreal<Client> = Surreal::init(); | ||||||
|  |     db_connection.connect::<Ws>(db_address).await?; | ||||||
|     // Sign in to the server
 |     // Sign in to the server
 | ||||||
|     DB.signin(Root { username: DB_USER, password: DB_PASS }).await?; |     db_connection.signin(Root { username, password }).await?; | ||||||
|     DB.use_ns(ns).use_db(db).await?; |     db_connection.use_ns(ns).use_db(db).await?; | ||||||
|     Ok(()) |     Ok(db_connection) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub async fn upsert_record<SomeRecord: Serialize + 'static>( | pub async fn upsert_record<SomeRecord: Serialize + 'static>( | ||||||
|  |     db: &Surreal<Client>, | ||||||
|     table: &str, |     table: &str, | ||||||
|     id: &str, |     id: &str, | ||||||
|     my_record: SomeRecord, |     my_record: SomeRecord, | ||||||
| ) -> Result<(), Error> { | ) -> Result<(), Error> { | ||||||
|     #[derive(Deserialize)] |     #[derive(Deserialize)] | ||||||
|     struct Wrapper {} |     struct Wrapper {} | ||||||
|     let _: Option<Wrapper> = DB.create((table, id)).content(my_record).await?; |     let _: Option<Wrapper> = db.create((table, id)).content(my_record).await?; | ||||||
|     Ok(()) |     Ok(()) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub async fn migration0(old_data: &old_brain::BrainData) -> surrealdb::Result<()> { | pub async fn migration0( | ||||||
|  |     db: &Surreal<Client>, | ||||||
|  |     old_data: &old_brain::BrainData, | ||||||
|  | ) -> Result<(), Error> { | ||||||
|     let accounts: Vec<Account> = old_data.into(); |     let accounts: Vec<Account> = old_data.into(); | ||||||
|     let vm_nodes: Vec<VmNode> = old_data.into(); |     let vm_nodes: Vec<VmNode> = old_data.into(); | ||||||
|     let app_nodes: Vec<AppNode> = old_data.into(); |     let app_nodes: Vec<AppNode> = old_data.into(); | ||||||
|     let vm_contracts: Vec<ActiveVm> = old_data.into(); |     let vm_contracts: Vec<ActiveVm> = old_data.into(); | ||||||
| 
 | 
 | ||||||
|  |     let schema = std::fs::read_to_string(DB_SCHEMA_FILE)?; | ||||||
|  |     db.query(schema).await?; | ||||||
|  | 
 | ||||||
|     println!("Inserting accounts..."); |     println!("Inserting accounts..."); | ||||||
|     let _: Vec<Account> = DB.insert(()).content(accounts).await?; |     let _: Vec<Account> = db.insert(()).content(accounts).await?; | ||||||
|     println!("Inserting vm nodes..."); |     println!("Inserting vm nodes..."); | ||||||
|     let _: Vec<VmNode> = DB.insert(()).content(vm_nodes).await?; |     let _: Vec<VmNode> = db.insert(()).content(vm_nodes).await?; | ||||||
|     println!("Inserting app nodes..."); |     println!("Inserting app nodes..."); | ||||||
|     let _: Vec<AppNode> = DB.insert(()).content(app_nodes).await?; |     let _: Vec<AppNode> = db.insert(()).content(app_nodes).await?; | ||||||
|     println!("Inserting vm contracts..."); |     println!("Inserting vm contracts..."); | ||||||
|     let _: Vec<ActiveVm> = DB.insert("vm_contract").relation(vm_contracts).await?; |     let _: Vec<ActiveVm> = db.insert("vm_contract").relation(vm_contracts).await?; | ||||||
| 
 | 
 | ||||||
|     Ok(()) |     Ok(()) | ||||||
| } | } | ||||||
| @ -72,9 +89,9 @@ pub struct Account { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl Account { | impl Account { | ||||||
|     pub async fn get(address: &str) -> Result<Self, Error> { |     pub async fn get(db: &Surreal<Client>, address: &str) -> Result<Self, Error> { | ||||||
|         let id = (ACCOUNT, address); |         let id = (ACCOUNT, address); | ||||||
|         let account: Option<Self> = DB.select(id).await?; |         let account: Option<Self> = db.select(id).await?; | ||||||
|         let account = match account { |         let account = match account { | ||||||
|             Some(account) => account, |             Some(account) => account, | ||||||
|             None => { |             None => { | ||||||
| @ -84,9 +101,9 @@ impl Account { | |||||||
|         Ok(account) |         Ok(account) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn airdrop(account: &str, tokens: u64) -> Result<(), Error> { |     pub async fn airdrop(db: &Surreal<Client>, account: &str, tokens: u64) -> Result<(), Error> { | ||||||
|         let tokens = tokens.saturating_mul(1_000_000_000); |         let tokens = tokens.saturating_mul(1_000_000_000); | ||||||
|         let _ = DB |         let _ = db | ||||||
|             .query(format!("upsert account:{account} SET balance = (balance || 0) + {tokens};")) |             .query(format!("upsert account:{account} SET balance = (balance || 0) + {tokens};")) | ||||||
|             .await?; |             .await?; | ||||||
|         Ok(()) |         Ok(()) | ||||||
| @ -94,8 +111,12 @@ impl Account { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl Account { | impl Account { | ||||||
|     pub async fn is_banned_by_node(user: &str, node: &str) -> Result<bool, Error> { |     pub async fn is_banned_by_node( | ||||||
|         let ban: Option<Self> = DB |         db: &Surreal<Client>, | ||||||
|  |         user: &str, | ||||||
|  |         node: &str, | ||||||
|  |     ) -> Result<bool, Error> { | ||||||
|  |         let ban: Option<Self> = db | ||||||
|             .query(format!( |             .query(format!( | ||||||
|                 "(select operator->ban[0] as ban
 |                 "(select operator->ban[0] as ban
 | ||||||
|                     from vm_node:{node} |                     from vm_node:{node} | ||||||
| @ -139,15 +160,15 @@ pub struct VmNodeResources { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl VmNodeResources { | impl VmNodeResources { | ||||||
|     pub async fn merge(self, node_id: &str) -> Result<(), Error> { |     pub async fn merge(self, db: &Surreal<Client>, node_id: &str) -> Result<(), Error> { | ||||||
|         let _: Option<VmNode> = DB.update((VM_NODE, node_id)).merge(self).await?; |         let _: Option<VmNode> = db.update((VM_NODE, node_id)).merge(self).await?; | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl VmNode { | impl VmNode { | ||||||
|     pub async fn register(self) -> Result<(), Error> { |     pub async fn register(self, db: &Surreal<Client>) -> Result<(), Error> { | ||||||
|         let _: Option<VmNode> = DB.upsert(self.id.clone()).content(self).await?; |         let _: Option<VmNode> = db.upsert(self.id.clone()).content(self).await?; | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @ -176,6 +197,7 @@ impl VmNodeWithReports { | |||||||
|     // TODO: find a more elegant way to do this than importing gRPC in the DB module
 |     // TODO: find a more elegant way to do this than importing gRPC in the DB module
 | ||||||
|     // https://en.wikipedia.org/wiki/Dependency_inversion_principle
 |     // https://en.wikipedia.org/wiki/Dependency_inversion_principle
 | ||||||
|     pub async fn find_by_filters( |     pub async fn find_by_filters( | ||||||
|  |         db: &Surreal<Client>, | ||||||
|         filters: detee_shared::snp::pb::vm_proto::VmNodeFilters, |         filters: detee_shared::snp::pb::vm_proto::VmNodeFilters, | ||||||
|     ) -> Result<Vec<Self>, Error> { |     ) -> Result<Vec<Self>, Error> { | ||||||
|         let mut query = format!( |         let mut query = format!( | ||||||
| @ -208,7 +230,7 @@ impl VmNodeWithReports { | |||||||
|             query += &format!("&& ip = '{}' ", filters.ip); |             query += &format!("&& ip = '{}' ", filters.ip); | ||||||
|         } |         } | ||||||
|         query += ";"; |         query += ";"; | ||||||
|         let mut result = DB.query(query).await?; |         let mut result = db.query(query).await?; | ||||||
|         let vm_nodes: Vec<Self> = result.take(0)?; |         let vm_nodes: Vec<Self> = result.take(0)?; | ||||||
|         Ok(vm_nodes) |         Ok(vm_nodes) | ||||||
|     } |     } | ||||||
| @ -263,27 +285,27 @@ pub struct NewVmReq { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl NewVmReq { | impl NewVmReq { | ||||||
|     pub async fn get(id: &str) -> Result<Option<Self>, Error> { |     pub async fn get(db: &Surreal<Client>, id: &str) -> Result<Option<Self>, Error> { | ||||||
|         let new_vm_req: Option<Self> = DB.select((NEW_VM_REQ, id)).await?; |         let new_vm_req: Option<Self> = db.select((NEW_VM_REQ, id)).await?; | ||||||
|         Ok(new_vm_req) |         Ok(new_vm_req) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn delete(id: &str) -> Result<(), Error> { |     pub async fn delete(db: &Surreal<Client>, id: &str) -> Result<(), Error> { | ||||||
|         let _: Option<Self> = DB.delete((NEW_VM_REQ, id)).await?; |         let _: Option<Self> = db.delete((NEW_VM_REQ, id)).await?; | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn submit_error(id: &str, error: String) -> Result<(), Error> { |     pub async fn submit_error(db: &Surreal<Client>, id: &str, error: String) -> Result<(), Error> { | ||||||
|         #[derive(Serialize)] |         #[derive(Serialize)] | ||||||
|         struct NewVmError { |         struct NewVmError { | ||||||
|             error: String, |             error: String, | ||||||
|         } |         } | ||||||
|         let _: Option<Self> = DB.update((NEW_VM_REQ, id)).merge(NewVmError { error }).await?; |         let _: Option<Self> = db.update((NEW_VM_REQ, id)).merge(NewVmError { error }).await?; | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn submit(self) -> Result<(), Error> { |     pub async fn submit(self, db: &Surreal<Client>) -> Result<(), Error> { | ||||||
|         let _: Vec<Self> = DB.insert(NEW_VM_REQ).relation(self).await?; |         let _: Vec<Self> = db.insert(NEW_VM_REQ).relation(self).await?; | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @ -297,8 +319,8 @@ pub enum NewVmResp { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl NewVmResp { | impl NewVmResp { | ||||||
|     pub async fn listen(vm_id: &str) -> Result<NewVmResp, Error> { |     pub async fn listen(db: &Surreal<Client>, vm_id: &str) -> Result<NewVmResp, Error> { | ||||||
|         let mut resp = DB |         let mut resp = db | ||||||
|             .query(format!("live select * from {NEW_VM_REQ} where id = {NEW_VM_REQ}:{vm_id};")) |             .query(format!("live select * from {NEW_VM_REQ} where id = {NEW_VM_REQ}:{vm_id};")) | ||||||
|             .query(format!( |             .query(format!( | ||||||
|                 "live select * from measurement_args where id = measurement_args:{vm_id};" |                 "live select * from measurement_args where id = measurement_args:{vm_id};" | ||||||
| @ -308,43 +330,37 @@ impl NewVmResp { | |||||||
|         let mut args_stream = |         let mut args_stream = | ||||||
|             resp.stream::<Notification<detee_shared::snp::pb::vm_proto::MeasurementArgs>>(1)?; |             resp.stream::<Notification<detee_shared::snp::pb::vm_proto::MeasurementArgs>>(1)?; | ||||||
| 
 | 
 | ||||||
|         loop { |         tokio::time::timeout(Duration::from_secs(10), async { | ||||||
|             tokio::select! { |             loop { | ||||||
|                 new_vm_req_notif = new_vm_stream.next() => { |                 tokio::select! { | ||||||
|                     log::debug!("Got stream 1..."); |                     new_vm_req_notif = new_vm_stream.next() => { | ||||||
|                     if let Some(new_vm_req_notif) = new_vm_req_notif { |                         log::debug!("Got stream 1..."); | ||||||
|                         match new_vm_req_notif { |                         if let Some(new_vm_req_notif) = new_vm_req_notif { | ||||||
|                             Ok(new_vm_req_notif) => { |                             match new_vm_req_notif { | ||||||
|                                 match new_vm_req_notif.action { |                                 Ok(new_vm_req_notif) => { | ||||||
|                                     surrealdb::Action::Update => { |                                     if new_vm_req_notif.action == surrealdb::Action::Update && !new_vm_req_notif.data.error.is_empty() { | ||||||
|                                         if !new_vm_req_notif.data.error.is_empty() { |                                         return Ok::<NewVmResp, Error>(Self::Error(vm_id.to_string(), new_vm_req_notif.data.error)); | ||||||
|                                             return Ok(Self::Error(vm_id.to_string(), new_vm_req_notif.data.error)); |                                     }; | ||||||
|                                         } |                                 }, | ||||||
|                                     }, |                                 Err(e) => return Err(e.into()), | ||||||
|                                     _ => {} |                             } | ||||||
|                                 }; |  | ||||||
|                             }, |  | ||||||
|                             Err(e) => return Err(e.into()), |  | ||||||
|                         } |                         } | ||||||
|                     } |                     } | ||||||
|                 } |                     args_notif = args_stream.next() => { | ||||||
|                 args_notif = args_stream.next() => { |                         if let Some(args_notif) = args_notif { | ||||||
|                     if let Some(args_notif) = args_notif { |                             match args_notif { | ||||||
|                         match args_notif { |                                 Ok(args_notif) => { | ||||||
|                             Ok(args_notif) => { |                                     if args_notif.action == surrealdb::Action::Create { | ||||||
|                                 match args_notif.action { |  | ||||||
|                                     surrealdb::Action::Create => { |  | ||||||
|                                         return Ok(Self::Args(vm_id.to_string(), args_notif.data)); |                                         return Ok(Self::Args(vm_id.to_string(), args_notif.data)); | ||||||
|                                     }, |                                     }; | ||||||
|                                     _ => {} |                                 }, | ||||||
|                                 }; |                                 Err(e) => return Err(e.into()), | ||||||
|                             }, |                             } | ||||||
|                             Err(e) => return Err(e.into()), |  | ||||||
|                         } |                         } | ||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         } |         }).await? | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -372,10 +388,11 @@ pub struct ActiveVm { | |||||||
| 
 | 
 | ||||||
| impl ActiveVm { | impl ActiveVm { | ||||||
|     pub async fn activate( |     pub async fn activate( | ||||||
|  |         db: &Surreal<Client>, | ||||||
|         id: &str, |         id: &str, | ||||||
|         args: detee_shared::vm_proto::MeasurementArgs, |         args: detee_shared::vm_proto::MeasurementArgs, | ||||||
|     ) -> Result<(), Error> { |     ) -> Result<(), Error> { | ||||||
|         let new_vm_req = match NewVmReq::get(id).await? { |         let new_vm_req = match NewVmReq::get(db, id).await? { | ||||||
|             Some(r) => r, |             Some(r) => r, | ||||||
|             None => return Ok(()), |             None => return Ok(()), | ||||||
|         }; |         }; | ||||||
| @ -423,9 +440,9 @@ impl ActiveVm { | |||||||
|             collected_at: new_vm_req.created_at, |             collected_at: new_vm_req.created_at, | ||||||
|         }; |         }; | ||||||
| 
 | 
 | ||||||
|         let _: Vec<ActiveVm> = DB.insert(()).relation(active_vm).await?; |         let _: Vec<ActiveVm> = db.insert(()).relation(active_vm).await?; | ||||||
| 
 | 
 | ||||||
|         NewVmReq::delete(id).await?; |         NewVmReq::delete(db, id).await?; | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @ -452,6 +469,7 @@ pub struct UpdateVmReq { | |||||||
| pub async fn listen_for_node< | pub async fn listen_for_node< | ||||||
|     T: Into<DaemonNotification> + std::marker::Unpin + for<'de> Deserialize<'de>, |     T: Into<DaemonNotification> + std::marker::Unpin + for<'de> Deserialize<'de>, | ||||||
| >( | >( | ||||||
|  |     db: &Surreal<Client>, | ||||||
|     node: &str, |     node: &str, | ||||||
|     tx: Sender<DaemonNotification>, |     tx: Sender<DaemonNotification>, | ||||||
| ) -> Result<(), Error> { | ) -> Result<(), Error> { | ||||||
| @ -465,14 +483,15 @@ pub async fn listen_for_node< | |||||||
|         } |         } | ||||||
|     }; |     }; | ||||||
|     let mut resp = |     let mut resp = | ||||||
|         DB.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?; |         db.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?; | ||||||
|     let mut live_stream = resp.stream::<Notification<T>>(0)?; |     let mut live_stream = resp.stream::<Notification<T>>(0)?; | ||||||
|     while let Some(result) = live_stream.next().await { |     while let Some(result) = live_stream.next().await { | ||||||
|         match result { |         match result { | ||||||
|             Ok(notification) => match notification.action { |             Ok(notification) => { | ||||||
|                 surrealdb::Action::Create => tx.send(notification.data.into()).await?, |                 if notification.action == surrealdb::Action::Create { | ||||||
|                 _ => {} |                     tx.send(notification.data.into()).await? | ||||||
|             }, |                 } | ||||||
|  |             } | ||||||
|             Err(e) => { |             Err(e) => { | ||||||
|                 log::warn!("listen_for_deletion DB stream failed for {node}: {e}"); |                 log::warn!("listen_for_deletion DB stream failed for {node}: {e}"); | ||||||
|                 return Err(Error::from(e)); |                 return Err(Error::from(e)); | ||||||
| @ -504,28 +523,31 @@ pub struct DeletedVm { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl DeletedVm { | impl DeletedVm { | ||||||
|     pub async fn get_by_uuid(uuid: &str) -> Result<Option<Self>, Error> { |     pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> { | ||||||
|         let contract: Option<Self> = |         let contract: Option<Self> = | ||||||
|             DB.query(format!("select * from {DELETED_VM}:{uuid};")).await?.take(0)?; |             db.query(format!("select * from {DELETED_VM}:{uuid};")).await?.take(0)?; | ||||||
|         Ok(contract) |         Ok(contract) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn list_by_admin(admin: &str) -> Result<Vec<Self>, Error> { |     pub async fn list_by_admin(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> { | ||||||
|         let mut result = |         let mut result = | ||||||
|             DB.query(format!("select * from {DELETED_VM} where in = {ACCOUNT}:{admin};")).await?; |             db.query(format!("select * from {DELETED_VM} where in = {ACCOUNT}:{admin};")).await?; | ||||||
|         let contracts: Vec<Self> = result.take(0)?; |         let contracts: Vec<Self> = result.take(0)?; | ||||||
|         Ok(contracts) |         Ok(contracts) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn list_by_node(admin: &str) -> Result<Vec<Self>, Error> { |     pub async fn list_by_node(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> { | ||||||
|         let mut result = |         let mut result = | ||||||
|             DB.query(format!("select * from {DELETED_VM} where out = {VM_NODE}:{admin};")).await?; |             db.query(format!("select * from {DELETED_VM} where out = {VM_NODE}:{admin};")).await?; | ||||||
|         let contracts: Vec<Self> = result.take(0)?; |         let contracts: Vec<Self> = result.take(0)?; | ||||||
|         Ok(contracts) |         Ok(contracts) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn list_by_operator(operator: &str) -> Result<Vec<Self>, Error> { |     pub async fn list_by_operator( | ||||||
|         let mut result = DB |         db: &Surreal<Client>, | ||||||
|  |         operator: &str, | ||||||
|  |     ) -> Result<Vec<Self>, Error> { | ||||||
|  |         let mut result = db | ||||||
|             .query(format!( |             .query(format!( | ||||||
|                 "select
 |                 "select
 | ||||||
|                 (select * from ->operator->vm_node<-{DELETED_VM}) as contracts |                 (select * from ->operator->vm_node<-{DELETED_VM}) as contracts | ||||||
| @ -603,30 +625,33 @@ pub struct ActiveVmWithNode { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl ActiveVmWithNode { | impl ActiveVmWithNode { | ||||||
|     pub async fn get_by_uuid(uuid: &str) -> Result<Option<Self>, Error> { |     pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> { | ||||||
|         let contract: Option<Self> = |         let contract: Option<Self> = | ||||||
|             DB.query(format!("select * from {ACTIVE_VM}:{uuid} fetch out;")).await?.take(0)?; |             db.query(format!("select * from {ACTIVE_VM}:{uuid} fetch out;")).await?.take(0)?; | ||||||
|         Ok(contract) |         Ok(contract) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn list_by_admin(admin: &str) -> Result<Vec<Self>, Error> { |     pub async fn list_by_admin(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> { | ||||||
|         let mut result = DB |         let mut result = db | ||||||
|             .query(format!("select * from {ACTIVE_VM} where in = {ACCOUNT}:{admin} fetch out;")) |             .query(format!("select * from {ACTIVE_VM} where in = {ACCOUNT}:{admin} fetch out;")) | ||||||
|             .await?; |             .await?; | ||||||
|         let contracts: Vec<Self> = result.take(0)?; |         let contracts: Vec<Self> = result.take(0)?; | ||||||
|         Ok(contracts) |         Ok(contracts) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn list_by_node(admin: &str) -> Result<Vec<Self>, Error> { |     pub async fn list_by_node(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> { | ||||||
|         let mut result = DB |         let mut result = db | ||||||
|             .query(format!("select * from {ACTIVE_VM} where out = {VM_NODE}:{admin} fetch out;")) |             .query(format!("select * from {ACTIVE_VM} where out = {VM_NODE}:{admin} fetch out;")) | ||||||
|             .await?; |             .await?; | ||||||
|         let contracts: Vec<Self> = result.take(0)?; |         let contracts: Vec<Self> = result.take(0)?; | ||||||
|         Ok(contracts) |         Ok(contracts) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn list_by_operator(operator: &str) -> Result<Vec<Self>, Error> { |     pub async fn list_by_operator( | ||||||
|         let mut result = DB |         db: &Surreal<Client>, | ||||||
|  |         operator: &str, | ||||||
|  |     ) -> Result<Vec<Self>, Error> { | ||||||
|  |         let mut result = db | ||||||
|             .query(format!( |             .query(format!( | ||||||
|                 "select
 |                 "select
 | ||||||
|                 (select * from ->operator->vm_node<-{ACTIVE_VM} fetch out) as contracts |                 (select * from ->operator->vm_node<-{ACTIVE_VM} fetch out) as contracts | ||||||
| @ -699,7 +724,7 @@ pub struct AppNodeWithReports { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[derive(Debug, Serialize, Deserialize)] | #[derive(Debug, Serialize, Deserialize)] | ||||||
| pub struct AppContract { | pub struct ActiveApp { | ||||||
|     id: RecordId, |     id: RecordId, | ||||||
|     #[serde(rename = "in")] |     #[serde(rename = "in")] | ||||||
|     admin: RecordId, |     admin: RecordId, | ||||||
| @ -720,6 +745,36 @@ pub struct AppContract { | |||||||
|     hratls_pubkey: String, |     hratls_pubkey: String, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | #[derive(Debug, Serialize, Deserialize)] | ||||||
|  | pub struct ActiveAppWithNode { | ||||||
|  |     pub id: RecordId, | ||||||
|  |     #[serde(rename = "in")] | ||||||
|  |     pub admin: RecordId, | ||||||
|  |     #[serde(rename = "out")] | ||||||
|  |     pub app_node: AppNode, | ||||||
|  |     pub app_name: String, | ||||||
|  |     pub mapped_ports: Vec<(u64, u64)>, | ||||||
|  |     pub host_ipv4: String, | ||||||
|  |     pub vcpus: u64, | ||||||
|  |     pub memory_mb: u64, | ||||||
|  |     pub disk_size_gb: u64, | ||||||
|  |     pub created_at: Datetime, | ||||||
|  |     pub price_per_unit: u64, | ||||||
|  |     pub locked_nano: u64, | ||||||
|  |     pub collected_at: Datetime, | ||||||
|  |     pub mr_enclave: String, | ||||||
|  |     pub package_url: String, | ||||||
|  |     pub hratls_pubkey: String, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl ActiveAppWithNode { | ||||||
|  |     pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> { | ||||||
|  |         let contract: Option<Self> = | ||||||
|  |             db.query(format!("select * from {ACTIVE_APP}:{uuid} fetch out;")).await?.take(0)?; | ||||||
|  |         Ok(contract) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
| #[derive(Debug, Serialize, Deserialize)] | #[derive(Debug, Serialize, Deserialize)] | ||||||
| pub struct Ban { | pub struct Ban { | ||||||
|     id: RecordId, |     id: RecordId, | ||||||
| @ -750,18 +805,27 @@ pub struct Report { | |||||||
|     to_node: RecordId, |     to_node: RecordId, | ||||||
|     created_at: Datetime, |     created_at: Datetime, | ||||||
|     pub reason: String, |     pub reason: String, | ||||||
|  |     pub contract_id: String, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl Report { | impl Report { | ||||||
|     // TODO: test this functionality and remove this comment
 |     // TODO: test this functionality and remove this comment
 | ||||||
|     pub async fn create( |     pub async fn create( | ||||||
|  |         db: &Surreal<Client>, | ||||||
|         from_account: RecordId, |         from_account: RecordId, | ||||||
|         to_node: RecordId, |         to_node: RecordId, | ||||||
|         reason: String, |         reason: String, | ||||||
|  |         contract_id: String, | ||||||
|     ) -> Result<(), Error> { |     ) -> Result<(), Error> { | ||||||
|         let _: Vec<Self> = DB |         let _: Vec<Self> = db | ||||||
|             .insert("report") |             .insert("report") | ||||||
|             .relation(Report { from_account, to_node, created_at: Datetime::default(), reason }) |             .relation(Report { | ||||||
|  |                 from_account, | ||||||
|  |                 to_node, | ||||||
|  |                 created_at: Datetime::default(), | ||||||
|  |                 reason, | ||||||
|  |                 contract_id, | ||||||
|  |             }) | ||||||
|             .await?; |             .await?; | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
| @ -780,27 +844,28 @@ pub struct Operator { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl Operator { | impl Operator { | ||||||
|     pub async fn list() -> Result<Vec<Self>, Error> { |     pub async fn list(db: &Surreal<Client>) -> Result<Vec<Self>, Error> { | ||||||
|         let mut result = DB |         let mut result = db | ||||||
|             .query(format!( |             .query( | ||||||
|                 "array::distinct(array::flatten( [
 |                 "array::distinct(array::flatten( [
 | ||||||
|                     (select operator from vm_node group by operator).operator, |                     (select operator from vm_node group by operator).operator, | ||||||
|                     (select operator from app_node group by operator).operator |                     (select operator from app_node group by operator).operator | ||||||
|                 ]));" |                 ]));" | ||||||
|             )) |                     .to_string(), | ||||||
|  |             ) | ||||||
|             .await?; |             .await?; | ||||||
|         let operator_accounts: Vec<RecordId> = result.take(0)?; |         let operator_accounts: Vec<RecordId> = result.take(0)?; | ||||||
|         let mut operators: Vec<Self> = Vec::new(); |         let mut operators: Vec<Self> = Vec::new(); | ||||||
|         for account in operator_accounts.iter() { |         for account in operator_accounts.iter() { | ||||||
|             if let Some(operator) = Self::inspect(&account.key().to_string()).await? { |             if let Some(operator) = Self::inspect(db, &account.key().to_string()).await? { | ||||||
|                 operators.push(operator); |                 operators.push(operator); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|         Ok(operators) |         Ok(operators) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn inspect(account: &str) -> Result<Option<Self>, Error> { |     pub async fn inspect(db: &Surreal<Client>, account: &str) -> Result<Option<Self>, Error> { | ||||||
|         let mut result = DB |         let mut result = db | ||||||
|             .query(format!( |             .query(format!( | ||||||
|                 "$vm_nodes = (select id from vm_node where operator = account:{account}).id;
 |                 "$vm_nodes = (select id from vm_node where operator = account:{account}).id;
 | ||||||
|                 $app_nodes = (select id from app_node where operator = account:{account}).id; |                 $app_nodes = (select id from app_node where operator = account:{account}).id; | ||||||
| @ -821,10 +886,11 @@ impl Operator { | |||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn inspect_nodes( |     pub async fn inspect_nodes( | ||||||
|  |         db: &Surreal<Client>, | ||||||
|         account: &str, |         account: &str, | ||||||
|     ) -> Result<(Option<Self>, Vec<VmNodeWithReports>, Vec<AppNodeWithReports>), Error> { |     ) -> Result<(Option<Self>, Vec<VmNodeWithReports>, Vec<AppNodeWithReports>), Error> { | ||||||
|         let operator = Self::inspect(account).await?; |         let operator = Self::inspect(db, account).await?; | ||||||
|         let mut result = DB |         let mut result = db | ||||||
|             .query(format!( |             .query(format!( | ||||||
|                 "select *, operator, <-report.* as reports from vm_node
 |                 "select *, operator, <-report.* as reports from vm_node
 | ||||||
|                  where operator = account:{account};" |                  where operator = account:{account};" | ||||||
| @ -875,7 +941,7 @@ impl From<&old_brain::BrainData> for Vec<ActiveVm> { | |||||||
|         for old_c in old_data.vm_contracts.iter() { |         for old_c in old_data.vm_contracts.iter() { | ||||||
|             let mut mapped_ports = Vec::new(); |             let mut mapped_ports = Vec::new(); | ||||||
|             for port in old_c.exposed_ports.iter() { |             for port in old_c.exposed_ports.iter() { | ||||||
|                 mapped_ports.push((*port, 8080 as u32)); |                 mapped_ports.push((*port, 8080u32)); | ||||||
|             } |             } | ||||||
|             contracts.push(ActiveVm { |             contracts.push(ActiveVm { | ||||||
|                 id: RecordId::from((ACTIVE_VM, old_c.uuid.replace("-", ""))), |                 id: RecordId::from((ACTIVE_VM, old_c.uuid.replace("-", ""))), | ||||||
|  | |||||||
							
								
								
									
										140
									
								
								src/grpc.rs
									
									
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										140
									
								
								src/grpc.rs
									
									
									
									
									
								
							| @ -15,16 +15,26 @@ use detee_shared::{ | |||||||
|     }, |     }, | ||||||
| }; | }; | ||||||
| use nanoid::nanoid; | use nanoid::nanoid; | ||||||
|  | use surrealdb::{engine::remote::ws::Client, Surreal}; | ||||||
| 
 | 
 | ||||||
| use log::info; | use log::info; | ||||||
| use std::pin::Pin; | use std::pin::Pin; | ||||||
|  | use std::sync::Arc; | ||||||
| use surrealdb::RecordId; | use surrealdb::RecordId; | ||||||
| use tokio::sync::mpsc; | use tokio::sync::mpsc; | ||||||
| use tokio_stream::wrappers::ReceiverStream; | use tokio_stream::wrappers::ReceiverStream; | ||||||
| use tokio_stream::{Stream, StreamExt}; | use tokio_stream::{Stream, StreamExt}; | ||||||
| use tonic::{Request, Response, Status, Streaming}; | use tonic::{Request, Response, Status, Streaming}; | ||||||
| 
 | 
 | ||||||
| pub struct BrainGeneralCliForReal {} | pub struct BrainGeneralCliForReal { | ||||||
|  |     pub db: Arc<Surreal<Client>>, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl BrainGeneralCliForReal { | ||||||
|  |     pub fn new(db: Arc<Surreal<Client>>) -> Self { | ||||||
|  |         Self { db } | ||||||
|  |     } | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
| impl From<db::Account> for AccountBalance { | impl From<db::Account> for AccountBalance { | ||||||
|     fn from(account: db::Account) -> Self { |     fn from(account: db::Account) -> Self { | ||||||
| @ -239,7 +249,15 @@ impl From<VmNodeResources> for db::VmNodeResources { | |||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub struct BrainVmDaemonForReal {} | pub struct BrainVmDaemonForReal { | ||||||
|  |     pub db: Arc<Surreal<Client>>, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl BrainVmDaemonForReal { | ||||||
|  |     pub fn new(db: Arc<Surreal<Client>>) -> Self { | ||||||
|  |         Self { db } | ||||||
|  |     } | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
| #[tonic::async_trait] | #[tonic::async_trait] | ||||||
| impl BrainVmDaemon for BrainVmDaemonForReal { | impl BrainVmDaemon for BrainVmDaemonForReal { | ||||||
| @ -267,11 +285,11 @@ impl BrainVmDaemon for BrainVmDaemonForReal { | |||||||
|             max_ports_per_vm: 0, |             max_ports_per_vm: 0, | ||||||
|             offline_minutes: 0, |             offline_minutes: 0, | ||||||
|         } |         } | ||||||
|         .register() |         .register(&self.db) | ||||||
|         .await?; |         .await?; | ||||||
| 
 | 
 | ||||||
|         info!("Sending existing contracts to {}", req.node_pubkey); |         info!("Sending existing contracts to {}", req.node_pubkey); | ||||||
|         let contracts = db::ActiveVmWithNode::list_by_node(&req.node_pubkey).await?; |         let contracts = db::ActiveVmWithNode::list_by_node(&self.db, &req.node_pubkey).await?; | ||||||
|         let (tx, rx) = mpsc::channel(6); |         let (tx, rx) = mpsc::channel(6); | ||||||
|         tokio::spawn(async move { |         tokio::spawn(async move { | ||||||
|             for contract in contracts { |             for contract in contracts { | ||||||
| @ -299,10 +317,11 @@ impl BrainVmDaemon for BrainVmDaemonForReal { | |||||||
| 
 | 
 | ||||||
|         let (tx, rx) = mpsc::channel(6); |         let (tx, rx) = mpsc::channel(6); | ||||||
|         { |         { | ||||||
|  |             let db = self.db.clone(); | ||||||
|             let pubkey = pubkey.clone(); |             let pubkey = pubkey.clone(); | ||||||
|             let tx = tx.clone(); |             let tx = tx.clone(); | ||||||
|             tokio::spawn(async move { |             tokio::spawn(async move { | ||||||
|                 match db::listen_for_node::<db::DeletedVm>(&pubkey, tx).await { |                 match db::listen_for_node::<db::DeletedVm>(&db, &pubkey, tx).await { | ||||||
|                     Ok(()) => log::info!("db::VmContract::listen_for_node ended for {pubkey}"), |                     Ok(()) => log::info!("db::VmContract::listen_for_node ended for {pubkey}"), | ||||||
|                     Err(e) => { |                     Err(e) => { | ||||||
|                         log::warn!("db::VmContract::listen_for_node errored for {pubkey}: {e}") |                         log::warn!("db::VmContract::listen_for_node errored for {pubkey}: {e}") | ||||||
| @ -311,17 +330,19 @@ impl BrainVmDaemon for BrainVmDaemonForReal { | |||||||
|             }); |             }); | ||||||
|         } |         } | ||||||
|         { |         { | ||||||
|  |             let db = self.db.clone(); | ||||||
|             let pubkey = pubkey.clone(); |             let pubkey = pubkey.clone(); | ||||||
|             let tx = tx.clone(); |             let tx = tx.clone(); | ||||||
|             tokio::spawn(async move { |             tokio::spawn(async move { | ||||||
|                 let _ = db::listen_for_node::<db::NewVmReq>(&pubkey, tx.clone()).await; |                 let _ = db::listen_for_node::<db::NewVmReq>(&db, &pubkey, tx.clone()).await; | ||||||
|             }); |             }); | ||||||
|         } |         } | ||||||
|         { |         { | ||||||
|  |             let db = self.db.clone(); | ||||||
|             let pubkey = pubkey.clone(); |             let pubkey = pubkey.clone(); | ||||||
|             let tx = tx.clone(); |             let tx = tx.clone(); | ||||||
|             tokio::spawn(async move { |             tokio::spawn(async move { | ||||||
|                 let _ = db::listen_for_node::<db::UpdateVmReq>(&pubkey, tx.clone()).await; |                 let _ = db::listen_for_node::<db::UpdateVmReq>(&db, &pubkey, tx.clone()).await; | ||||||
|             }); |             }); | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
| @ -361,27 +382,32 @@ impl BrainVmDaemon for BrainVmDaemonForReal { | |||||||
|                         // TODO: move new_vm_req to active_vm
 |                         // TODO: move new_vm_req to active_vm
 | ||||||
|                         // also handle failure properly
 |                         // also handle failure properly
 | ||||||
|                         if !new_vm_resp.error.is_empty() { |                         if !new_vm_resp.error.is_empty() { | ||||||
|                             db::NewVmReq::submit_error(&new_vm_resp.uuid, new_vm_resp.error) |                             db::NewVmReq::submit_error( | ||||||
|                                 .await?; |                                 &self.db, | ||||||
|  |                                 &new_vm_resp.uuid, | ||||||
|  |                                 new_vm_resp.error, | ||||||
|  |                             ) | ||||||
|  |                             .await?; | ||||||
|                         } else { |                         } else { | ||||||
|                             db::upsert_record( |                             db::upsert_record( | ||||||
|  |                                 &self.db, | ||||||
|                                 "measurement_args", |                                 "measurement_args", | ||||||
|                                 &new_vm_resp.uuid, |                                 &new_vm_resp.uuid, | ||||||
|                                 new_vm_resp.args.clone(), |                                 new_vm_resp.args.clone(), | ||||||
|                             ) |                             ) | ||||||
|                             .await?; |                             .await?; | ||||||
|                             if let Some(args) = new_vm_resp.args { |                             if let Some(args) = new_vm_resp.args { | ||||||
|                                 db::ActiveVm::activate(&new_vm_resp.uuid, args).await?; |                                 db::ActiveVm::activate(&self.db, &new_vm_resp.uuid, args).await?; | ||||||
|                             } |                             } | ||||||
|                         } |                         } | ||||||
|                     } |                     } | ||||||
|                     Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => { |                     Some(vm_daemon_message::Msg::UpdateVmResp(_update_vm_resp)) => { | ||||||
|                         todo!(); |                         todo!(); | ||||||
|                         // self.data.submit_updatevm_resp(update_vm_resp).await;
 |                         // self.data.submit_updatevm_resp(update_vm_resp).await;
 | ||||||
|                     } |                     } | ||||||
|                     Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => { |                     Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => { | ||||||
|                         let node_resources: db::VmNodeResources = node_resources.into(); |                         let node_resources: db::VmNodeResources = node_resources.into(); | ||||||
|                         node_resources.merge(&pubkey).await?; |                         node_resources.merge(&self.db, &pubkey).await?; | ||||||
|                     } |                     } | ||||||
|                     _ => {} |                     _ => {} | ||||||
|                 }, |                 }, | ||||||
| @ -405,24 +431,32 @@ impl BrainGeneralCli for BrainGeneralCliForReal { | |||||||
| 
 | 
 | ||||||
|     async fn get_balance(&self, req: Request<Pubkey>) -> Result<Response<AccountBalance>, Status> { |     async fn get_balance(&self, req: Request<Pubkey>) -> Result<Response<AccountBalance>, Status> { | ||||||
|         let req = check_sig_from_req(req)?; |         let req = check_sig_from_req(req)?; | ||||||
|         Ok(Response::new(db::Account::get(&req.pubkey).await?.into())) |         Ok(Response::new(db::Account::get(&self.db, &req.pubkey).await?.into())) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     async fn report_node(&self, req: Request<ReportNodeReq>) -> Result<Response<Empty>, Status> { |     async fn report_node(&self, req: Request<ReportNodeReq>) -> Result<Response<Empty>, Status> { | ||||||
|         let req = check_sig_from_req(req)?; |         let req = check_sig_from_req(req)?; | ||||||
|         let (account, node) = match db::ActiveVmWithNode::get_by_uuid(&req.contract).await? { |         let (account, node, contract_id) = | ||||||
|             Some(vm_contract) |             match db::ActiveVmWithNode::get_by_uuid(&self.db, &req.contract).await? { | ||||||
|                 if vm_contract.admin.key().to_string() == req.admin_pubkey |                 Some(vm_contract) | ||||||
|                     && vm_contract.vm_node.id.key().to_string() == req.node_pubkey => |                     if vm_contract.admin.key().to_string() == req.admin_pubkey | ||||||
|             { |                         && vm_contract.vm_node.id.key().to_string() == req.node_pubkey => | ||||||
|                 (vm_contract.admin, vm_contract.vm_node.id) |                 { | ||||||
|             } |                     (vm_contract.admin, vm_contract.vm_node.id, vm_contract.id.to_string()) | ||||||
|             _ => { |                 } | ||||||
|                 // TODO: Hey, Noor! Please add app contract here.
 |                 _ => match db::ActiveAppWithNode::get_by_uuid(&self.db, &req.contract).await? { | ||||||
|                 return Err(Status::unauthenticated("No contract found by this ID.")); |                     Some(app_contract) | ||||||
|             } |                         if app_contract.admin.key().to_string() == req.admin_pubkey | ||||||
|         }; |                             && app_contract.app_node.id.key().to_string() == req.node_pubkey => | ||||||
|         db::Report::create(account, node, req.reason).await?; |                     { | ||||||
|  |                         (app_contract.admin, app_contract.app_node.id, app_contract.id.to_string()) | ||||||
|  |                     } | ||||||
|  |                     _ => { | ||||||
|  |                         return Err(Status::unauthenticated("No contract found by this ID.")); | ||||||
|  |                     } | ||||||
|  |                 }, | ||||||
|  |             }; | ||||||
|  |         db::Report::create(&self.db, account, node, req.reason, contract_id).await?; | ||||||
|         Ok(Response::new(Empty {})) |         Ok(Response::new(Empty {})) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
| @ -431,7 +465,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal { | |||||||
|         req: Request<Empty>, |         req: Request<Empty>, | ||||||
|     ) -> Result<Response<Self::ListOperatorsStream>, Status> { |     ) -> Result<Response<Self::ListOperatorsStream>, Status> { | ||||||
|         let _ = check_sig_from_req(req)?; |         let _ = check_sig_from_req(req)?; | ||||||
|         let operators = db::Operator::list().await?; |         let operators = db::Operator::list(&self.db).await?; | ||||||
|         let (tx, rx) = mpsc::channel(6); |         let (tx, rx) = mpsc::channel(6); | ||||||
|         tokio::spawn(async move { |         tokio::spawn(async move { | ||||||
|             for op in operators { |             for op in operators { | ||||||
| @ -446,7 +480,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal { | |||||||
|         &self, |         &self, | ||||||
|         req: Request<Pubkey>, |         req: Request<Pubkey>, | ||||||
|     ) -> Result<Response<InspectOperatorResp>, Status> { |     ) -> Result<Response<InspectOperatorResp>, Status> { | ||||||
|         match db::Operator::inspect_nodes(&req.into_inner().pubkey).await? { |         match db::Operator::inspect_nodes(&self.db, &req.into_inner().pubkey).await? { | ||||||
|             (Some(op), vm_nodes, app_nodes) => Ok(Response::new(InspectOperatorResp { |             (Some(op), vm_nodes, app_nodes) => Ok(Response::new(InspectOperatorResp { | ||||||
|                 operator: Some(op.into()), |                 operator: Some(op.into()), | ||||||
|                 vm_nodes: vm_nodes.into_iter().map(|n| n.into()).collect(), |                 vm_nodes: vm_nodes.into_iter().map(|n| n.into()).collect(), | ||||||
| @ -490,7 +524,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal { | |||||||
|     async fn airdrop(&self, req: Request<AirdropReq>) -> Result<Response<Empty>, Status> { |     async fn airdrop(&self, req: Request<AirdropReq>) -> Result<Response<Empty>, Status> { | ||||||
|         check_admin_key(&req)?; |         check_admin_key(&req)?; | ||||||
|         let req = check_sig_from_req(req)?; |         let req = check_sig_from_req(req)?; | ||||||
|         db::Account::airdrop(&req.pubkey, req.tokens).await?; |         db::Account::airdrop(&self.db, &req.pubkey, req.tokens).await?; | ||||||
|         Ok(Response::new(Empty {})) |         Ok(Response::new(Empty {})) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
| @ -557,7 +591,15 @@ impl BrainGeneralCli for BrainGeneralCliForReal { | |||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub struct BrainVmCliForReal {} | pub struct BrainVmCliForReal { | ||||||
|  |     pub db: Arc<Surreal<Client>>, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl BrainVmCliForReal { | ||||||
|  |     pub fn new(db: Arc<Surreal<Client>>) -> Self { | ||||||
|  |         Self { db } | ||||||
|  |     } | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
| #[tonic::async_trait] | #[tonic::async_trait] | ||||||
| impl BrainVmCli for BrainVmCliForReal { | impl BrainVmCli for BrainVmCliForReal { | ||||||
| @ -567,19 +609,23 @@ impl BrainVmCli for BrainVmCliForReal { | |||||||
|     async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> { |     async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> { | ||||||
|         let req = check_sig_from_req(req)?; |         let req = check_sig_from_req(req)?; | ||||||
|         info!("New VM requested via CLI: {req:?}"); |         info!("New VM requested via CLI: {req:?}"); | ||||||
|         if db::Account::is_banned_by_node(&req.admin_pubkey, &req.node_pubkey).await? { |         if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { | ||||||
|             return Err(Status::permission_denied("This operator banned you. What did you do?")); |             return Err(Status::permission_denied("This operator banned you. What did you do?")); | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         let new_vm_req: db::NewVmReq = req.into(); |         let new_vm_req: db::NewVmReq = req.into(); | ||||||
|         let id = new_vm_req.id.key().to_string(); |         let id = new_vm_req.id.key().to_string(); | ||||||
|  | 
 | ||||||
|  |         let db = self.db.clone(); | ||||||
|  | 
 | ||||||
|         let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); |         let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); | ||||||
|         tokio::spawn(async move { |         tokio::spawn(async move { | ||||||
|             let _ = oneshot_tx.send(db::NewVmResp::listen(&id).await); |             let _ = oneshot_tx.send(db::NewVmResp::listen(&db, &id).await); | ||||||
|         }); |         }); | ||||||
|         new_vm_req.submit().await?; |         new_vm_req.submit(&self.db).await?; | ||||||
| 
 | 
 | ||||||
|         match oneshot_rx.await { |         match oneshot_rx.await { | ||||||
|  |             Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded("Request failed due to timeout. Please try again later or contact the DeTEE devs team.")), | ||||||
|             Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())), |             Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())), | ||||||
|             Err(e) => { |             Err(e) => { | ||||||
|                 log::error!("Something weird happened. Reached error {e:?}"); |                 log::error!("Something weird happened. Reached error {e:?}"); | ||||||
| @ -639,20 +685,20 @@ impl BrainVmCli for BrainVmCliForReal { | |||||||
|         ); |         ); | ||||||
|         let mut contracts = Vec::new(); |         let mut contracts = Vec::new(); | ||||||
|         if !req.uuid.is_empty() { |         if !req.uuid.is_empty() { | ||||||
|             if let Some(specific_contract) = db::ActiveVmWithNode::get_by_uuid(&req.uuid).await? { |             if let Some(specific_contract) = | ||||||
|  |                 db::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await? | ||||||
|  |             { | ||||||
|                 if specific_contract.admin.key().to_string() == req.wallet { |                 if specific_contract.admin.key().to_string() == req.wallet { | ||||||
|                     contracts.push(specific_contract.into()); |                     contracts.push(specific_contract); | ||||||
|                 } |                 } | ||||||
|                 // TODO: allow operator to inspect contracts
 |                 // TODO: allow operator to inspect contracts
 | ||||||
|             } |             } | ||||||
|  |         } else if req.as_operator { | ||||||
|  |             contracts | ||||||
|  |                 .append(&mut db::ActiveVmWithNode::list_by_operator(&self.db, &req.wallet).await?); | ||||||
|         } else { |         } else { | ||||||
|             if req.as_operator { |             contracts | ||||||
|                 contracts |                 .append(&mut db::ActiveVmWithNode::list_by_admin(&self.db, &req.wallet).await?); | ||||||
|                     .append(&mut db::ActiveVmWithNode::list_by_operator(&req.wallet).await?.into()); |  | ||||||
|             } else { |  | ||||||
|                 contracts |  | ||||||
|                     .append(&mut db::ActiveVmWithNode::list_by_admin(&req.wallet).await?.into()); |  | ||||||
|             } |  | ||||||
|         } |         } | ||||||
|         let (tx, rx) = mpsc::channel(6); |         let (tx, rx) = mpsc::channel(6); | ||||||
|         tokio::spawn(async move { |         tokio::spawn(async move { | ||||||
| @ -670,7 +716,7 @@ impl BrainVmCli for BrainVmCliForReal { | |||||||
|     ) -> Result<Response<Self::ListVmNodesStream>, tonic::Status> { |     ) -> Result<Response<Self::ListVmNodesStream>, tonic::Status> { | ||||||
|         let req = check_sig_from_req(req)?; |         let req = check_sig_from_req(req)?; | ||||||
|         info!("CLI requested ListVmNodesStream: {req:?}"); |         info!("CLI requested ListVmNodesStream: {req:?}"); | ||||||
|         let nodes = db::VmNodeWithReports::find_by_filters(req).await?; |         let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?; | ||||||
|         let (tx, rx) = mpsc::channel(6); |         let (tx, rx) = mpsc::channel(6); | ||||||
|         tokio::spawn(async move { |         tokio::spawn(async move { | ||||||
|             for node in nodes { |             for node in nodes { | ||||||
| @ -688,7 +734,7 @@ impl BrainVmCli for BrainVmCliForReal { | |||||||
|         let req = check_sig_from_req(req)?; |         let req = check_sig_from_req(req)?; | ||||||
|         info!("Unknown CLI requested ListVmNodesStream: {req:?}"); |         info!("Unknown CLI requested ListVmNodesStream: {req:?}"); | ||||||
|         // TODO: optimize this query so that it gets only one node
 |         // TODO: optimize this query so that it gets only one node
 | ||||||
|         let nodes = db::VmNodeWithReports::find_by_filters(req).await?; |         let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?; | ||||||
|         if let Some(node) = nodes.into_iter().next() { |         if let Some(node) = nodes.into_iter().next() { | ||||||
|             return Ok(Response::new(node.into())); |             return Ok(Response::new(node.into())); | ||||||
|         } |         } | ||||||
| @ -754,7 +800,7 @@ fn check_sig_from_req<T: std::fmt::Debug + PubkeyGetter>(req: Request<T>) -> Res | |||||||
|     let parsed_time = chrono::DateTime::parse_from_rfc3339(time) |     let parsed_time = chrono::DateTime::parse_from_rfc3339(time) | ||||||
|         .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; |         .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; | ||||||
|     let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); |     let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); | ||||||
|     if seconds_elapsed > 4 || seconds_elapsed < -4 { |     if !(-4..=4).contains(&seconds_elapsed) { | ||||||
|         return Err(Status::unauthenticated(format!( |         return Err(Status::unauthenticated(format!( | ||||||
|             "Date is not within 4 sec of the time of the server: CLI {} vs Server {}", |             "Date is not within 4 sec of the time of the server: CLI {} vs Server {}", | ||||||
|             parsed_time, now |             parsed_time, now | ||||||
| @ -795,7 +841,7 @@ fn check_sig_from_req<T: std::fmt::Debug + PubkeyGetter>(req: Request<T>) -> Res | |||||||
|         .verify(message.as_bytes(), &signature) |         .verify(message.as_bytes(), &signature) | ||||||
|         .map_err(|_| Status::unauthenticated("the signature is not valid"))?; |         .map_err(|_| Status::unauthenticated("the signature is not valid"))?; | ||||||
|     if let Some(req_pubkey) = req.get_pubkey() { |     if let Some(req_pubkey) = req.get_pubkey() { | ||||||
|         if pubkey_value.to_str().unwrap().to_string() != req_pubkey { |         if *pubkey_value.to_str().unwrap() != req_pubkey { | ||||||
|             return Err(Status::unauthenticated( |             return Err(Status::unauthenticated( | ||||||
|                 "pubkey of signature does not match pubkey of request", |                 "pubkey of signature does not match pubkey of request", | ||||||
|             )); |             )); | ||||||
| @ -809,7 +855,7 @@ fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Resul | |||||||
|     let parsed_time = chrono::DateTime::parse_from_rfc3339(time) |     let parsed_time = chrono::DateTime::parse_from_rfc3339(time) | ||||||
|         .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; |         .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; | ||||||
|     let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); |     let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); | ||||||
|     if seconds_elapsed > 4 || seconds_elapsed < -4 { |     if !(-4..=4).contains(&seconds_elapsed) { | ||||||
|         return Err(Status::unauthenticated(format!( |         return Err(Status::unauthenticated(format!( | ||||||
|             "Date is not within 4 sec of the time of the server: CLI {} vs Server {}", |             "Date is not within 4 sec of the time of the server: CLI {} vs Server {}", | ||||||
|             parsed_time, now |             parsed_time, now | ||||||
|  | |||||||
| @ -1,2 +1,8 @@ | |||||||
|  | #[allow(dead_code)] | ||||||
| pub mod prepare_test_env; | pub mod prepare_test_env; | ||||||
|  | #[allow(dead_code)] | ||||||
| pub mod test_utils; | pub mod test_utils; | ||||||
|  | #[allow(dead_code)] | ||||||
|  | pub mod vm_cli_utils; | ||||||
|  | #[allow(dead_code)] | ||||||
|  | pub mod vm_daemon_utils; | ||||||
|  | |||||||
| @ -3,47 +3,64 @@ use detee_shared::{ | |||||||
|     general_proto::brain_general_cli_server::BrainGeneralCliServer, |     general_proto::brain_general_cli_server::BrainGeneralCliServer, | ||||||
|     vm_proto::brain_vm_daemon_server::BrainVmDaemonServer, |     vm_proto::brain_vm_daemon_server::BrainVmDaemonServer, | ||||||
| }; | }; | ||||||
|  | use dotenv::dotenv; | ||||||
| use hyper_util::rt::TokioIo; | use hyper_util::rt::TokioIo; | ||||||
| use std::net::SocketAddr; | use std::net::SocketAddr; | ||||||
|  | use std::sync::Arc; | ||||||
| use surreal_brain::grpc::{BrainGeneralCliForReal, BrainVmCliForReal, BrainVmDaemonForReal}; | use surreal_brain::grpc::{BrainGeneralCliForReal, BrainVmCliForReal, BrainVmDaemonForReal}; | ||||||
|  | use surrealdb::engine::remote::ws::Client; | ||||||
|  | use surrealdb::Surreal; | ||||||
| use tokio::io::DuplexStream; | use tokio::io::DuplexStream; | ||||||
| use tokio::{net::TcpListener, sync::OnceCell}; | use tokio::{net::TcpListener, sync::OnceCell}; | ||||||
| use tonic::transport::{Channel, Endpoint, Server, Uri}; | use tonic::transport::{Channel, Endpoint, Server, Uri}; | ||||||
| use tower::service_fn; | use tower::service_fn; | ||||||
| 
 | 
 | ||||||
| pub const DB_URL: &str = "localhost:8000"; |  | ||||||
| pub const DB_NS: &str = "test_brain"; |  | ||||||
| pub const DB_NAME: &str = "test_migration_db"; |  | ||||||
| 
 |  | ||||||
| pub static DB_STATE: OnceCell<()> = OnceCell::const_new(); | pub static DB_STATE: OnceCell<()> = OnceCell::const_new(); | ||||||
| 
 | 
 | ||||||
| pub async fn prepare_test_db() { | pub async fn prepare_test_db() -> Surreal<Client> { | ||||||
|  |     dotenv().ok(); | ||||||
|  | 
 | ||||||
|  |     let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); | ||||||
|  |     let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env"); | ||||||
|  |     let db_pass = std::env::var("DB_PASS").expect("DB_PASS not set in .env"); | ||||||
|  |     let db_ns = "test_brain"; | ||||||
|  |     let db_name = "test_migration_db"; | ||||||
|  | 
 | ||||||
|  |     let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) | ||||||
|  |         .await | ||||||
|  |         .unwrap(); | ||||||
|     DB_STATE |     DB_STATE | ||||||
|         .get_or_init(|| async { |         .get_or_init(|| async { | ||||||
|             surreal_brain::db::init(DB_URL, DB_NS, DB_NAME) |  | ||||||
|                 .await |  | ||||||
|                 .expect("Failed to initialize the database"); |  | ||||||
| 
 |  | ||||||
|             let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk().unwrap(); |             let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk().unwrap(); | ||||||
|             surreal_brain::db::DB.query(format!("REMOVE DATABASE {DB_NAME}")).await.unwrap(); |             db.query(format!("REMOVE DATABASE {db_name}")).await.unwrap(); | ||||||
|             surreal_brain::db::DB |             db.query(std::fs::read_to_string("interim_tables.surql").unwrap()).await.unwrap(); | ||||||
|                 .query(std::fs::read_to_string("interim_tables.surql").unwrap()) |             surreal_brain::db::migration0(&db, &old_brain_data).await.unwrap(); | ||||||
|                 .await |  | ||||||
|                 .unwrap(); |  | ||||||
|             surreal_brain::db::migration0(&old_brain_data).await.unwrap(); |  | ||||||
|         }) |         }) | ||||||
|         .await; |         .await; | ||||||
|  |     db | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub async fn run_service_in_background() -> SocketAddr { | pub async fn run_service_in_background() -> SocketAddr { | ||||||
|  |     dotenv().ok(); | ||||||
|     let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); |     let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); | ||||||
|     let addr = listener.local_addr().unwrap(); |     let addr = listener.local_addr().unwrap(); | ||||||
| 
 | 
 | ||||||
|  |     let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); | ||||||
|  |     let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env"); | ||||||
|  |     let db_pass = std::env::var("DB_PASS").expect("DB_PASS not set in .env"); | ||||||
|  |     let db_ns = "test_brain"; | ||||||
|  |     let db_name = "test_migration_db"; | ||||||
|  | 
 | ||||||
|     tokio::spawn(async move { |     tokio::spawn(async move { | ||||||
|  |         let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) | ||||||
|  |             .await | ||||||
|  |             .unwrap(); | ||||||
|  |         let db_arc = Arc::new(db); | ||||||
|  | 
 | ||||||
|         Server::builder() |         Server::builder() | ||||||
|             .add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal {})) |             .add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal::new(db_arc.clone()))) | ||||||
|             .add_service(BrainVmCliServer::new(BrainVmCliForReal {})) |             .add_service(BrainVmCliServer::new(BrainVmCliForReal::new(db_arc.clone()))) | ||||||
|             .add_service(BrainVmDaemonServer::new(BrainVmDaemonForReal {})) |             .add_service(BrainVmDaemonServer::new(BrainVmDaemonForReal::new(db_arc.clone()))) | ||||||
|             .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) |             .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) | ||||||
|             .await |             .await | ||||||
|             .unwrap(); |             .unwrap(); | ||||||
| @ -54,46 +71,45 @@ pub async fn run_service_in_background() -> SocketAddr { | |||||||
|     addr |     addr | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub async fn run_service_for_stream_server() -> (DuplexStream, SocketAddr) { | pub async fn run_service_for_stream_server() -> DuplexStream { | ||||||
|  |     dotenv().ok(); | ||||||
|     let (client, server) = tokio::io::duplex(1024); |     let (client, server) = tokio::io::duplex(1024); | ||||||
| 
 | 
 | ||||||
|     let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); |     let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); | ||||||
|     let addr = listener.local_addr().unwrap(); |     let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env"); | ||||||
|  |     let db_pass = std::env::var("DB_PASS").expect("DB_PASS not set in .env"); | ||||||
|  |     let db_ns = "test_brain"; | ||||||
|  |     let db_name = "test_migration_db"; | ||||||
| 
 | 
 | ||||||
|     tokio::spawn(async move { |     tokio::spawn(async move { | ||||||
|  |         let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) | ||||||
|  |             .await | ||||||
|  |             .unwrap(); | ||||||
|  |         let db_arc = Arc::new(db); | ||||||
|  | 
 | ||||||
|         tonic::transport::Server::builder() |         tonic::transport::Server::builder() | ||||||
|             .add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal {})) |             .add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal::new(db_arc.clone()))) | ||||||
|             .add_service(BrainVmCliServer::new(BrainVmCliForReal {})) |             .add_service(BrainVmCliServer::new(BrainVmCliForReal::new(db_arc.clone()))) | ||||||
|             .add_service(BrainVmDaemonServer::new(BrainVmDaemonForReal {})) |             .add_service(BrainVmDaemonServer::new(BrainVmDaemonForReal::new(db_arc.clone()))) | ||||||
|             .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) |             .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) | ||||||
|             .await |             .await | ||||||
|     }); |     }); | ||||||
|     (client, addr) |     client | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub async fn connect_stream_client_channel(c_stream: DuplexStream, addr: SocketAddr) -> Channel { | pub async fn connect_stream_client_channel(c_stream: DuplexStream) -> Channel { | ||||||
|     let url = format!("http://{}", addr); |  | ||||||
|     let mut client = Some(c_stream); |     let mut client = Some(c_stream); | ||||||
| 
 | 
 | ||||||
|     Endpoint::try_from(url) |     Endpoint::from_static("http://127.0.0.1:0") | ||||||
|         .unwrap() |  | ||||||
|         .connect_with_connector(service_fn(move |_: Uri| { |         .connect_with_connector(service_fn(move |_: Uri| { | ||||||
|             let client = client.take(); |             let client = client.take().unwrap(); | ||||||
| 
 |             async move { Ok::<TokioIo<DuplexStream>, std::io::Error>(TokioIo::new(client)) } | ||||||
|             async move { |  | ||||||
|                 if let Some(client) = client { |  | ||||||
|                     Ok(TokioIo::new(client)) |  | ||||||
|                 } else { |  | ||||||
|                     Err(std::io::Error::new(std::io::ErrorKind::Other, "Client already taken")) |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|         })) |         })) | ||||||
|         .await |         .await | ||||||
|         .unwrap() |         .unwrap() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[allow(dead_code)] |  | ||||||
| pub async fn run_service_for_stream() -> Channel { | pub async fn run_service_for_stream() -> Channel { | ||||||
|     let (client, addr) = run_service_for_stream_server().await; |     let client = run_service_for_stream_server().await; | ||||||
|     connect_stream_client_channel(client, addr).await |     connect_stream_client_channel(client).await | ||||||
| } | } | ||||||
|  | |||||||
| @ -38,7 +38,6 @@ impl Key { | |||||||
|         Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string()) |         Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string()) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     #[allow(dead_code)] |  | ||||||
|     pub fn sign_stream_auth(&self, contracts: Vec<String>) -> Result<snp_proto::DaemonStreamAuth> { |     pub fn sign_stream_auth(&self, contracts: Vec<String>) -> Result<snp_proto::DaemonStreamAuth> { | ||||||
|         let pubkey = self.pubkey.clone(); |         let pubkey = self.pubkey.clone(); | ||||||
|         let timestamp = chrono::Utc::now().to_rfc3339(); |         let timestamp = chrono::Utc::now().to_rfc3339(); | ||||||
|  | |||||||
							
								
								
									
										47
									
								
								tests/common/vm_cli_utils.rs
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										47
									
								
								tests/common/vm_cli_utils.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,47 @@ | |||||||
|  | use super::test_utils::Key; | ||||||
|  | use detee_shared::vm_proto; | ||||||
|  | use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | ||||||
|  | use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ}; | ||||||
|  | use surreal_brain::db; | ||||||
|  | use surrealdb::engine::remote::ws::Client; | ||||||
|  | use surrealdb::Surreal; | ||||||
|  | use tonic::transport::Channel; | ||||||
|  | 
 | ||||||
|  | pub async fn create_new_vm( | ||||||
|  |     db: &Surreal<Client>, | ||||||
|  |     key: Key, | ||||||
|  |     node_pubkey: String, | ||||||
|  |     brain_channel: Channel, | ||||||
|  | ) -> String { | ||||||
|  |     let new_vm_req = vm_proto::NewVmReq { | ||||||
|  |         admin_pubkey: key.pubkey.clone(), | ||||||
|  |         node_pubkey, | ||||||
|  |         price_per_unit: 1200, | ||||||
|  |         extra_ports: vec![8080, 8081], | ||||||
|  |         locked_nano: 0, | ||||||
|  |         ..Default::default() | ||||||
|  |     }; | ||||||
|  | 
 | ||||||
|  |     let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone()); | ||||||
|  |     let new_vm_resp = | ||||||
|  |         client_vm_cli.new_vm(key.sign_request(new_vm_req).unwrap()).await.unwrap().into_inner(); | ||||||
|  | 
 | ||||||
|  |     assert!(new_vm_resp.error.is_empty()); | ||||||
|  |     assert!(new_vm_resp.uuid.len() == 40); | ||||||
|  | 
 | ||||||
|  |     // wait for update db
 | ||||||
|  |     tokio::time::sleep(tokio::time::Duration::from_millis(700)).await; | ||||||
|  | 
 | ||||||
|  |     let vm_req_db: Option<db::NewVmReq> = | ||||||
|  |         db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await.unwrap(); | ||||||
|  | 
 | ||||||
|  |     if let Some(new_vm_req) = vm_req_db { | ||||||
|  |         panic!("New VM request found in DB: {:?}", new_vm_req); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     let active_vm_op: Option<db::ActiveVm> = | ||||||
|  |         db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await.unwrap(); | ||||||
|  |     let active_vm = active_vm_op.unwrap(); | ||||||
|  | 
 | ||||||
|  |     active_vm.id.key().to_string() | ||||||
|  | } | ||||||
							
								
								
									
										134
									
								
								tests/common/vm_daemon_utils.rs
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										134
									
								
								tests/common/vm_daemon_utils.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,134 @@ | |||||||
|  | use super::test_utils::Key; | ||||||
|  | use detee_shared::vm_proto; | ||||||
|  | use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; | ||||||
|  | use detee_shared::vm_proto::RegisterVmNodeReq; | ||||||
|  | use futures::StreamExt; | ||||||
|  | use tokio::sync::mpsc; | ||||||
|  | use tokio_stream::wrappers::ReceiverStream; | ||||||
|  | use tonic::transport::Channel; | ||||||
|  | 
 | ||||||
|  | pub async fn mock_vm_daemon(brain_channel: Channel) -> String { | ||||||
|  |     let daemon_client = BrainVmDaemonClient::new(brain_channel); | ||||||
|  |     let daemon_key = Key::new(); | ||||||
|  | 
 | ||||||
|  |     register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await; | ||||||
|  | 
 | ||||||
|  |     let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); | ||||||
|  | 
 | ||||||
|  |     tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx)); | ||||||
|  | 
 | ||||||
|  |     let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); | ||||||
|  |     tokio::spawn(daemon_msg_sender( | ||||||
|  |         daemon_client.clone(), | ||||||
|  |         daemon_key.clone(), | ||||||
|  |         daemon_msg_tx.clone(), | ||||||
|  |         rx, | ||||||
|  |     )); | ||||||
|  | 
 | ||||||
|  |     tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx)); | ||||||
|  | 
 | ||||||
|  |     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; | ||||||
|  | 
 | ||||||
|  |     daemon_key.pubkey | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | pub async fn register_vm_node( | ||||||
|  |     mut client: BrainVmDaemonClient<Channel>, | ||||||
|  |     key: Key, | ||||||
|  |     operator_wallet: String, | ||||||
|  | ) -> Vec<vm_proto::VmContract> { | ||||||
|  |     let node_pubkey = key.pubkey.clone(); | ||||||
|  | 
 | ||||||
|  |     let req = RegisterVmNodeReq { | ||||||
|  |         node_pubkey, | ||||||
|  |         operator_wallet, | ||||||
|  |         main_ip: String::from("185.243.218.213"), | ||||||
|  |         city: String::from("Oslo"), | ||||||
|  |         country: String::from("Norway"), | ||||||
|  |         region: String::from("EU"), | ||||||
|  |         price: 1200, | ||||||
|  |     }; | ||||||
|  | 
 | ||||||
|  |     let mut grpc_stream = | ||||||
|  |         client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner(); | ||||||
|  | 
 | ||||||
|  |     let mut vm_contracts = Vec::new(); | ||||||
|  |     while let Some(stream_update) = grpc_stream.next().await { | ||||||
|  |         match stream_update { | ||||||
|  |             Ok(vm_c) => { | ||||||
|  |                 vm_contracts.push(vm_c); | ||||||
|  |             } | ||||||
|  |             Err(e) => { | ||||||
|  |                 panic!("Received error instead of vm_contracts: {e:?}"); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |     vm_contracts | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | pub async fn daemon_listener( | ||||||
|  |     mut client: BrainVmDaemonClient<Channel>, | ||||||
|  |     key: Key, | ||||||
|  |     tx: mpsc::Sender<vm_proto::BrainVmMessage>, | ||||||
|  | ) { | ||||||
|  |     let mut grpc_stream = | ||||||
|  |         client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner(); | ||||||
|  | 
 | ||||||
|  |     while let Some(Ok(stream_update)) = grpc_stream.next().await { | ||||||
|  |         log::info!("vm deamon got notified: {:?}", &stream_update); | ||||||
|  |         let _ = tx.send(stream_update).await; | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | pub async fn daemon_msg_sender( | ||||||
|  |     mut client: BrainVmDaemonClient<Channel>, | ||||||
|  |     key: Key, | ||||||
|  |     tx: mpsc::Sender<vm_proto::VmDaemonMessage>, | ||||||
|  |     rx: mpsc::Receiver<vm_proto::VmDaemonMessage>, | ||||||
|  | ) { | ||||||
|  |     let rx_stream = ReceiverStream::new(rx); | ||||||
|  |     tx.send(vm_proto::VmDaemonMessage { | ||||||
|  |         msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())), | ||||||
|  |     }) | ||||||
|  |     .await | ||||||
|  |     .unwrap(); | ||||||
|  |     client.daemon_messages(rx_stream).await.unwrap(); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | pub async fn daemon_engine( | ||||||
|  |     tx: mpsc::Sender<vm_proto::VmDaemonMessage>, | ||||||
|  |     mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>, | ||||||
|  | ) { | ||||||
|  |     while let Some(brain_msg) = rx.recv().await { | ||||||
|  |         match brain_msg.msg { | ||||||
|  |             Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { | ||||||
|  |                 let args = Some(vm_proto::MeasurementArgs { | ||||||
|  |                     dtrfs_api_endpoint: String::from("184.107.169.199:48865"), | ||||||
|  |                     exposed_ports: new_vm_req.extra_ports, | ||||||
|  |                     ovmf_hash: String::from( | ||||||
|  |                         "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", | ||||||
|  |                     ), | ||||||
|  |                     ips: vec![], | ||||||
|  |                 }); | ||||||
|  | 
 | ||||||
|  |                 let new_vm_resp = vm_proto::NewVmResp { | ||||||
|  |                     uuid: new_vm_req.uuid.clone(), | ||||||
|  |                     args, | ||||||
|  |                     error: String::new(), | ||||||
|  |                 }; | ||||||
|  | 
 | ||||||
|  |                 let res_data = vm_proto::VmDaemonMessage { | ||||||
|  |                     msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), | ||||||
|  |                 }; | ||||||
|  |                 tx.send(res_data).await.unwrap(); | ||||||
|  |             } | ||||||
|  |             Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { | ||||||
|  |                 todo!() | ||||||
|  |             } | ||||||
|  |             Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => { | ||||||
|  |                 todo!() | ||||||
|  |             } | ||||||
|  |             None => todo!(), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
| @ -1,6 +1,8 @@ | |||||||
| use common::{ | use common::{ | ||||||
|     prepare_test_env::{prepare_test_db, run_service_for_stream, run_service_in_background}, |     prepare_test_env::{prepare_test_db, run_service_for_stream, run_service_in_background}, | ||||||
|     test_utils::Key, |     test_utils::Key, | ||||||
|  |     vm_cli_utils::create_new_vm, | ||||||
|  |     vm_daemon_utils::mock_vm_daemon, | ||||||
| }; | }; | ||||||
| use detee_shared::common_proto::Empty; | use detee_shared::common_proto::Empty; | ||||||
| use detee_shared::general_proto::ReportNodeReq; | use detee_shared::general_proto::ReportNodeReq; | ||||||
| @ -10,6 +12,8 @@ use detee_shared::{ | |||||||
|     common_proto::Pubkey, general_proto::brain_general_cli_client::BrainGeneralCliClient, |     common_proto::Pubkey, general_proto::brain_general_cli_client::BrainGeneralCliClient, | ||||||
| }; | }; | ||||||
| use futures::StreamExt; | use futures::StreamExt; | ||||||
|  | use surreal_brain::constants::VM_NODE; | ||||||
|  | use surreal_brain::db::VmNodeWithReports; | ||||||
| 
 | 
 | ||||||
| mod common; | mod common; | ||||||
| 
 | 
 | ||||||
| @ -34,29 +38,70 @@ async fn test_general_balance() { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[tokio::test] | #[tokio::test] | ||||||
| async fn test_report_node() { | async fn test_vm_creation() { | ||||||
|     prepare_test_db().await; |     let db = prepare_test_db().await; | ||||||
| 
 | 
 | ||||||
|     let addr = run_service_in_background().await; |     let brain_channel = run_service_for_stream().await; | ||||||
|     let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap(); |     let daemon_key = mock_vm_daemon(brain_channel.clone()).await; | ||||||
|  | 
 | ||||||
|  |     let key = Key::new(); | ||||||
|  | 
 | ||||||
|  |     let _ = create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await; | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | #[tokio::test] | ||||||
|  | async fn test_report_node() { | ||||||
|  |     let db = prepare_test_db().await; | ||||||
|  | 
 | ||||||
|  |     let brain_channel = run_service_for_stream().await; | ||||||
|  |     let daemon_key = mock_vm_daemon(brain_channel.clone()).await; | ||||||
|  |     let mut client_gen_cli = BrainGeneralCliClient::new(brain_channel.clone()); | ||||||
| 
 | 
 | ||||||
|     let key = Key::new(); |     let key = Key::new(); | ||||||
|     let pubkey = key.pubkey.clone(); |     let pubkey = key.pubkey.clone(); | ||||||
| 
 | 
 | ||||||
|     // TODO: create contract, node and operator in db and use it here
 |     let report_req = ReportNodeReq { | ||||||
|     let req_data = ReportNodeReq { |         admin_pubkey: pubkey.clone(), | ||||||
|         admin_pubkey: pubkey, |         node_pubkey: daemon_key.clone(), | ||||||
|         node_pubkey: String::from("node_pubkey"), |  | ||||||
|         contract: String::from("uuid"), |         contract: String::from("uuid"), | ||||||
|         reason: String::from("reason"), |         reason: String::from("reason"), | ||||||
|     }; |     }; | ||||||
| 
 | 
 | ||||||
|     let report_error = client.report_node(key.sign_request(req_data).unwrap()).await.err().unwrap(); |     let report_error = | ||||||
|  |         client_gen_cli.report_node(key.sign_request(report_req).unwrap()).await.err().unwrap(); | ||||||
| 
 | 
 | ||||||
|     println!("Report error: {:?}", report_error); |     println!("Report error: {:?}", report_error); | ||||||
|     assert_eq!(report_error.message(), "No contract found by this ID."); |     assert_eq!(report_error.message(), "No contract found by this ID."); | ||||||
| 
 | 
 | ||||||
|     // verify report in db
 |     let active_vm_id = | ||||||
|  |         create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await; | ||||||
|  | 
 | ||||||
|  |     let reason = String::from("something went wrong on vm"); | ||||||
|  |     let report_req = ReportNodeReq { | ||||||
|  |         admin_pubkey: pubkey, | ||||||
|  |         node_pubkey: daemon_key.clone(), | ||||||
|  |         contract: active_vm_id, | ||||||
|  |         reason: reason.clone(), | ||||||
|  |     }; | ||||||
|  | 
 | ||||||
|  |     let _ = client_gen_cli | ||||||
|  |         .report_node(key.sign_request(report_req).unwrap()) | ||||||
|  |         .await | ||||||
|  |         .unwrap() | ||||||
|  |         .into_inner(); | ||||||
|  | 
 | ||||||
|  |     let vm_nodes: Vec<VmNodeWithReports> = db | ||||||
|  |         .query(format!( | ||||||
|  |             "SELECT *, <-report.* as reports FROM {VM_NODE} WHERE id = {VM_NODE}:{daemon_key};" | ||||||
|  |         )) | ||||||
|  |         .await | ||||||
|  |         .unwrap() | ||||||
|  |         .take(0) | ||||||
|  |         .unwrap(); | ||||||
|  | 
 | ||||||
|  |     let vm_node_with_report = vm_nodes.get(0).unwrap(); | ||||||
|  | 
 | ||||||
|  |     assert!(vm_node_with_report.reports[0].reason == reason); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[tokio::test] | #[tokio::test] | ||||||
|  | |||||||
| @ -1,17 +1,11 @@ | |||||||
| use common::{ | use common::{ | ||||||
|     prepare_test_env::{ |     prepare_test_env::{prepare_test_db, run_service_for_stream, run_service_in_background}, | ||||||
|         connect_stream_client_channel, prepare_test_db, run_service_for_stream_server, |  | ||||||
|         run_service_in_background, |  | ||||||
|     }, |  | ||||||
|     test_utils::Key, |     test_utils::Key, | ||||||
|  |     vm_daemon_utils::{mock_vm_daemon, register_vm_node}, | ||||||
| }; | }; | ||||||
| use detee_shared::vm_proto; | use detee_shared::vm_proto; | ||||||
| use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | ||||||
| use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; | use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; | ||||||
| use detee_shared::vm_proto::RegisterVmNodeReq; |  | ||||||
| use futures::StreamExt; |  | ||||||
| use tokio::task::JoinSet; |  | ||||||
| use tokio_stream::wrappers::ReceiverStream; |  | ||||||
| 
 | 
 | ||||||
| mod common; | mod common; | ||||||
| 
 | 
 | ||||||
| @ -31,144 +25,20 @@ async fn test_reg_vm_node() { | |||||||
|     assert!(vm_contracts.is_empty()) |     assert!(vm_contracts.is_empty()) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| async fn register_vm_node( |  | ||||||
|     mut client: BrainVmDaemonClient<tonic::transport::Channel>, |  | ||||||
|     key: Key, |  | ||||||
|     operator_wallet: String, |  | ||||||
| ) -> Vec<vm_proto::VmContract> { |  | ||||||
|     let node_pubkey = key.pubkey.clone(); |  | ||||||
| 
 |  | ||||||
|     let req = RegisterVmNodeReq { |  | ||||||
|         node_pubkey, |  | ||||||
|         operator_wallet, |  | ||||||
|         main_ip: String::from("185.243.218.213"), |  | ||||||
|         city: String::from("Oslo"), |  | ||||||
|         country: String::from("Norway"), |  | ||||||
|         region: String::from("EU"), |  | ||||||
|         price: 1200, |  | ||||||
|     }; |  | ||||||
| 
 |  | ||||||
|     let mut grpc_stream = |  | ||||||
|         client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner(); |  | ||||||
| 
 |  | ||||||
|     let mut vm_contracts = Vec::new(); |  | ||||||
|     while let Some(stream_update) = grpc_stream.next().await { |  | ||||||
|         match stream_update { |  | ||||||
|             Ok(vm_c) => { |  | ||||||
|                 vm_contracts.push(vm_c); |  | ||||||
|             } |  | ||||||
|             Err(e) => { |  | ||||||
|                 panic!("Received error instead of vm_contracts: {e:?}"); |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|     vm_contracts |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| #[tokio::test] | #[tokio::test] | ||||||
| async fn test_brain_message() { | async fn test_brain_message() { | ||||||
|     // spawn grpc stream server
 |  | ||||||
|     // mock a daemon
 |  | ||||||
|     // mock a cli client to interact with brain
 |  | ||||||
| 
 |  | ||||||
|     // validate if something happening in "surreal_brain::db::NewVmReq", "surreal_brain::db::UpdateVmReq", "surreal_brain::db::DeletedVm" these table
 |  | ||||||
|     // mock daemon will responde to brain
 |  | ||||||
| 
 |  | ||||||
|     env_logger::builder().filter_level(log::LevelFilter::Info).init(); |     env_logger::builder().filter_level(log::LevelFilter::Info).init(); | ||||||
|     let _ = prepare_test_db().await; |     let db = prepare_test_db().await; | ||||||
| 
 | 
 | ||||||
|     let (tokio_duplex, addr) = run_service_for_stream_server().await; |     let brain_channel = run_service_for_stream().await; | ||||||
| 
 |     let daemon_key = mock_vm_daemon(brain_channel.clone()).await; | ||||||
|     let channel = connect_stream_client_channel(tokio_duplex, addr).await; |     let mut cli_client = BrainVmCliClient::new(brain_channel); | ||||||
| 
 |  | ||||||
|     let mut daemon_client = BrainVmDaemonClient::new(channel.clone()); |  | ||||||
| 
 |  | ||||||
|     let daemon_key = Key::new(); |  | ||||||
| 
 |  | ||||||
|     register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await; |  | ||||||
| 
 |  | ||||||
|     let mut daemon_join_set = JoinSet::new(); |  | ||||||
| 
 |  | ||||||
|     let (tx, mut brain_msg_rx) = tokio::sync::mpsc::channel(1); |  | ||||||
| 
 |  | ||||||
|     // listen to brain
 |  | ||||||
|     let mut daemon_client_01 = daemon_client.clone(); |  | ||||||
|     let daemon_key_01 = daemon_key.clone(); |  | ||||||
|     daemon_join_set.spawn(async move { |  | ||||||
|         let mut grpc_stream = daemon_client_01 |  | ||||||
|             .brain_messages(daemon_key_01.sign_stream_auth(vec![]).unwrap()) |  | ||||||
|             .await |  | ||||||
|             .unwrap() |  | ||||||
|             .into_inner(); |  | ||||||
| 
 |  | ||||||
|         while let Some(Ok(stream_update)) = grpc_stream.next().await { |  | ||||||
|             log::info!("vm deamon got notified: {:?}", &stream_update); |  | ||||||
|             let _ = tx.send(stream_update).await; |  | ||||||
|         } |  | ||||||
|     }); |  | ||||||
| 
 |  | ||||||
|     // send to brain
 |  | ||||||
|     let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); |  | ||||||
|     let daemon_msg_tx_01 = daemon_msg_tx.clone(); |  | ||||||
|     let daemon_key_02 = daemon_key.clone(); |  | ||||||
|     daemon_join_set.spawn(async move { |  | ||||||
|         let rx_stream = ReceiverStream::new(rx); |  | ||||||
|         daemon_msg_tx_01 |  | ||||||
|             .send(vm_proto::VmDaemonMessage { |  | ||||||
|                 msg: Some(vm_proto::vm_daemon_message::Msg::Auth( |  | ||||||
|                     daemon_key_02.sign_stream_auth(vec![]).unwrap(), |  | ||||||
|                 )), |  | ||||||
|             }) |  | ||||||
|             .await |  | ||||||
|             .unwrap(); |  | ||||||
|         daemon_client.daemon_messages(rx_stream).await.unwrap(); |  | ||||||
|     }); |  | ||||||
| 
 |  | ||||||
|     // daemon engine
 |  | ||||||
|     daemon_join_set.spawn(async move { |  | ||||||
|         while let Some(brain_msg) = brain_msg_rx.recv().await { |  | ||||||
|             match brain_msg.msg { |  | ||||||
|                 Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { |  | ||||||
|                     let args = Some(vm_proto::MeasurementArgs { |  | ||||||
|                         dtrfs_api_endpoint: String::from("184.107.169.199:48865"), |  | ||||||
|                         exposed_ports: new_vm_req.extra_ports, |  | ||||||
|                         ovmf_hash: String::from( |  | ||||||
|                             "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", |  | ||||||
|                         ), |  | ||||||
|                         ips: vec![], |  | ||||||
|                     }); |  | ||||||
| 
 |  | ||||||
|                     let new_vm_resp = vm_proto::NewVmResp { |  | ||||||
|                         uuid: new_vm_req.uuid.clone(), |  | ||||||
|                         args, |  | ||||||
|                         error: String::new(), |  | ||||||
|                     }; |  | ||||||
| 
 |  | ||||||
|                     let res_data = vm_proto::VmDaemonMessage { |  | ||||||
|                         msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), |  | ||||||
|                     }; |  | ||||||
|                     daemon_msg_tx.send(res_data).await.unwrap(); |  | ||||||
|                 } |  | ||||||
|                 Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { |  | ||||||
|                     todo!() |  | ||||||
|                 } |  | ||||||
|                 Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => { |  | ||||||
|                     todo!() |  | ||||||
|                 } |  | ||||||
|                 None => todo!(), |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|     }); |  | ||||||
| 
 |  | ||||||
|     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; |  | ||||||
| 
 |  | ||||||
|     let mut cli_client = BrainVmCliClient::new(channel); |  | ||||||
| 
 | 
 | ||||||
|     let cli_key = Key::new(); |     let cli_key = Key::new(); | ||||||
| 
 | 
 | ||||||
|     let req = vm_proto::NewVmReq { |     let req = vm_proto::NewVmReq { | ||||||
|         admin_pubkey: cli_key.pubkey.clone(), |         admin_pubkey: cli_key.pubkey.clone(), | ||||||
|         node_pubkey: daemon_key.pubkey.clone(), |         node_pubkey: daemon_key, | ||||||
|         price_per_unit: 1200, |         price_per_unit: 1200, | ||||||
|         extra_ports: vec![8080, 8081], |         extra_ports: vec![8080, 8081], | ||||||
|         locked_nano: 0, |         locked_nano: 0, | ||||||
| @ -181,8 +51,7 @@ async fn test_brain_message() { | |||||||
|     assert!(new_vm_resp.uuid.len() == 40); |     assert!(new_vm_resp.uuid.len() == 40); | ||||||
| 
 | 
 | ||||||
|     let id = ("measurement_args", new_vm_resp.uuid); |     let id = ("measurement_args", new_vm_resp.uuid); | ||||||
|     let data_in_db: detee_shared::vm_proto::MeasurementArgs = |     let data_in_db: detee_shared::vm_proto::MeasurementArgs = db.select(id).await.unwrap().unwrap(); | ||||||
|         surreal_brain::db::DB.select(id).await.unwrap().unwrap(); |  | ||||||
| 
 | 
 | ||||||
|     assert_eq!(data_in_db, new_vm_resp.args.unwrap()); |     assert_eq!(data_in_db, new_vm_resp.args.unwrap()); | ||||||
| } | } | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user