test operator inspection extensive tests on airdrop refactor db module imports separated vm tests into its module modularised report node into reusable method fix register vm_node creates operator account in db fix test brain message add ssh port on mock daemon while new vm improved error handling on tests unwrapping only on top level method test utils methods accepts refs to remove clone() on top level methods
		
			
				
	
	
		
			143 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			143 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
| use super::test_utils::Key;
 | |
| use anyhow::Result;
 | |
| use detee_shared::vm_proto;
 | |
| use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
 | |
| use detee_shared::vm_proto::RegisterVmNodeReq;
 | |
| use futures::StreamExt;
 | |
| use tokio::sync::mpsc;
 | |
| use tokio_stream::wrappers::ReceiverStream;
 | |
| use tonic::transport::Channel;
 | |
| 
 | |
| pub async fn mock_vm_daemon(brain_channel: &Channel) -> Result<String> {
 | |
|     let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone());
 | |
|     let daemon_key = Key::new();
 | |
| 
 | |
|     register_vm_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?;
 | |
| 
 | |
|     let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1);
 | |
| 
 | |
|     tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx));
 | |
| 
 | |
|     let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1);
 | |
|     tokio::spawn(daemon_msg_sender(
 | |
|         daemon_client.clone(),
 | |
|         daemon_key.clone(),
 | |
|         daemon_msg_tx.clone(),
 | |
|         rx,
 | |
|     ));
 | |
| 
 | |
|     tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx));
 | |
| 
 | |
|     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
 | |
| 
 | |
|     Ok(daemon_key.pubkey)
 | |
| }
 | |
| 
 | |
| pub async fn register_vm_node(
 | |
|     client: &mut BrainVmDaemonClient<Channel>,
 | |
|     key: &Key,
 | |
|     operator_wallet: &str,
 | |
| ) -> Result<Vec<vm_proto::VmContract>> {
 | |
|     log::info!("Registering vm_node: {}", key.pubkey);
 | |
|     let node_pubkey = key.pubkey.clone();
 | |
| 
 | |
|     let req = RegisterVmNodeReq {
 | |
|         node_pubkey,
 | |
|         operator_wallet: operator_wallet.to_string(),
 | |
|         main_ip: String::from("185.243.218.213"),
 | |
|         city: String::from("Oslo"),
 | |
|         country: String::from("Norway"),
 | |
|         region: String::from("EU"),
 | |
|         price: 1200,
 | |
|     };
 | |
| 
 | |
|     let mut grpc_stream = client.register_vm_node(key.sign_request(req)?).await?.into_inner();
 | |
| 
 | |
|     let mut vm_contracts = Vec::new();
 | |
|     while let Some(stream_update) = grpc_stream.next().await {
 | |
|         match stream_update {
 | |
|             Ok(vm_c) => {
 | |
|                 vm_contracts.push(vm_c);
 | |
|             }
 | |
|             Err(e) => {
 | |
|                 panic!("Received error instead of vm_contracts: {e:?}");
 | |
|             }
 | |
|         }
 | |
|     }
 | |
|     Ok(vm_contracts)
 | |
| }
 | |
| 
 | |
| pub async fn daemon_listener(
 | |
|     mut client: BrainVmDaemonClient<Channel>,
 | |
|     key: Key,
 | |
|     tx: mpsc::Sender<vm_proto::BrainVmMessage>,
 | |
| ) -> Result<()> {
 | |
|     log::info!("listening vm_daemon");
 | |
|     let mut grpc_stream = client.brain_messages(key.sign_stream_auth(vec![])?).await?.into_inner();
 | |
| 
 | |
|     while let Some(Ok(stream_update)) = grpc_stream.next().await {
 | |
|         log::info!("vm deamon got notified: {:?}", &stream_update);
 | |
|         let _ = tx.send(stream_update).await;
 | |
|     }
 | |
| 
 | |
|     Ok(())
 | |
| }
 | |
| 
 | |
| pub async fn daemon_msg_sender(
 | |
|     mut client: BrainVmDaemonClient<Channel>,
 | |
|     key: Key,
 | |
|     tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
 | |
|     rx: mpsc::Receiver<vm_proto::VmDaemonMessage>,
 | |
| ) -> Result<()> {
 | |
|     log::info!("sender vm_daemon");
 | |
|     let rx_stream = ReceiverStream::new(rx);
 | |
|     tx.send(vm_proto::VmDaemonMessage {
 | |
|         msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![])?)),
 | |
|     })
 | |
|     .await?;
 | |
|     client.daemon_messages(rx_stream).await?;
 | |
|     Ok(())
 | |
| }
 | |
| 
 | |
| pub async fn daemon_engine(
 | |
|     tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
 | |
|     mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>,
 | |
| ) -> Result<()> {
 | |
|     log::info!("daemon engine vm_daemon");
 | |
|     while let Some(brain_msg) = rx.recv().await {
 | |
|         match brain_msg.msg {
 | |
|             Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => {
 | |
|                 let exposed_ports = [vec![22], new_vm_req.extra_ports].concat();
 | |
|                 let args = Some(vm_proto::MeasurementArgs {
 | |
|                     dtrfs_api_endpoint: String::from("184.107.169.199:48865"),
 | |
|                     exposed_ports,
 | |
|                     ovmf_hash: String::from(
 | |
|                         "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76",
 | |
|                     ),
 | |
|                     ips: vec![],
 | |
|                 });
 | |
| 
 | |
|                 let new_vm_resp = vm_proto::NewVmResp {
 | |
|                     uuid: new_vm_req.uuid.clone(),
 | |
|                     args,
 | |
|                     error: String::new(),
 | |
|                 };
 | |
| 
 | |
|                 let res_data = vm_proto::VmDaemonMessage {
 | |
|                     msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)),
 | |
|                 };
 | |
|                 tx.send(res_data).await?;
 | |
|             }
 | |
|             Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => {
 | |
|                 todo!()
 | |
|             }
 | |
|             Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => {
 | |
|                 todo!()
 | |
|             }
 | |
|             None => todo!(),
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     Ok(())
 | |
| }
 |