// SPDX-License-Identifier: Apache-2.0 use super::test_utils::Key; use crate::common::test_utils::{generate_random_public_ip, get_ip_info}; use anyhow::Result; use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; use detee_shared::vm_proto::RegisterVmNodeReq; use futures::StreamExt; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Channel; pub async fn mock_vm_daemon( brain_channel: &Channel, daemon_error: Option, ) -> Result { let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone()); let daemon_key = Key::new(); register_vm_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?; let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx)); let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(daemon_msg_sender( daemon_client.clone(), daemon_key.clone(), daemon_msg_tx.clone(), rx, )); tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx, daemon_error)); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; Ok(daemon_key.pubkey) } pub async fn register_vm_node( client: &mut BrainVmDaemonClient, key: &Key, operator_wallet: &str, ) -> Result> { log::info!("Registering vm_node: {}", key.pubkey); let node_pubkey = key.pubkey.clone(); let ip = generate_random_public_ip().to_string(); let ip_info = get_ip_info(&ip).await?; let req = RegisterVmNodeReq { node_pubkey, operator_wallet: operator_wallet.to_string(), main_ip: ip_info.ip, city: ip_info.city, country: ip_info.country, region: ip_info.region, }; let mut grpc_stream = client.register_vm_node(key.sign_request(req)?).await?.into_inner(); let mut deleted_vm_reqs = Vec::new(); while let Some(stream_update) = grpc_stream.next().await { match stream_update { Ok(del_vm_req) => { deleted_vm_reqs.push(del_vm_req); } Err(e) => { panic!("Received error instead of deleted_vm_reqs: {e:?}"); } } } Ok(deleted_vm_reqs) } pub async fn daemon_listener( mut client: BrainVmDaemonClient, key: Key, tx: mpsc::Sender, ) -> Result<()> { log::info!("listening vm_daemon"); let mut grpc_stream = client.brain_messages(key.sign_stream_auth_vm(vec![])?).await?.into_inner(); while let Some(Ok(stream_update)) = grpc_stream.next().await { log::info!("vm deamon got notified: {:?}", &stream_update); let _ = tx.send(stream_update).await; } Ok(()) } pub async fn daemon_msg_sender( mut client: BrainVmDaemonClient, key: Key, tx: mpsc::Sender, rx: mpsc::Receiver, ) -> Result<()> { log::info!("sender vm_daemon"); let rx_stream = ReceiverStream::new(rx); tx.send(vm_proto::VmDaemonMessage { msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth_vm(vec![])?)), }) .await?; client.daemon_messages(rx_stream).await?; Ok(()) } pub async fn daemon_engine( tx: mpsc::Sender, mut rx: mpsc::Receiver, new_vm_err: Option, ) -> Result<()> { log::info!("daemon engine vm_daemon"); while let Some(brain_msg) = rx.recv().await { match brain_msg.msg { Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { let exposed_ports = [vec![22], new_vm_req.extra_ports].concat(); let args = Some(vm_proto::MeasurementArgs { dtrfs_api_endpoint: String::from("184.107.169.199:48865"), exposed_ports, ovmf_hash: String::from( "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", ), ips: vec![], }); let new_vm_resp = vm_proto::NewVmResp { vm_id: new_vm_req.vm_id.clone(), args, error: new_vm_err.clone().unwrap_or_default(), }; let res_data = vm_proto::VmDaemonMessage { msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), }; tx.send(res_data).await?; } Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { todo!() } Some(vm_proto::brain_vm_message::Msg::DeleteVm(del_vm_req)) => { println!("MOCK_VM_DAEMON::delete vm request for {}", del_vm_req.vm_id); } None => todo!(), } } Ok(()) }