use super::test_utils::Key; use anyhow::Result; use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; use detee_shared::vm_proto::RegisterVmNodeReq; use futures::StreamExt; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Channel; pub async fn mock_vm_daemon(brain_channel: &Channel) -> Result { let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone()); let daemon_key = Key::new(); register_vm_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?; let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx)); let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(daemon_msg_sender( daemon_client.clone(), daemon_key.clone(), daemon_msg_tx.clone(), rx, )); tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx)); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; Ok(daemon_key.pubkey) } pub async fn register_vm_node( client: &mut BrainVmDaemonClient, key: &Key, operator_wallet: &str, ) -> Result> { log::info!("Registering vm_node: {}", key.pubkey); let node_pubkey = key.pubkey.clone(); let req = RegisterVmNodeReq { node_pubkey, operator_wallet: operator_wallet.to_string(), main_ip: String::from("185.243.218.213"), city: String::from("Oslo"), country: String::from("Norway"), region: String::from("EU"), price: 1200, }; let mut grpc_stream = client.register_vm_node(key.sign_request(req)?).await?.into_inner(); let mut vm_contracts = Vec::new(); while let Some(stream_update) = grpc_stream.next().await { match stream_update { Ok(vm_c) => { vm_contracts.push(vm_c); } Err(e) => { panic!("Received error instead of vm_contracts: {e:?}"); } } } Ok(vm_contracts) } pub async fn daemon_listener( mut client: BrainVmDaemonClient, key: Key, tx: mpsc::Sender, ) -> Result<()> { log::info!("listening vm_daemon"); let mut grpc_stream = client.brain_messages(key.sign_stream_auth(vec![])?).await?.into_inner(); while let Some(Ok(stream_update)) = grpc_stream.next().await { log::info!("vm deamon got notified: {:?}", &stream_update); let _ = tx.send(stream_update).await; } Ok(()) } pub async fn daemon_msg_sender( mut client: BrainVmDaemonClient, key: Key, tx: mpsc::Sender, rx: mpsc::Receiver, ) -> Result<()> { log::info!("sender vm_daemon"); let rx_stream = ReceiverStream::new(rx); tx.send(vm_proto::VmDaemonMessage { msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![])?)), }) .await?; client.daemon_messages(rx_stream).await?; Ok(()) } pub async fn daemon_engine( tx: mpsc::Sender, mut rx: mpsc::Receiver, ) -> Result<()> { log::info!("daemon engine vm_daemon"); while let Some(brain_msg) = rx.recv().await { match brain_msg.msg { Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { let args = Some(vm_proto::MeasurementArgs { dtrfs_api_endpoint: String::from("184.107.169.199:48865"), exposed_ports: new_vm_req.extra_ports, ovmf_hash: String::from( "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", ), ips: vec![], }); let new_vm_resp = vm_proto::NewVmResp { uuid: new_vm_req.uuid.clone(), args, error: String::new(), }; let res_data = vm_proto::VmDaemonMessage { msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), }; tx.send(res_data).await?; } Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { todo!() } Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => { todo!() } None => todo!(), } } Ok(()) }