135 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			135 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
| use super::test_utils::Key;
 | |
| use detee_shared::vm_proto;
 | |
| use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
 | |
| use detee_shared::vm_proto::RegisterVmNodeReq;
 | |
| use futures::StreamExt;
 | |
| use tokio::sync::mpsc;
 | |
| use tokio_stream::wrappers::ReceiverStream;
 | |
| use tonic::transport::Channel;
 | |
| 
 | |
| pub async fn mock_vm_daemon(brain_channel: Channel) -> String {
 | |
|     let daemon_client = BrainVmDaemonClient::new(brain_channel);
 | |
|     let daemon_key = Key::new();
 | |
| 
 | |
|     register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await;
 | |
| 
 | |
|     let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1);
 | |
| 
 | |
|     tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx));
 | |
| 
 | |
|     let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1);
 | |
|     tokio::spawn(daemon_msg_sender(
 | |
|         daemon_client.clone(),
 | |
|         daemon_key.clone(),
 | |
|         daemon_msg_tx.clone(),
 | |
|         rx,
 | |
|     ));
 | |
| 
 | |
|     tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx));
 | |
| 
 | |
|     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
 | |
| 
 | |
|     daemon_key.pubkey
 | |
| }
 | |
| 
 | |
| pub async fn register_vm_node(
 | |
|     mut client: BrainVmDaemonClient<Channel>,
 | |
|     key: Key,
 | |
|     operator_wallet: String,
 | |
| ) -> Vec<vm_proto::VmContract> {
 | |
|     let node_pubkey = key.pubkey.clone();
 | |
| 
 | |
|     let req = RegisterVmNodeReq {
 | |
|         node_pubkey,
 | |
|         operator_wallet,
 | |
|         main_ip: String::from("185.243.218.213"),
 | |
|         city: String::from("Oslo"),
 | |
|         country: String::from("Norway"),
 | |
|         region: String::from("EU"),
 | |
|         price: 1200,
 | |
|     };
 | |
| 
 | |
|     let mut grpc_stream =
 | |
|         client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner();
 | |
| 
 | |
|     let mut vm_contracts = Vec::new();
 | |
|     while let Some(stream_update) = grpc_stream.next().await {
 | |
|         match stream_update {
 | |
|             Ok(vm_c) => {
 | |
|                 vm_contracts.push(vm_c);
 | |
|             }
 | |
|             Err(e) => {
 | |
|                 panic!("Received error instead of vm_contracts: {e:?}");
 | |
|             }
 | |
|         }
 | |
|     }
 | |
|     vm_contracts
 | |
| }
 | |
| 
 | |
| pub async fn daemon_listener(
 | |
|     mut client: BrainVmDaemonClient<Channel>,
 | |
|     key: Key,
 | |
|     tx: mpsc::Sender<vm_proto::BrainVmMessage>,
 | |
| ) {
 | |
|     let mut grpc_stream =
 | |
|         client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner();
 | |
| 
 | |
|     while let Some(Ok(stream_update)) = grpc_stream.next().await {
 | |
|         log::info!("vm deamon got notified: {:?}", &stream_update);
 | |
|         let _ = tx.send(stream_update).await;
 | |
|     }
 | |
| }
 | |
| 
 | |
| pub async fn daemon_msg_sender(
 | |
|     mut client: BrainVmDaemonClient<Channel>,
 | |
|     key: Key,
 | |
|     tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
 | |
|     rx: mpsc::Receiver<vm_proto::VmDaemonMessage>,
 | |
| ) {
 | |
|     let rx_stream = ReceiverStream::new(rx);
 | |
|     tx.send(vm_proto::VmDaemonMessage {
 | |
|         msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())),
 | |
|     })
 | |
|     .await
 | |
|     .unwrap();
 | |
|     client.daemon_messages(rx_stream).await.unwrap();
 | |
| }
 | |
| 
 | |
| pub async fn daemon_engine(
 | |
|     tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
 | |
|     mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>,
 | |
| ) {
 | |
|     while let Some(brain_msg) = rx.recv().await {
 | |
|         match brain_msg.msg {
 | |
|             Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => {
 | |
|                 let args = Some(vm_proto::MeasurementArgs {
 | |
|                     dtrfs_api_endpoint: String::from("184.107.169.199:48865"),
 | |
|                     exposed_ports: new_vm_req.extra_ports,
 | |
|                     ovmf_hash: String::from(
 | |
|                         "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76",
 | |
|                     ),
 | |
|                     ips: vec![],
 | |
|                 });
 | |
| 
 | |
|                 let new_vm_resp = vm_proto::NewVmResp {
 | |
|                     uuid: new_vm_req.uuid.clone(),
 | |
|                     args,
 | |
|                     error: String::new(),
 | |
|                 };
 | |
| 
 | |
|                 let res_data = vm_proto::VmDaemonMessage {
 | |
|                     msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)),
 | |
|                 };
 | |
|                 tx.send(res_data).await.unwrap();
 | |
|             }
 | |
|             Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => {
 | |
|                 todo!()
 | |
|             }
 | |
|             Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => {
 | |
|                 todo!()
 | |
|             }
 | |
|             None => todo!(),
 | |
|         }
 | |
|     }
 | |
| }
 |