From ab39b12da951486658bddee9c5a6e3012fb278c7 Mon Sep 17 00:00:00 2001 From: Noor Date: Wed, 30 Apr 2025 15:47:59 +0530 Subject: [PATCH] organize daemon test helpers --- tests/common/mod.rs | 4 + tests/common/prepare_test_env.rs | 1 - tests/common/test_utils.rs | 1 - tests/common/vm_daemon_utils.rs | 139 +++++++++++++++++++++++++++++++ tests/grpc_vm_daemon_test.rs | 139 ++----------------------------- 5 files changed, 148 insertions(+), 136 deletions(-) create mode 100644 tests/common/vm_daemon_utils.rs diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 1a15421..223aa90 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1,2 +1,6 @@ +#[allow(dead_code)] pub mod prepare_test_env; +#[allow(dead_code)] pub mod test_utils; +#[allow(dead_code)] +pub mod vm_daemon_utils; diff --git a/tests/common/prepare_test_env.rs b/tests/common/prepare_test_env.rs index 2edd66b..0fd6699 100644 --- a/tests/common/prepare_test_env.rs +++ b/tests/common/prepare_test_env.rs @@ -92,7 +92,6 @@ pub async fn connect_stream_client_channel(c_stream: DuplexStream, addr: SocketA .unwrap() } -#[allow(dead_code)] pub async fn run_service_for_stream() -> Channel { let (client, addr) = run_service_for_stream_server().await; connect_stream_client_channel(client, addr).await diff --git a/tests/common/test_utils.rs b/tests/common/test_utils.rs index e5c9f27..04a5b13 100644 --- a/tests/common/test_utils.rs +++ b/tests/common/test_utils.rs @@ -38,7 +38,6 @@ impl Key { Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string()) } - #[allow(dead_code)] pub fn sign_stream_auth(&self, contracts: Vec) -> Result { let pubkey = self.pubkey.clone(); let timestamp = chrono::Utc::now().to_rfc3339(); diff --git a/tests/common/vm_daemon_utils.rs b/tests/common/vm_daemon_utils.rs new file mode 100644 index 0000000..313dd44 --- /dev/null +++ b/tests/common/vm_daemon_utils.rs @@ -0,0 +1,139 @@ +use super::prepare_test_env::{connect_stream_client_channel, run_service_for_stream_server}; +use super::test_utils::Key; +use detee_shared::vm_proto; +use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; +use detee_shared::vm_proto::RegisterVmNodeReq; +use futures::StreamExt; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::transport::Channel; + +pub async fn mock_vm_daemon() -> (Channel, String) { + let (tokio_duplex, addr) = run_service_for_stream_server().await; + + let channel = connect_stream_client_channel(tokio_duplex, addr).await; + + let daemon_client = BrainVmDaemonClient::new(channel.clone()); + let daemon_key = Key::new(); + + register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await; + + let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); + + tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx)); + + let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); + tokio::spawn(daemon_msg_sender( + daemon_client.clone(), + daemon_key.clone(), + daemon_msg_tx.clone(), + rx, + )); + + tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx)); + + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + (channel, daemon_key.pubkey) +} + +pub async fn register_vm_node( + mut client: BrainVmDaemonClient, + key: Key, + operator_wallet: String, +) -> Vec { + let node_pubkey = key.pubkey.clone(); + + let req = RegisterVmNodeReq { + node_pubkey, + operator_wallet, + main_ip: String::from("185.243.218.213"), + city: String::from("Oslo"), + country: String::from("Norway"), + region: String::from("EU"), + price: 1200, + }; + + let mut grpc_stream = + client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner(); + + let mut vm_contracts = Vec::new(); + while let Some(stream_update) = grpc_stream.next().await { + match stream_update { + Ok(vm_c) => { + vm_contracts.push(vm_c); + } + Err(e) => { + panic!("Received error instead of vm_contracts: {e:?}"); + } + } + } + vm_contracts +} + +pub async fn daemon_listener( + mut client: BrainVmDaemonClient, + key: Key, + tx: mpsc::Sender, +) { + let mut grpc_stream = + client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner(); + + while let Some(Ok(stream_update)) = grpc_stream.next().await { + log::info!("vm deamon got notified: {:?}", &stream_update); + let _ = tx.send(stream_update).await; + } +} + +pub async fn daemon_msg_sender( + mut client: BrainVmDaemonClient, + key: Key, + tx: mpsc::Sender, + rx: mpsc::Receiver, +) { + let rx_stream = ReceiverStream::new(rx); + tx.send(vm_proto::VmDaemonMessage { + msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())), + }) + .await + .unwrap(); + client.daemon_messages(rx_stream).await.unwrap(); +} + +pub async fn daemon_engine( + tx: mpsc::Sender, + mut rx: mpsc::Receiver, +) { + while let Some(brain_msg) = rx.recv().await { + match brain_msg.msg { + Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { + let args = Some(vm_proto::MeasurementArgs { + dtrfs_api_endpoint: String::from("184.107.169.199:48865"), + exposed_ports: new_vm_req.extra_ports, + ovmf_hash: String::from( + "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", + ), + ips: vec![], + }); + + let new_vm_resp = vm_proto::NewVmResp { + uuid: new_vm_req.uuid.clone(), + args, + error: String::new(), + }; + + let res_data = vm_proto::VmDaemonMessage { + msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), + }; + tx.send(res_data).await.unwrap(); + } + Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { + todo!() + } + Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => { + todo!() + } + None => todo!(), + } + } +} diff --git a/tests/grpc_vm_daemon_test.rs b/tests/grpc_vm_daemon_test.rs index 5770de1..b10ca18 100644 --- a/tests/grpc_vm_daemon_test.rs +++ b/tests/grpc_vm_daemon_test.rs @@ -1,18 +1,11 @@ use common::{ - prepare_test_env::{ - connect_stream_client_channel, prepare_test_db, run_service_for_stream_server, - run_service_in_background, - }, + prepare_test_env::{prepare_test_db, run_service_in_background}, test_utils::Key, + vm_daemon_utils::{mock_vm_daemon, register_vm_node}, }; use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; -use detee_shared::vm_proto::RegisterVmNodeReq; -use futures::StreamExt; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use tonic::transport::Channel; mod common; @@ -32,78 +25,22 @@ async fn test_reg_vm_node() { assert!(vm_contracts.is_empty()) } -async fn register_vm_node( - mut client: BrainVmDaemonClient, - key: Key, - operator_wallet: String, -) -> Vec { - let node_pubkey = key.pubkey.clone(); - - let req = RegisterVmNodeReq { - node_pubkey, - operator_wallet, - main_ip: String::from("185.243.218.213"), - city: String::from("Oslo"), - country: String::from("Norway"), - region: String::from("EU"), - price: 1200, - }; - - let mut grpc_stream = - client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner(); - - let mut vm_contracts = Vec::new(); - while let Some(stream_update) = grpc_stream.next().await { - match stream_update { - Ok(vm_c) => { - vm_contracts.push(vm_c); - } - Err(e) => { - panic!("Received error instead of vm_contracts: {e:?}"); - } - } - } - vm_contracts -} - #[tokio::test] async fn test_brain_message() { env_logger::builder().filter_level(log::LevelFilter::Info).init(); let _ = prepare_test_db().await; - let (tokio_duplex, addr) = run_service_for_stream_server().await; - - let channel = connect_stream_client_channel(tokio_duplex, addr).await; - - let daemon_client = BrainVmDaemonClient::new(channel.clone()); - - let daemon_key = Key::new(); - - register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await; - - let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); - - tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx)); - - let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); - tokio::spawn(daemon_msg_sender( - daemon_client.clone(), - daemon_key.clone(), - daemon_msg_tx.clone(), - rx, - )); - - tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx)); - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + let (channel, daemon_key) = mock_vm_daemon().await; + let mut cli_client = BrainVmCliClient::new(channel); let cli_key = Key::new(); let req = vm_proto::NewVmReq { admin_pubkey: cli_key.pubkey.clone(), - node_pubkey: daemon_key.pubkey.clone(), + node_pubkey: daemon_key, price_per_unit: 1200, extra_ports: vec![8080, 8081], locked_nano: 0, @@ -121,69 +58,3 @@ async fn test_brain_message() { assert_eq!(data_in_db, new_vm_resp.args.unwrap()); } - -async fn daemon_listener( - mut client: BrainVmDaemonClient, - key: Key, - tx: mpsc::Sender, -) { - let mut grpc_stream = - client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner(); - - while let Some(Ok(stream_update)) = grpc_stream.next().await { - log::info!("vm deamon got notified: {:?}", &stream_update); - let _ = tx.send(stream_update).await; - } -} -async fn daemon_msg_sender( - mut client: BrainVmDaemonClient, - key: Key, - tx: mpsc::Sender, - rx: mpsc::Receiver, -) { - let rx_stream = ReceiverStream::new(rx); - tx.send(vm_proto::VmDaemonMessage { - msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())), - }) - .await - .unwrap(); - client.daemon_messages(rx_stream).await.unwrap(); -} - -async fn daemon_engine( - tx: mpsc::Sender, - mut rx: mpsc::Receiver, -) { - while let Some(brain_msg) = rx.recv().await { - match brain_msg.msg { - Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { - let args = Some(vm_proto::MeasurementArgs { - dtrfs_api_endpoint: String::from("184.107.169.199:48865"), - exposed_ports: new_vm_req.extra_ports, - ovmf_hash: String::from( - "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", - ), - ips: vec![], - }); - - let new_vm_resp = vm_proto::NewVmResp { - uuid: new_vm_req.uuid.clone(), - args, - error: String::new(), - }; - - let res_data = vm_proto::VmDaemonMessage { - msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), - }; - tx.send(res_data).await.unwrap(); - } - Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { - todo!() - } - Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => { - todo!() - } - None => todo!(), - } - } -}