diff --git a/tests/grpc_vm_daemon_test.rs b/tests/grpc_vm_daemon_test.rs index 554b988..5770de1 100644 --- a/tests/grpc_vm_daemon_test.rs +++ b/tests/grpc_vm_daemon_test.rs @@ -10,8 +10,9 @@ use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; use detee_shared::vm_proto::RegisterVmNodeReq; use futures::StreamExt; -use tokio::task::JoinSet; +use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; +use tonic::transport::Channel; mod common; @@ -32,7 +33,7 @@ async fn test_reg_vm_node() { } async fn register_vm_node( - mut client: BrainVmDaemonClient, + mut client: BrainVmDaemonClient, key: Key, operator_wallet: String, ) -> Vec { @@ -67,13 +68,6 @@ async fn register_vm_node( #[tokio::test] async fn test_brain_message() { - // spawn grpc stream server - // mock a daemon - // mock a cli client to interact with brain - - // validate if something happening in "surreal_brain::db::NewVmReq", "surreal_brain::db::UpdateVmReq", "surreal_brain::db::DeletedVm" these table - // mock daemon will responde to brain - env_logger::builder().filter_level(log::LevelFilter::Info).init(); let _ = prepare_test_db().await; @@ -81,84 +75,25 @@ async fn test_brain_message() { let channel = connect_stream_client_channel(tokio_duplex, addr).await; - let mut daemon_client = BrainVmDaemonClient::new(channel.clone()); + let daemon_client = BrainVmDaemonClient::new(channel.clone()); let daemon_key = Key::new(); register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await; - let mut daemon_join_set = JoinSet::new(); + let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); - let (tx, mut brain_msg_rx) = tokio::sync::mpsc::channel(1); + tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx)); - // listen to brain - let mut daemon_client_01 = daemon_client.clone(); - let daemon_key_01 = daemon_key.clone(); - daemon_join_set.spawn(async move { - let mut grpc_stream = daemon_client_01 - .brain_messages(daemon_key_01.sign_stream_auth(vec![]).unwrap()) - .await - .unwrap() - .into_inner(); - - while let Some(Ok(stream_update)) = grpc_stream.next().await { - log::info!("vm deamon got notified: {:?}", &stream_update); - let _ = tx.send(stream_update).await; - } - }); - - // send to brain let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); - let daemon_msg_tx_01 = daemon_msg_tx.clone(); - let daemon_key_02 = daemon_key.clone(); - daemon_join_set.spawn(async move { - let rx_stream = ReceiverStream::new(rx); - daemon_msg_tx_01 - .send(vm_proto::VmDaemonMessage { - msg: Some(vm_proto::vm_daemon_message::Msg::Auth( - daemon_key_02.sign_stream_auth(vec![]).unwrap(), - )), - }) - .await - .unwrap(); - daemon_client.daemon_messages(rx_stream).await.unwrap(); - }); + tokio::spawn(daemon_msg_sender( + daemon_client.clone(), + daemon_key.clone(), + daemon_msg_tx.clone(), + rx, + )); - // daemon engine - daemon_join_set.spawn(async move { - while let Some(brain_msg) = brain_msg_rx.recv().await { - match brain_msg.msg { - Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { - let args = Some(vm_proto::MeasurementArgs { - dtrfs_api_endpoint: String::from("184.107.169.199:48865"), - exposed_ports: new_vm_req.extra_ports, - ovmf_hash: String::from( - "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", - ), - ips: vec![], - }); - - let new_vm_resp = vm_proto::NewVmResp { - uuid: new_vm_req.uuid.clone(), - args, - error: String::new(), - }; - - let res_data = vm_proto::VmDaemonMessage { - msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), - }; - daemon_msg_tx.send(res_data).await.unwrap(); - } - Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { - todo!() - } - Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => { - todo!() - } - None => todo!(), - } - } - }); + tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx)); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; @@ -186,3 +121,69 @@ async fn test_brain_message() { assert_eq!(data_in_db, new_vm_resp.args.unwrap()); } + +async fn daemon_listener( + mut client: BrainVmDaemonClient, + key: Key, + tx: mpsc::Sender, +) { + let mut grpc_stream = + client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner(); + + while let Some(Ok(stream_update)) = grpc_stream.next().await { + log::info!("vm deamon got notified: {:?}", &stream_update); + let _ = tx.send(stream_update).await; + } +} +async fn daemon_msg_sender( + mut client: BrainVmDaemonClient, + key: Key, + tx: mpsc::Sender, + rx: mpsc::Receiver, +) { + let rx_stream = ReceiverStream::new(rx); + tx.send(vm_proto::VmDaemonMessage { + msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())), + }) + .await + .unwrap(); + client.daemon_messages(rx_stream).await.unwrap(); +} + +async fn daemon_engine( + tx: mpsc::Sender, + mut rx: mpsc::Receiver, +) { + while let Some(brain_msg) = rx.recv().await { + match brain_msg.msg { + Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { + let args = Some(vm_proto::MeasurementArgs { + dtrfs_api_endpoint: String::from("184.107.169.199:48865"), + exposed_ports: new_vm_req.extra_ports, + ovmf_hash: String::from( + "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", + ), + ips: vec![], + }); + + let new_vm_resp = vm_proto::NewVmResp { + uuid: new_vm_req.uuid.clone(), + args, + error: String::new(), + }; + + let res_data = vm_proto::VmDaemonMessage { + msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), + }; + tx.send(res_data).await.unwrap(); + } + Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { + todo!() + } + Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => { + todo!() + } + None => todo!(), + } + } +}