diff --git a/.env b/.env index 8984b2f..7341fdb 100644 --- a/.env +++ b/.env @@ -5,3 +5,4 @@ DB_NAMESPACE = "brain" DB_NAME = "migration" CERT_PATH = "./tmp/brain-crt.pem" CERT_KEY_PATH = "./tmp/brain-key.pem" +# ADMIN_PUB_KEYS = "admin_key01, admin_key02, admin_key03" diff --git a/Cargo.lock b/Cargo.lock index 02c1f24..2d57616 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1972,6 +1972,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.15" @@ -3788,6 +3797,7 @@ dependencies = [ "env_logger", "futures", "hyper-util", + "itertools 0.14.0", "log", "nanoid", "rand 0.8.5", diff --git a/Cargo.toml b/Cargo.toml index a1c47c5..40b329b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,5 +35,6 @@ anyhow = "1.0.98" bs58 = "0.5.1" ed25519-dalek = { version = "2.1.1", features = ["rand_core"] } hyper-util = "0.1.11" +itertools = "0.14.0" rand = "0.8" tower = "0.5.2" diff --git a/src/constants.rs b/src/constants.rs index c1a3ca1..bc95da8 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -1,3 +1,5 @@ +use std::sync::LazyLock; + pub const BRAIN_GRPC_ADDR: &str = "0.0.0.0:31337"; pub const CERT_PATH: &str = "/etc/detee/brain/brain-crt.pem"; pub const CERT_KEY_PATH: &str = "/etc/detee/brain/brain-key.pem"; @@ -5,11 +7,18 @@ pub const CONFIG_PATH: &str = "/etc/detee/brain/config.ini"; pub const DB_SCHEMA_FILE: &str = "interim_tables.surql"; -pub const ADMIN_ACCOUNTS: &[&str] = &[ - "x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK", - "FHuecMbeC1PfjkW2JKyoicJAuiU7khgQT16QUB3Q1XdL", - "H21Shi4iE7vgfjWEQNvzmpmBMJSaiZ17PYUcdNoAoKNc", -]; +pub static ADMIN_ACCOUNTS: LazyLock> = LazyLock::new(|| { + let default_admin_keys = vec![ + "x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK".to_string(), + "FHuecMbeC1PfjkW2JKyoicJAuiU7khgQT16QUB3Q1XdL".to_string(), + "H21Shi4iE7vgfjWEQNvzmpmBMJSaiZ17PYUcdNoAoKNc".to_string(), + ]; + + std::env::var("ADMIN_PUB_KEYS") + .ok() + .map(|keys| keys.split(',').map(|key| key.trim().to_string()).collect::>()) + .unwrap_or(default_admin_keys) +}); pub const OLD_BRAIN_DATA_PATH: &str = "./saved_data.yaml"; diff --git a/src/db/general.rs b/src/db/general.rs index 940de21..ae01c57 100644 --- a/src/db/general.rs +++ b/src/db/general.rs @@ -30,6 +30,18 @@ impl Account { Ok(account) } + pub async fn get_or_create(db: &Surreal, address: &str) -> Result { + let id = (ACCOUNT, address); + + match db.select(id).await? { + Some(account) => Ok(account), + None => { + let account: Option = db.create(id).await?; + account.ok_or(Error::FailedToCreateDBEntry) + } + } + } + pub async fn airdrop(db: &Surreal, account: &str, tokens: u64) -> Result<(), Error> { let tokens = tokens.saturating_mul(1_000_000_000); let _ = db diff --git a/src/db/mod.rs b/src/db/mod.rs index 24d08ee..6b3f6c4 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -22,6 +22,8 @@ pub enum Error { StdIo(#[from] std::io::Error), #[error(transparent)] TimeOut(#[from] tokio::time::error::Elapsed), + #[error("Failed to create account")] + FailedToCreateDBEntry, } pub mod prelude { @@ -29,7 +31,6 @@ pub mod prelude { pub use super::general::*; pub use super::vm::*; pub use super::*; - pub use detee_shared::snp::pb::vm_proto::{MeasurementArgs, VmNodeFilters}; } pub async fn db_connection( diff --git a/src/db/vm.rs b/src/db/vm.rs index 6b87bfc..5641dea 100644 --- a/src/db/vm.rs +++ b/src/db/vm.rs @@ -5,8 +5,9 @@ use super::Error; use crate::constants::{ ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT, }; -use crate::db::{MeasurementArgs, Report, VmNodeFilters}; +use crate::db::{Account, Report}; use crate::old_brain; +use detee_shared::vm_proto; use serde::{Deserialize, Serialize}; use surrealdb::engine::remote::ws::Client; use surrealdb::sql::Datetime; @@ -52,6 +53,7 @@ impl VmNodeResources { impl VmNode { pub async fn register(self, db: &Surreal) -> Result<(), Error> { + Account::get_or_create(db, &self.operator.key().to_string()).await?; let _: Option = db.upsert(self.id.clone()).content(self).await?; Ok(()) } @@ -82,7 +84,7 @@ impl VmNodeWithReports { // https://en.wikipedia.org/wiki/Dependency_inversion_principle pub async fn find_by_filters( db: &Surreal, - filters: VmNodeFilters, + filters: vm_proto::VmNodeFilters, ) -> Result, Error> { let mut query = format!( "select *, <-report.* as reports from {VM_NODE} where @@ -196,7 +198,7 @@ impl NewVmReq { /// first string is the vm_id pub enum WrappedMeasurement { - Args(String, MeasurementArgs), + Args(String, vm_proto::MeasurementArgs), Error(String, String), } @@ -223,9 +225,10 @@ impl WrappedMeasurement { )) .await?; let mut error_stream = resp.stream::>(0)?; - let mut args_stream = resp.stream::>(1)?; + let mut args_stream = resp.stream::>(1)?; - let args: Option = db.delete(("measurement_args", vm_id)).await?; + let args: Option = + db.delete(("measurement_args", vm_id)).await?; if let Some(args) = args { return Ok(Self::Args(vm_id.to_string(), args)); } @@ -254,7 +257,7 @@ impl WrappedMeasurement { match args_notif { Ok(args_notif) => { if args_notif.action == surrealdb::Action::Create { - let _: Option = db.delete(("measurement_args", vm_id)).await?; + let _: Option = db.delete(("measurement_args", vm_id)).await?; return Ok(Self::Args(vm_id.to_string(), args_notif.data)); }; }, @@ -317,7 +320,7 @@ impl ActiveVm { pub async fn activate( db: &Surreal, id: &str, - args: MeasurementArgs, + args: vm_proto::MeasurementArgs, ) -> Result<(), Error> { let new_vm_req = match NewVmReq::get(db, id).await? { Some(r) => r, diff --git a/src/grpc/mod.rs b/src/grpc/mod.rs index 7833c5b..07bb334 100644 --- a/src/grpc/mod.rs +++ b/src/grpc/mod.rs @@ -166,7 +166,8 @@ pub fn check_admin_key(req: &Request) -> Result<(), Status> { }; let pubkey = pubkey .to_str() - .map_err(|_| Status::unauthenticated("could not parse pubkey metadata to str"))?; + .map_err(|_| Status::unauthenticated("could not parse pubkey metadata to str"))? + .to_owned(); if !ADMIN_ACCOUNTS.contains(&pubkey) { return Err(Status::unauthenticated("This operation is reserved to admin accounts")); diff --git a/tests/common/prepare_test_env.rs b/tests/common/prepare_test_env.rs index 85f633e..7a1c825 100644 --- a/tests/common/prepare_test_env.rs +++ b/tests/common/prepare_test_env.rs @@ -1,3 +1,4 @@ +use anyhow::{anyhow, Result}; use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer; use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; @@ -15,9 +16,9 @@ use tokio::sync::OnceCell; use tonic::transport::{Channel, Endpoint, Server, Uri}; use tower::service_fn; -pub static DB_STATE: OnceCell<()> = OnceCell::const_new(); +pub static DB_STATE: OnceCell> = OnceCell::const_new(); -pub async fn prepare_test_db() -> Surreal { +pub async fn prepare_test_db() -> Result> { dotenv().ok(); let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); @@ -26,24 +27,25 @@ pub async fn prepare_test_db() -> Surreal { let db_ns = "test_brain"; let db_name = "test_migration_db"; - let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) - .await - .unwrap(); + let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?; DB_STATE .get_or_init(|| async { - let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk().unwrap(); - db.query(format!("REMOVE DATABASE {db_name}")).await.unwrap(); - db.query(std::fs::read_to_string("interim_tables.surql").unwrap()).await.unwrap(); - surreal_brain::db::migration0(&db, &old_brain_data).await.unwrap(); + let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk() + .map_err(|e| anyhow!(e.to_string()))?; + + db.query(format!("REMOVE DATABASE {db_name}")).await?; + db.query(std::fs::read_to_string("interim_tables.surql")?).await?; + surreal_brain::db::migration0(&db, &old_brain_data).await?; + Ok::<(), anyhow::Error>(()) }) .await; - db + Ok(db) } -pub async fn run_service_in_background() -> SocketAddr { +pub async fn run_service_in_background() -> Result { dotenv().ok(); - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env"); @@ -52,9 +54,8 @@ pub async fn run_service_in_background() -> SocketAddr { let db_name = "test_migration_db"; tokio::spawn(async move { - let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) - .await - .unwrap(); + let db = + surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?; let db_arc = Arc::new(db); Server::builder() @@ -62,13 +63,14 @@ pub async fn run_service_in_background() -> SocketAddr { .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) - .await - .unwrap(); + .await?; + + Ok::<(), anyhow::Error>(()) }); tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; - addr + Ok(addr) } pub async fn run_service_for_stream_server() -> DuplexStream { @@ -82,9 +84,8 @@ pub async fn run_service_for_stream_server() -> DuplexStream { let db_name = "test_migration_db"; tokio::spawn(async move { - let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) - .await - .unwrap(); + let db = + surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?; let db_arc = Arc::new(db); tonic::transport::Server::builder() @@ -92,24 +93,26 @@ pub async fn run_service_for_stream_server() -> DuplexStream { .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) - .await + .await?; + + Ok::<(), anyhow::Error>(()) }); client } -pub async fn connect_stream_client_channel(c_stream: DuplexStream) -> Channel { +pub async fn connect_stream_client_channel(c_stream: DuplexStream) -> Result { let mut client = Some(c_stream); - Endpoint::from_static("http://127.0.0.1:0") + Ok(Endpoint::from_static("http://127.0.0.1:0") .connect_with_connector(service_fn(move |_: Uri| { let client = client.take().unwrap(); async move { Ok::, std::io::Error>(TokioIo::new(client)) } })) - .await - .unwrap() + .await?) } -pub async fn run_service_for_stream() -> Channel { +pub async fn run_service_for_stream() -> Result { let client = run_service_for_stream_server().await; + connect_stream_client_channel(client).await } diff --git a/tests/common/vm_cli_utils.rs b/tests/common/vm_cli_utils.rs index 55b0853..2d7c2c3 100644 --- a/tests/common/vm_cli_utils.rs +++ b/tests/common/vm_cli_utils.rs @@ -1,4 +1,8 @@ use super::test_utils::Key; +use anyhow::{anyhow, Result}; +use detee_shared::common_proto::Empty; +use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient; +use detee_shared::general_proto::{AirdropReq, ReportNodeReq}; use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ}; @@ -7,15 +11,27 @@ use surrealdb::engine::remote::ws::Client; use surrealdb::Surreal; use tonic::transport::Channel; +async fn airdrop(brain_channel: &Channel, wallet: &str, amount: u64) -> Result<()> { + let mut client = BrainGeneralCliClient::new(brain_channel.clone()); + let airdrop_req = AirdropReq { pubkey: wallet.to_string(), tokens: amount }; + + let admin_key = Key::new(); + std::env::set_var("ADMIN_PUB_KEYS", &admin_key.pubkey); + + client.airdrop(admin_key.sign_request(airdrop_req.clone())?).await?; + + Ok(()) +} + pub async fn create_new_vm( db: &Surreal, - key: Key, - node_pubkey: String, - brain_channel: Channel, -) -> String { + key: &Key, + node_pubkey: &str, + brain_channel: &Channel, +) -> Result { let new_vm_req = vm_proto::NewVmReq { admin_pubkey: key.pubkey.clone(), - node_pubkey, + node_pubkey: node_pubkey.to_string(), price_per_unit: 1200, extra_ports: vec![8080, 8081], locked_nano: 0, @@ -23,8 +39,7 @@ pub async fn create_new_vm( }; let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone()); - let new_vm_resp = - client_vm_cli.new_vm(key.sign_request(new_vm_req).unwrap()).await.unwrap().into_inner(); + let new_vm_resp = client_vm_cli.new_vm(key.sign_request(new_vm_req)?).await?.into_inner(); assert!(new_vm_resp.error.is_empty()); assert!(new_vm_resp.uuid.len() == 40); @@ -32,16 +47,34 @@ pub async fn create_new_vm( // wait for update db tokio::time::sleep(tokio::time::Duration::from_millis(700)).await; - let vm_req_db: Option = - db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await.unwrap(); + let vm_req_db: Option = db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await?; if let Some(new_vm_req) = vm_req_db { panic!("New VM request found in DB: {:?}", new_vm_req); } let active_vm_op: Option = - db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await.unwrap(); - let active_vm = active_vm_op.unwrap(); + db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await?; + let active_vm = active_vm_op.ok_or(anyhow!("Not found active vm in db"))?; - active_vm.id.key().to_string() + Ok(active_vm.id.key().to_string()) +} + +pub async fn report_node( + key: &Key, + brain_channel: &Channel, + node_pubkey: &str, + contract: &str, + reason: &str, +) -> Result> { + let mut client_gen_cli = BrainGeneralCliClient::new(brain_channel.clone()); + + let report_req = ReportNodeReq { + admin_pubkey: key.pubkey.clone(), + node_pubkey: node_pubkey.to_string(), + contract: contract.to_string(), + reason: reason.to_string(), + }; + + Ok(client_gen_cli.report_node(key.sign_request(report_req)?).await?) } diff --git a/tests/common/vm_daemon_utils.rs b/tests/common/vm_daemon_utils.rs index 3264a34..58a9446 100644 --- a/tests/common/vm_daemon_utils.rs +++ b/tests/common/vm_daemon_utils.rs @@ -1,4 +1,5 @@ use super::test_utils::Key; +use anyhow::Result; use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; use detee_shared::vm_proto::RegisterVmNodeReq; @@ -7,11 +8,11 @@ use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Channel; -pub async fn mock_vm_daemon(brain_channel: Channel) -> String { - let daemon_client = BrainVmDaemonClient::new(brain_channel); +pub async fn mock_vm_daemon(brain_channel: &Channel) -> Result { + let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone()); let daemon_key = Key::new(); - register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await; + register_vm_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?; let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); @@ -29,20 +30,20 @@ pub async fn mock_vm_daemon(brain_channel: Channel) -> String { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - daemon_key.pubkey + Ok(daemon_key.pubkey) } pub async fn register_vm_node( - mut client: BrainVmDaemonClient, - key: Key, - operator_wallet: String, -) -> Vec { + client: &mut BrainVmDaemonClient, + key: &Key, + operator_wallet: &str, +) -> Result> { log::info!("Registering vm_node: {}", key.pubkey); let node_pubkey = key.pubkey.clone(); let req = RegisterVmNodeReq { node_pubkey, - operator_wallet, + operator_wallet: operator_wallet.to_string(), main_ip: String::from("185.243.218.213"), city: String::from("Oslo"), country: String::from("Norway"), @@ -50,8 +51,7 @@ pub async fn register_vm_node( price: 1200, }; - let mut grpc_stream = - client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner(); + let mut grpc_stream = client.register_vm_node(key.sign_request(req)?).await?.into_inner(); let mut vm_contracts = Vec::new(); while let Some(stream_update) = grpc_stream.next().await { @@ -64,22 +64,23 @@ pub async fn register_vm_node( } } } - vm_contracts + Ok(vm_contracts) } pub async fn daemon_listener( mut client: BrainVmDaemonClient, key: Key, tx: mpsc::Sender, -) { +) -> Result<()> { log::info!("listening vm_daemon"); - let mut grpc_stream = - client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner(); + let mut grpc_stream = client.brain_messages(key.sign_stream_auth(vec![])?).await?.into_inner(); while let Some(Ok(stream_update)) = grpc_stream.next().await { log::info!("vm deamon got notified: {:?}", &stream_update); let _ = tx.send(stream_update).await; } + + Ok(()) } pub async fn daemon_msg_sender( @@ -87,28 +88,29 @@ pub async fn daemon_msg_sender( key: Key, tx: mpsc::Sender, rx: mpsc::Receiver, -) { +) -> Result<()> { log::info!("sender vm_daemon"); let rx_stream = ReceiverStream::new(rx); tx.send(vm_proto::VmDaemonMessage { - msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())), + msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![])?)), }) - .await - .unwrap(); - client.daemon_messages(rx_stream).await.unwrap(); + .await?; + client.daemon_messages(rx_stream).await?; + Ok(()) } pub async fn daemon_engine( tx: mpsc::Sender, mut rx: mpsc::Receiver, -) { +) -> Result<()> { log::info!("daemon engine vm_daemon"); while let Some(brain_msg) = rx.recv().await { match brain_msg.msg { Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { + let exposed_ports = [vec![22], new_vm_req.extra_ports].concat(); let args = Some(vm_proto::MeasurementArgs { dtrfs_api_endpoint: String::from("184.107.169.199:48865"), - exposed_ports: new_vm_req.extra_ports, + exposed_ports, ovmf_hash: String::from( "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", ), @@ -124,7 +126,7 @@ pub async fn daemon_engine( let res_data = vm_proto::VmDaemonMessage { msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), }; - tx.send(res_data).await.unwrap(); + tx.send(res_data).await?; } Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { todo!() @@ -135,4 +137,6 @@ pub async fn daemon_engine( None => todo!(), } } + + Ok(()) } diff --git a/tests/grpc_general_test.rs b/tests/grpc_general_test.rs new file mode 100644 index 0000000..a5b9537 --- /dev/null +++ b/tests/grpc_general_test.rs @@ -0,0 +1,213 @@ +use common::prepare_test_env::{ + prepare_test_db, run_service_for_stream, run_service_in_background, +}; +use common::test_utils::Key; +use common::vm_cli_utils::{create_new_vm, report_node}; +use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; +use detee_shared::common_proto::{Empty, Pubkey}; +use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient; +use detee_shared::general_proto::AirdropReq; +use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; +use futures::StreamExt; +use itertools::Itertools; +use std::vec; +use surreal_brain::constants::VM_NODE; +use surreal_brain::db::vm::VmNodeWithReports; + +mod common; + +#[tokio::test] +async fn test_general_balance() { + // env_logger::builder().filter_level(log::LevelFilter::Trace).init(); + prepare_test_db().await.unwrap(); + + let addr = run_service_in_background().await.unwrap(); + let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap(); + + let key = Key::new(); + let pubkey = key.pubkey.clone(); + let req_data = Pubkey { pubkey }; + + let req = key.sign_request(req_data).unwrap(); + + let acc_bal = client.get_balance(req).await.unwrap().into_inner(); + + assert_eq!(acc_bal.balance, 0); + assert_eq!(acc_bal.tmp_locked, 0); +} + +#[tokio::test] +async fn test_general_airdrop() { + // env_logger::builder().filter_level(log::LevelFilter::Trace).init(); + prepare_test_db().await.unwrap(); + + const AIRDROP_MULTIPLE: u64 = 1_000_000_000; + let airdrop_amount = 10; + + let addr = run_service_in_background().await.unwrap(); + let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap(); + + let admin_keys = vec![Key::new(), Key::new(), Key::new()]; + let admin_pub_keys = admin_keys.iter().map(|k| k.pubkey.clone()).join(", "); + std::env::set_var("ADMIN_PUB_KEYS", admin_pub_keys); + + let user_01_key = Key::new(); + let user_01_pubkey = user_01_key.pubkey.clone(); + + let airdrop_req = AirdropReq { pubkey: user_01_pubkey.clone(), tokens: airdrop_amount }; + + // user airdroping himself + let err = + client.airdrop(user_01_key.sign_request(airdrop_req.clone()).unwrap()).await.err().unwrap(); + assert_eq!(err.message(), "This operation is reserved to admin accounts"); + + // other user airdroping + let err = + client.airdrop(Key::new().sign_request(airdrop_req.clone()).unwrap()).await.err().unwrap(); + assert_eq!(err.message(), "This operation is reserved to admin accounts"); + + let _ = client.airdrop(admin_keys[0].sign_request(airdrop_req.clone()).unwrap()).await.unwrap(); + + let bal_req_data = Pubkey { pubkey: user_01_pubkey }; + let bal_req = user_01_key.sign_request(bal_req_data.clone()).unwrap(); + let acc_bal_user_01 = client.get_balance(bal_req).await.unwrap().into_inner(); + + assert_eq!(acc_bal_user_01.balance, airdrop_amount * AIRDROP_MULTIPLE); + assert_eq!(acc_bal_user_01.tmp_locked, 0); + + // second airdrop from same admin + let _ = client.airdrop(admin_keys[0].sign_request(airdrop_req.clone()).unwrap()).await.unwrap(); + + let acc_bal_user_01 = client + .get_balance(user_01_key.sign_request(bal_req_data.clone()).unwrap()) + .await + .unwrap() + .into_inner(); + + assert_eq!(acc_bal_user_01.balance, 2 * airdrop_amount * AIRDROP_MULTIPLE); + + // third airdrop from another admin + let _ = client.airdrop(admin_keys[1].sign_request(airdrop_req.clone()).unwrap()).await.unwrap(); + + let acc_bal_user_01 = client + .get_balance(user_01_key.sign_request(bal_req_data).unwrap()) + .await + .unwrap() + .into_inner(); + + assert_eq!(acc_bal_user_01.balance, 3 * airdrop_amount * AIRDROP_MULTIPLE); + + // self airdrop + let airdrop_req = AirdropReq { pubkey: admin_keys[2].pubkey.clone(), tokens: airdrop_amount }; + + let _ = client.airdrop(admin_keys[2].sign_request(airdrop_req.clone()).unwrap()).await.unwrap(); + + let bal_req_data = Pubkey { pubkey: admin_keys[2].pubkey.clone() }; + let acc_bal_admin_3 = client + .get_balance(admin_keys[2].sign_request(bal_req_data.clone()).unwrap()) + .await + .unwrap() + .into_inner(); + + assert_eq!(acc_bal_admin_3.balance, airdrop_amount * AIRDROP_MULTIPLE); +} + +#[tokio::test] +async fn test_report_node() { + let db = prepare_test_db().await.unwrap(); + + let brain_channel = run_service_for_stream().await.unwrap(); + let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap(); + + let key = Key::new(); + + let report_error = + report_node(&key, &brain_channel, &daemon_key, "uuid", "reason").await.err().unwrap(); + + log::info!("Report error: {:?}", report_error); + assert!(report_error.to_string().contains("No contract found by this ID.")); + + let active_vm_id = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap(); + + let reason = String::from("something went wrong on vm"); + + let _ = report_node(&key, &brain_channel, &daemon_key, &active_vm_id, &reason) + .await + .unwrap() + .into_inner(); + + let vm_nodes: Vec = db + .query(format!( + "SELECT *, <-report.* as reports FROM {VM_NODE} WHERE id = {VM_NODE}:{daemon_key};" + )) + .await + .unwrap() + .take(0) + .unwrap(); + + let vm_node_with_report = &vm_nodes[0]; + + assert!(vm_node_with_report.reports[0].reason == reason); +} + +#[tokio::test] +// TODO: register some operators before testing this +async fn test_list_operators() { + prepare_test_db().await.unwrap(); + + let channel = run_service_for_stream().await.unwrap(); + + let mut client = BrainGeneralCliClient::new(channel); + + let key = Key::new(); + + let mut grpc_stream = + client.list_operators(key.sign_request(Empty {}).unwrap()).await.unwrap().into_inner(); + + let mut operators = Vec::new(); + while let Some(stream_update) = grpc_stream.next().await { + match stream_update { + Ok(op) => { + operators.push(op); + } + Err(e) => { + panic!("Received error instead of operators: {e:?}"); + } + } + } + + assert!(!operators.is_empty()) +} + +#[tokio::test] +async fn test_inspect_operator() { + prepare_test_db().await.unwrap(); + + let brain_channel = run_service_for_stream().await.unwrap(); + let mut cli_client = BrainGeneralCliClient::new(brain_channel.clone()); + let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone()); + let key = Key::new(); + let daemon_key = Key::new(); + let operator_key = Key::new(); + + let err = cli_client + .inspect_operator(key.sign_request(Pubkey { pubkey: operator_key.pubkey.clone() }).unwrap()) + .await + .err() + .unwrap(); + + assert_eq!(err.message(), "The wallet you specified is not an operator"); + + // TODO: test with app node also + register_vm_node(&mut daemon_client, &daemon_key, &operator_key.pubkey).await.unwrap(); + + let inspect_response = cli_client + .inspect_operator(key.sign_request(Pubkey { pubkey: operator_key.pubkey.clone() }).unwrap()) + .await + .unwrap() + .into_inner(); + + assert!(inspect_response.app_nodes.is_empty()); + assert!(!inspect_response.vm_nodes.is_empty()); + assert_eq!(&inspect_response.vm_nodes[0].operator, &operator_key.pubkey); +} diff --git a/tests/grpc_test.rs b/tests/grpc_test.rs deleted file mode 100644 index 33d6ce5..0000000 --- a/tests/grpc_test.rs +++ /dev/null @@ -1,166 +0,0 @@ -use common::prepare_test_env::{ - prepare_test_db, run_service_for_stream, run_service_in_background, -}; -use common::test_utils::Key; -use common::vm_cli_utils::create_new_vm; -use common::vm_daemon_utils::mock_vm_daemon; -use detee_shared::common_proto::{Empty, Pubkey}; -use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient; -use detee_shared::general_proto::ReportNodeReq; -use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; -use detee_shared::vm_proto::ListVmContractsReq; -use futures::StreamExt; -use surreal_brain::constants::VM_NODE; -use surreal_brain::db::vm::VmNodeWithReports; - -mod common; - -#[tokio::test] -async fn test_general_balance() { - // env_logger::builder().filter_level(log::LevelFilter::Trace).init(); - let _ = prepare_test_db().await; - - let addr = run_service_in_background().await; - let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap(); - - let key = Key::new(); - let pubkey = key.pubkey.clone(); - let req_data = Pubkey { pubkey }; - - let req = key.sign_request(req_data).unwrap(); - - let acc_bal = client.get_balance(req).await.unwrap().into_inner(); - - assert_eq!(acc_bal.balance, 0); - assert_eq!(acc_bal.tmp_locked, 0); -} - -#[tokio::test] -async fn test_vm_creation() { - let db = prepare_test_db().await; - - let brain_channel = run_service_for_stream().await; - let daemon_key = mock_vm_daemon(brain_channel.clone()).await; - - let key = Key::new(); - - let _ = create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await; -} - -#[tokio::test] -async fn test_report_node() { - let db = prepare_test_db().await; - - let brain_channel = run_service_for_stream().await; - let daemon_key = mock_vm_daemon(brain_channel.clone()).await; - let mut client_gen_cli = BrainGeneralCliClient::new(brain_channel.clone()); - - let key = Key::new(); - let pubkey = key.pubkey.clone(); - - let report_req = ReportNodeReq { - admin_pubkey: pubkey.clone(), - node_pubkey: daemon_key.clone(), - contract: String::from("uuid"), - reason: String::from("reason"), - }; - - let report_error = - client_gen_cli.report_node(key.sign_request(report_req).unwrap()).await.err().unwrap(); - - println!("Report error: {:?}", report_error); - assert_eq!(report_error.message(), "No contract found by this ID."); - - let active_vm_id = - create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await; - - let reason = String::from("something went wrong on vm"); - let report_req = ReportNodeReq { - admin_pubkey: pubkey, - node_pubkey: daemon_key.clone(), - contract: active_vm_id, - reason: reason.clone(), - }; - - let _ = client_gen_cli - .report_node(key.sign_request(report_req).unwrap()) - .await - .unwrap() - .into_inner(); - - let vm_nodes: Vec = db - .query(format!( - "SELECT *, <-report.* as reports FROM {VM_NODE} WHERE id = {VM_NODE}:{daemon_key};" - )) - .await - .unwrap() - .take(0) - .unwrap(); - - let vm_node_with_report = vm_nodes.get(0).unwrap(); - - assert!(vm_node_with_report.reports[0].reason == reason); -} - -#[tokio::test] -// TODO: register some operators before testing this -async fn test_list_operators() { - prepare_test_db().await; - - let channel = run_service_for_stream().await; - - let mut client = BrainGeneralCliClient::new(channel); - - let key = Key::new(); - - let mut grpc_stream = - client.list_operators(key.sign_request(Empty {}).unwrap()).await.unwrap().into_inner(); - - let mut operators = Vec::new(); - while let Some(stream_update) = grpc_stream.next().await { - match stream_update { - Ok(op) => { - operators.push(op); - } - Err(e) => { - panic!("Received error instead of operators: {e:?}"); - } - } - } - - assert!(!operators.is_empty()) -} - -#[tokio::test] -// TODO: create vm for this user before testing this -async fn test_list_vm_contracts() { - prepare_test_db().await; - - let channel = run_service_for_stream().await; - let mut client = BrainVmCliClient::new(channel); - - let key = Key::new(); - let pubkey = key.pubkey.clone(); - - let req_data = - ListVmContractsReq { wallet: pubkey, uuid: String::from("uuid"), as_operator: false }; - - let mut grpc_stream = - client.list_vm_contracts(key.sign_request(req_data).unwrap()).await.unwrap().into_inner(); - - let mut vm_contracts = Vec::new(); - while let Some(stream_update) = grpc_stream.next().await { - match stream_update { - Ok(vm_c) => { - vm_contracts.push(vm_c); - } - Err(e) => { - panic!("Received error instead of vm_contracts: {e:?}"); - } - } - } - - assert!(vm_contracts.is_empty()) - - // verify report in db -} diff --git a/tests/grpc_vm_cli_test.rs b/tests/grpc_vm_cli_test.rs new file mode 100644 index 0000000..e1e1df4 --- /dev/null +++ b/tests/grpc_vm_cli_test.rs @@ -0,0 +1,90 @@ +use common::prepare_test_env::{prepare_test_db, run_service_for_stream}; +use common::test_utils::Key; +use common::vm_cli_utils::create_new_vm; +use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; +use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; +use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; +use detee_shared::vm_proto::{ListVmContractsReq, NewVmReq}; +use futures::StreamExt; +use std::vec; + +mod common; + +#[tokio::test] +async fn test_vm_creation() { + let db = prepare_test_db().await.unwrap(); + // env_logger::builder().filter_level(log::LevelFilter::Error).init(); + + let brain_channel = run_service_for_stream().await.unwrap(); + let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap(); + + let key = Key::new(); + + let _ = create_new_vm(&db, &key, &daemon_key, &brain_channel).await; +} + +#[tokio::test] +async fn test_vm_creation_timeout() { + prepare_test_db().await.unwrap(); + // env_logger::builder().filter_level(log::LevelFilter::Error).init(); + + let brain_channel = run_service_for_stream().await.unwrap(); + let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone()); + let daemon_key = Key::new(); + + register_vm_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await.unwrap(); + + let key = Key::new(); + + let new_vm_req = NewVmReq { + admin_pubkey: key.pubkey.clone(), + node_pubkey: daemon_key.pubkey, + price_per_unit: 1200, + extra_ports: vec![8080, 8081], + locked_nano: 0, + ..Default::default() + }; + + let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone()); + let timeout_error = + client_vm_cli.new_vm(key.sign_request(new_vm_req).unwrap()).await.err().unwrap(); + + assert_eq!( + timeout_error.message(), + "Network timeout. Please try again later or contact the DeTEE devs team.", + ) +} + +#[tokio::test] +// TODO: create vm for this user before testing this +async fn test_list_vm_contracts() { + prepare_test_db().await.unwrap(); + + let channel = run_service_for_stream().await.unwrap(); + let mut client = BrainVmCliClient::new(channel); + + let key = Key::new(); + let pubkey = key.pubkey.clone(); + + let req_data = + ListVmContractsReq { wallet: pubkey, uuid: String::from("uuid"), as_operator: false }; + + let mut grpc_stream = + client.list_vm_contracts(key.sign_request(req_data).unwrap()).await.unwrap().into_inner(); + + let mut vm_contracts = Vec::new(); + while let Some(stream_update) = grpc_stream.next().await { + match stream_update { + Ok(vm_c) => { + vm_contracts.push(vm_c); + } + Err(e) => { + panic!("Received error instead of vm_contracts: {e:?}"); + } + } + } + + assert!(vm_contracts.is_empty()) + + // verify report in db +} diff --git a/tests/grpc_vm_daemon_test.rs b/tests/grpc_vm_daemon_test.rs index 0d7ab9b..89d7983 100644 --- a/tests/grpc_vm_daemon_test.rs +++ b/tests/grpc_vm_daemon_test.rs @@ -11,27 +11,24 @@ mod common; #[tokio::test] async fn test_reg_vm_node() { - prepare_test_db().await; + prepare_test_db().await.unwrap(); - let addr = run_service_in_background().await; - let client = BrainVmDaemonClient::connect(format!("http://{}", addr)).await.unwrap(); + let addr = run_service_in_background().await.unwrap(); + let mut client = BrainVmDaemonClient::connect(format!("http://{}", addr)).await.unwrap(); - let operator_wallet = Key::new().pubkey; - - let key = Key::new(); - - let vm_contracts = register_vm_node(client, key, operator_wallet).await; + let vm_contracts = + register_vm_node(&mut client, &Key::new(), &Key::new().pubkey).await.unwrap(); assert!(vm_contracts.is_empty()) } #[tokio::test] async fn test_brain_message() { - env_logger::builder().filter_level(log::LevelFilter::Info).init(); - let db = prepare_test_db().await; + env_logger::builder().filter_level(log::LevelFilter::Error).init(); + prepare_test_db().await.unwrap(); - let brain_channel = run_service_for_stream().await; - let daemon_key = mock_vm_daemon(brain_channel.clone()).await; + let brain_channel = run_service_for_stream().await.unwrap(); + let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap(); let mut cli_client = BrainVmCliClient::new(brain_channel); let cli_key = Key::new(); @@ -49,9 +46,6 @@ async fn test_brain_message() { assert!(new_vm_resp.error.is_empty()); assert!(new_vm_resp.uuid.len() == 40); - - let id = ("measurement_args", new_vm_resp.uuid); - let data_in_db: detee_shared::vm_proto::MeasurementArgs = db.select(id).await.unwrap().unwrap(); - - assert_eq!(data_in_db, new_vm_resp.args.unwrap()); + assert!(new_vm_resp.args.is_some()); + assert!(new_vm_resp.args.unwrap().exposed_ports.len() == 3); }