usage of MeasurementArgs and VmNodeFilters from vm_proto in db module fix test brain message add new ssh port on mock daemon while new vm
		
			
				
	
	
		
			143 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			143 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
| use super::test_utils::Key;
 | |
| use anyhow::Result;
 | |
| use detee_shared::vm_proto;
 | |
| use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
 | |
| use detee_shared::vm_proto::RegisterVmNodeReq;
 | |
| use futures::StreamExt;
 | |
| use tokio::sync::mpsc;
 | |
| use tokio_stream::wrappers::ReceiverStream;
 | |
| use tonic::transport::Channel;
 | |
| 
 | |
| pub async fn mock_vm_daemon(brain_channel: &Channel) -> Result<String> {
 | |
|     let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone());
 | |
|     let daemon_key = Key::new();
 | |
| 
 | |
|     register_vm_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?;
 | |
| 
 | |
|     let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1);
 | |
| 
 | |
|     tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx));
 | |
| 
 | |
|     let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1);
 | |
|     tokio::spawn(daemon_msg_sender(
 | |
|         daemon_client.clone(),
 | |
|         daemon_key.clone(),
 | |
|         daemon_msg_tx.clone(),
 | |
|         rx,
 | |
|     ));
 | |
| 
 | |
|     tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx));
 | |
| 
 | |
|     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
 | |
| 
 | |
|     Ok(daemon_key.pubkey)
 | |
| }
 | |
| 
 | |
| pub async fn register_vm_node(
 | |
|     client: &mut BrainVmDaemonClient<Channel>,
 | |
|     key: &Key,
 | |
|     operator_wallet: &str,
 | |
| ) -> Result<Vec<vm_proto::VmContract>> {
 | |
|     log::info!("Registering vm_node: {}", key.pubkey);
 | |
|     let node_pubkey = key.pubkey.clone();
 | |
| 
 | |
|     let req = RegisterVmNodeReq {
 | |
|         node_pubkey,
 | |
|         operator_wallet: operator_wallet.to_string(),
 | |
|         main_ip: String::from("185.243.218.213"),
 | |
|         city: String::from("Oslo"),
 | |
|         country: String::from("Norway"),
 | |
|         region: String::from("EU"),
 | |
|         price: 1200,
 | |
|     };
 | |
| 
 | |
|     let mut grpc_stream = client.register_vm_node(key.sign_request(req)?).await?.into_inner();
 | |
| 
 | |
|     let mut vm_contracts = Vec::new();
 | |
|     while let Some(stream_update) = grpc_stream.next().await {
 | |
|         match stream_update {
 | |
|             Ok(vm_c) => {
 | |
|                 vm_contracts.push(vm_c);
 | |
|             }
 | |
|             Err(e) => {
 | |
|                 panic!("Received error instead of vm_contracts: {e:?}");
 | |
|             }
 | |
|         }
 | |
|     }
 | |
|     Ok(vm_contracts)
 | |
| }
 | |
| 
 | |
| pub async fn daemon_listener(
 | |
|     mut client: BrainVmDaemonClient<Channel>,
 | |
|     key: Key,
 | |
|     tx: mpsc::Sender<vm_proto::BrainVmMessage>,
 | |
| ) -> Result<()> {
 | |
|     log::info!("listening vm_daemon");
 | |
|     let mut grpc_stream = client.brain_messages(key.sign_stream_auth(vec![])?).await?.into_inner();
 | |
| 
 | |
|     while let Some(Ok(stream_update)) = grpc_stream.next().await {
 | |
|         log::info!("vm deamon got notified: {:?}", &stream_update);
 | |
|         let _ = tx.send(stream_update).await;
 | |
|     }
 | |
| 
 | |
|     Ok(())
 | |
| }
 | |
| 
 | |
| pub async fn daemon_msg_sender(
 | |
|     mut client: BrainVmDaemonClient<Channel>,
 | |
|     key: Key,
 | |
|     tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
 | |
|     rx: mpsc::Receiver<vm_proto::VmDaemonMessage>,
 | |
| ) -> Result<()> {
 | |
|     log::info!("sender vm_daemon");
 | |
|     let rx_stream = ReceiverStream::new(rx);
 | |
|     tx.send(vm_proto::VmDaemonMessage {
 | |
|         msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![])?)),
 | |
|     })
 | |
|     .await?;
 | |
|     client.daemon_messages(rx_stream).await?;
 | |
|     Ok(())
 | |
| }
 | |
| 
 | |
| pub async fn daemon_engine(
 | |
|     tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
 | |
|     mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>,
 | |
| ) -> Result<()> {
 | |
|     log::info!("daemon engine vm_daemon");
 | |
|     while let Some(brain_msg) = rx.recv().await {
 | |
|         match brain_msg.msg {
 | |
|             Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => {
 | |
|                 let exposed_ports = [vec![22], new_vm_req.extra_ports].concat();
 | |
|                 let args = Some(vm_proto::MeasurementArgs {
 | |
|                     dtrfs_api_endpoint: String::from("184.107.169.199:48865"),
 | |
|                     exposed_ports,
 | |
|                     ovmf_hash: String::from(
 | |
|                         "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76",
 | |
|                     ),
 | |
|                     ips: vec![],
 | |
|                 });
 | |
| 
 | |
|                 let new_vm_resp = vm_proto::NewVmResp {
 | |
|                     uuid: new_vm_req.uuid.clone(),
 | |
|                     args,
 | |
|                     error: String::new(),
 | |
|                 };
 | |
| 
 | |
|                 let res_data = vm_proto::VmDaemonMessage {
 | |
|                     msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)),
 | |
|                 };
 | |
|                 tx.send(res_data).await?;
 | |
|             }
 | |
|             Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => {
 | |
|                 todo!()
 | |
|             }
 | |
|             Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => {
 | |
|                 todo!()
 | |
|             }
 | |
|             None => todo!(),
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     Ok(())
 | |
| }
 |