diff --git a/tests/common/prepare_test_env.rs b/tests/common/prepare_test_env.rs index 85f633e..7a1c825 100644 --- a/tests/common/prepare_test_env.rs +++ b/tests/common/prepare_test_env.rs @@ -1,3 +1,4 @@ +use anyhow::{anyhow, Result}; use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer; use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; @@ -15,9 +16,9 @@ use tokio::sync::OnceCell; use tonic::transport::{Channel, Endpoint, Server, Uri}; use tower::service_fn; -pub static DB_STATE: OnceCell<()> = OnceCell::const_new(); +pub static DB_STATE: OnceCell> = OnceCell::const_new(); -pub async fn prepare_test_db() -> Surreal { +pub async fn prepare_test_db() -> Result> { dotenv().ok(); let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); @@ -26,24 +27,25 @@ pub async fn prepare_test_db() -> Surreal { let db_ns = "test_brain"; let db_name = "test_migration_db"; - let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) - .await - .unwrap(); + let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?; DB_STATE .get_or_init(|| async { - let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk().unwrap(); - db.query(format!("REMOVE DATABASE {db_name}")).await.unwrap(); - db.query(std::fs::read_to_string("interim_tables.surql").unwrap()).await.unwrap(); - surreal_brain::db::migration0(&db, &old_brain_data).await.unwrap(); + let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk() + .map_err(|e| anyhow!(e.to_string()))?; + + db.query(format!("REMOVE DATABASE {db_name}")).await?; + db.query(std::fs::read_to_string("interim_tables.surql")?).await?; + surreal_brain::db::migration0(&db, &old_brain_data).await?; + Ok::<(), anyhow::Error>(()) }) .await; - db + Ok(db) } -pub async fn run_service_in_background() -> SocketAddr { +pub async fn run_service_in_background() -> Result { dotenv().ok(); - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env"); @@ -52,9 +54,8 @@ pub async fn run_service_in_background() -> SocketAddr { let db_name = "test_migration_db"; tokio::spawn(async move { - let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) - .await - .unwrap(); + let db = + surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?; let db_arc = Arc::new(db); Server::builder() @@ -62,13 +63,14 @@ pub async fn run_service_in_background() -> SocketAddr { .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) - .await - .unwrap(); + .await?; + + Ok::<(), anyhow::Error>(()) }); tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; - addr + Ok(addr) } pub async fn run_service_for_stream_server() -> DuplexStream { @@ -82,9 +84,8 @@ pub async fn run_service_for_stream_server() -> DuplexStream { let db_name = "test_migration_db"; tokio::spawn(async move { - let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) - .await - .unwrap(); + let db = + surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?; let db_arc = Arc::new(db); tonic::transport::Server::builder() @@ -92,24 +93,26 @@ pub async fn run_service_for_stream_server() -> DuplexStream { .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) - .await + .await?; + + Ok::<(), anyhow::Error>(()) }); client } -pub async fn connect_stream_client_channel(c_stream: DuplexStream) -> Channel { +pub async fn connect_stream_client_channel(c_stream: DuplexStream) -> Result { let mut client = Some(c_stream); - Endpoint::from_static("http://127.0.0.1:0") + Ok(Endpoint::from_static("http://127.0.0.1:0") .connect_with_connector(service_fn(move |_: Uri| { let client = client.take().unwrap(); async move { Ok::, std::io::Error>(TokioIo::new(client)) } })) - .await - .unwrap() + .await?) } -pub async fn run_service_for_stream() -> Channel { +pub async fn run_service_for_stream() -> Result { let client = run_service_for_stream_server().await; + connect_stream_client_channel(client).await } diff --git a/tests/common/vm_cli_utils.rs b/tests/common/vm_cli_utils.rs index 55b0853..ad94cb1 100644 --- a/tests/common/vm_cli_utils.rs +++ b/tests/common/vm_cli_utils.rs @@ -1,4 +1,8 @@ use super::test_utils::Key; +use anyhow::{anyhow, Result}; +use detee_shared::common_proto::Empty; +use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient; +use detee_shared::general_proto::ReportNodeReq; use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ}; @@ -9,13 +13,13 @@ use tonic::transport::Channel; pub async fn create_new_vm( db: &Surreal, - key: Key, - node_pubkey: String, - brain_channel: Channel, -) -> String { + key: &Key, + node_pubkey: &str, + brain_channel: &Channel, +) -> Result { let new_vm_req = vm_proto::NewVmReq { admin_pubkey: key.pubkey.clone(), - node_pubkey, + node_pubkey: node_pubkey.to_string(), price_per_unit: 1200, extra_ports: vec![8080, 8081], locked_nano: 0, @@ -23,8 +27,7 @@ pub async fn create_new_vm( }; let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone()); - let new_vm_resp = - client_vm_cli.new_vm(key.sign_request(new_vm_req).unwrap()).await.unwrap().into_inner(); + let new_vm_resp = client_vm_cli.new_vm(key.sign_request(new_vm_req)?).await?.into_inner(); assert!(new_vm_resp.error.is_empty()); assert!(new_vm_resp.uuid.len() == 40); @@ -32,16 +35,34 @@ pub async fn create_new_vm( // wait for update db tokio::time::sleep(tokio::time::Duration::from_millis(700)).await; - let vm_req_db: Option = - db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await.unwrap(); + let vm_req_db: Option = db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await?; if let Some(new_vm_req) = vm_req_db { panic!("New VM request found in DB: {:?}", new_vm_req); } let active_vm_op: Option = - db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await.unwrap(); - let active_vm = active_vm_op.unwrap(); + db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await?; + let active_vm = active_vm_op.ok_or(anyhow!("Not found active vm in db"))?; - active_vm.id.key().to_string() + Ok(active_vm.id.key().to_string()) +} + +pub async fn report_node( + key: &Key, + brain_channel: &Channel, + node_pubkey: &str, + contract: &str, + reason: &str, +) -> Result> { + let mut client_gen_cli = BrainGeneralCliClient::new(brain_channel.clone()); + + let report_req = ReportNodeReq { + admin_pubkey: key.pubkey.clone(), + node_pubkey: node_pubkey.to_string(), + contract: contract.to_string(), + reason: reason.to_string(), + }; + + Ok(client_gen_cli.report_node(key.sign_request(report_req)?).await?) } diff --git a/tests/common/vm_daemon_utils.rs b/tests/common/vm_daemon_utils.rs index 3264a34..818e47d 100644 --- a/tests/common/vm_daemon_utils.rs +++ b/tests/common/vm_daemon_utils.rs @@ -1,4 +1,5 @@ use super::test_utils::Key; +use anyhow::Result; use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; use detee_shared::vm_proto::RegisterVmNodeReq; @@ -7,11 +8,11 @@ use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Channel; -pub async fn mock_vm_daemon(brain_channel: Channel) -> String { - let daemon_client = BrainVmDaemonClient::new(brain_channel); +pub async fn mock_vm_daemon(brain_channel: &Channel) -> Result { + let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone()); let daemon_key = Key::new(); - register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await; + register_vm_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?; let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); @@ -29,20 +30,20 @@ pub async fn mock_vm_daemon(brain_channel: Channel) -> String { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - daemon_key.pubkey + Ok(daemon_key.pubkey) } pub async fn register_vm_node( - mut client: BrainVmDaemonClient, - key: Key, - operator_wallet: String, -) -> Vec { + client: &mut BrainVmDaemonClient, + key: &Key, + operator_wallet: &str, +) -> Result> { log::info!("Registering vm_node: {}", key.pubkey); let node_pubkey = key.pubkey.clone(); let req = RegisterVmNodeReq { node_pubkey, - operator_wallet, + operator_wallet: operator_wallet.to_string(), main_ip: String::from("185.243.218.213"), city: String::from("Oslo"), country: String::from("Norway"), @@ -50,8 +51,7 @@ pub async fn register_vm_node( price: 1200, }; - let mut grpc_stream = - client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner(); + let mut grpc_stream = client.register_vm_node(key.sign_request(req)?).await?.into_inner(); let mut vm_contracts = Vec::new(); while let Some(stream_update) = grpc_stream.next().await { @@ -64,22 +64,23 @@ pub async fn register_vm_node( } } } - vm_contracts + Ok(vm_contracts) } pub async fn daemon_listener( mut client: BrainVmDaemonClient, key: Key, tx: mpsc::Sender, -) { +) -> Result<()> { log::info!("listening vm_daemon"); - let mut grpc_stream = - client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner(); + let mut grpc_stream = client.brain_messages(key.sign_stream_auth(vec![])?).await?.into_inner(); while let Some(Ok(stream_update)) = grpc_stream.next().await { log::info!("vm deamon got notified: {:?}", &stream_update); let _ = tx.send(stream_update).await; } + + Ok(()) } pub async fn daemon_msg_sender( @@ -87,21 +88,21 @@ pub async fn daemon_msg_sender( key: Key, tx: mpsc::Sender, rx: mpsc::Receiver, -) { +) -> Result<()> { log::info!("sender vm_daemon"); let rx_stream = ReceiverStream::new(rx); tx.send(vm_proto::VmDaemonMessage { - msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())), + msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![])?)), }) - .await - .unwrap(); - client.daemon_messages(rx_stream).await.unwrap(); + .await?; + client.daemon_messages(rx_stream).await?; + Ok(()) } pub async fn daemon_engine( tx: mpsc::Sender, mut rx: mpsc::Receiver, -) { +) -> Result<()> { log::info!("daemon engine vm_daemon"); while let Some(brain_msg) = rx.recv().await { match brain_msg.msg { @@ -124,7 +125,7 @@ pub async fn daemon_engine( let res_data = vm_proto::VmDaemonMessage { msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), }; - tx.send(res_data).await.unwrap(); + tx.send(res_data).await?; } Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { todo!() @@ -135,4 +136,6 @@ pub async fn daemon_engine( None => todo!(), } } + + Ok(()) } diff --git a/tests/grpc_test.rs b/tests/grpc_test.rs index ed29e7f..4579a47 100644 --- a/tests/grpc_test.rs +++ b/tests/grpc_test.rs @@ -2,11 +2,11 @@ use common::prepare_test_env::{ prepare_test_db, run_service_for_stream, run_service_in_background, }; use common::test_utils::Key; -use common::vm_cli_utils::create_new_vm; +use common::vm_cli_utils::{create_new_vm, report_node}; use common::vm_daemon_utils::mock_vm_daemon; use detee_shared::common_proto::{Empty, Pubkey}; use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient; -use detee_shared::general_proto::{AirdropReq, ReportNodeReq}; +use detee_shared::general_proto::AirdropReq; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use detee_shared::vm_proto::ListVmContractsReq; use futures::StreamExt; @@ -20,9 +20,9 @@ mod common; #[tokio::test] async fn test_general_balance() { // env_logger::builder().filter_level(log::LevelFilter::Trace).init(); - let _ = prepare_test_db().await; + prepare_test_db().await.unwrap(); - let addr = run_service_in_background().await; + let addr = run_service_in_background().await.unwrap(); let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap(); let key = Key::new(); @@ -40,12 +40,12 @@ async fn test_general_balance() { #[tokio::test] async fn test_general_airdrop() { // env_logger::builder().filter_level(log::LevelFilter::Trace).init(); - let _ = prepare_test_db().await; + prepare_test_db().await.unwrap(); const AIRDROP_MULTIPLE: u64 = 1_000_000_000; let airdrop_amount = 10; - let addr = run_service_in_background().await; + let addr = run_service_in_background().await.unwrap(); let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap(); let admin_keys = vec![Key::new(), Key::new(), Key::new()]; @@ -115,53 +115,36 @@ async fn test_general_airdrop() { #[tokio::test] async fn test_vm_creation() { - let db = prepare_test_db().await; + let db = prepare_test_db().await.unwrap(); - let brain_channel = run_service_for_stream().await; - let daemon_key = mock_vm_daemon(brain_channel.clone()).await; + let brain_channel = run_service_for_stream().await.unwrap(); + let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap(); let key = Key::new(); - let _ = create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await; + let _ = create_new_vm(&db, &key, &daemon_key, &brain_channel).await; } #[tokio::test] async fn test_report_node() { - let db = prepare_test_db().await; + let db = prepare_test_db().await.unwrap(); - let brain_channel = run_service_for_stream().await; - let daemon_key = mock_vm_daemon(brain_channel.clone()).await; - let mut client_gen_cli = BrainGeneralCliClient::new(brain_channel.clone()); + let brain_channel = run_service_for_stream().await.unwrap(); + let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap(); let key = Key::new(); - let pubkey = key.pubkey.clone(); - - let report_req = ReportNodeReq { - admin_pubkey: pubkey.clone(), - node_pubkey: daemon_key.clone(), - contract: String::from("uuid"), - reason: String::from("reason"), - }; let report_error = - client_gen_cli.report_node(key.sign_request(report_req).unwrap()).await.err().unwrap(); + report_node(&key, &brain_channel, &daemon_key, "uuid", "reason").await.err().unwrap(); - println!("Report error: {:?}", report_error); - assert_eq!(report_error.message(), "No contract found by this ID."); + log::info!("Report error: {:?}", report_error); + assert!(report_error.to_string().contains("No contract found by this ID.")); - let active_vm_id = - create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await; + let active_vm_id = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap(); let reason = String::from("something went wrong on vm"); - let report_req = ReportNodeReq { - admin_pubkey: pubkey, - node_pubkey: daemon_key.clone(), - contract: active_vm_id, - reason: reason.clone(), - }; - let _ = client_gen_cli - .report_node(key.sign_request(report_req).unwrap()) + let _ = report_node(&key, &brain_channel, &daemon_key, &active_vm_id, &reason) .await .unwrap() .into_inner(); @@ -175,7 +158,7 @@ async fn test_report_node() { .take(0) .unwrap(); - let vm_node_with_report = vm_nodes.get(0).unwrap(); + let vm_node_with_report = &vm_nodes[0]; assert!(vm_node_with_report.reports[0].reason == reason); } @@ -183,9 +166,9 @@ async fn test_report_node() { #[tokio::test] // TODO: register some operators before testing this async fn test_list_operators() { - prepare_test_db().await; + prepare_test_db().await.unwrap(); - let channel = run_service_for_stream().await; + let channel = run_service_for_stream().await.unwrap(); let mut client = BrainGeneralCliClient::new(channel); @@ -212,9 +195,9 @@ async fn test_list_operators() { #[tokio::test] // TODO: create vm for this user before testing this async fn test_list_vm_contracts() { - prepare_test_db().await; + prepare_test_db().await.unwrap(); - let channel = run_service_for_stream().await; + let channel = run_service_for_stream().await.unwrap(); let mut client = BrainVmCliClient::new(channel); let key = Key::new(); diff --git a/tests/grpc_vm_daemon_test.rs b/tests/grpc_vm_daemon_test.rs index 0d7ab9b..ef9264a 100644 --- a/tests/grpc_vm_daemon_test.rs +++ b/tests/grpc_vm_daemon_test.rs @@ -11,27 +11,24 @@ mod common; #[tokio::test] async fn test_reg_vm_node() { - prepare_test_db().await; + prepare_test_db().await.unwrap(); - let addr = run_service_in_background().await; - let client = BrainVmDaemonClient::connect(format!("http://{}", addr)).await.unwrap(); + let addr = run_service_in_background().await.unwrap(); + let mut client = BrainVmDaemonClient::connect(format!("http://{}", addr)).await.unwrap(); - let operator_wallet = Key::new().pubkey; - - let key = Key::new(); - - let vm_contracts = register_vm_node(client, key, operator_wallet).await; + let vm_contracts = + register_vm_node(&mut client, &Key::new(), &Key::new().pubkey).await.unwrap(); assert!(vm_contracts.is_empty()) } #[tokio::test] async fn test_brain_message() { - env_logger::builder().filter_level(log::LevelFilter::Info).init(); - let db = prepare_test_db().await; + env_logger::builder().filter_level(log::LevelFilter::Error).init(); + let db = prepare_test_db().await.unwrap(); - let brain_channel = run_service_for_stream().await; - let daemon_key = mock_vm_daemon(brain_channel.clone()).await; + let brain_channel = run_service_for_stream().await.unwrap(); + let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap(); let mut cli_client = BrainVmCliClient::new(brain_channel); let cli_key = Key::new();