organize daemon test helpers
This commit is contained in:
		
							parent
							
								
									92827161ad
								
							
						
					
					
						commit
						ab39b12da9
					
				| @ -1,2 +1,6 @@ | |||||||
|  | #[allow(dead_code)] | ||||||
| pub mod prepare_test_env; | pub mod prepare_test_env; | ||||||
|  | #[allow(dead_code)] | ||||||
| pub mod test_utils; | pub mod test_utils; | ||||||
|  | #[allow(dead_code)] | ||||||
|  | pub mod vm_daemon_utils; | ||||||
|  | |||||||
| @ -92,7 +92,6 @@ pub async fn connect_stream_client_channel(c_stream: DuplexStream, addr: SocketA | |||||||
|         .unwrap() |         .unwrap() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[allow(dead_code)] |  | ||||||
| pub async fn run_service_for_stream() -> Channel { | pub async fn run_service_for_stream() -> Channel { | ||||||
|     let (client, addr) = run_service_for_stream_server().await; |     let (client, addr) = run_service_for_stream_server().await; | ||||||
|     connect_stream_client_channel(client, addr).await |     connect_stream_client_channel(client, addr).await | ||||||
|  | |||||||
| @ -38,7 +38,6 @@ impl Key { | |||||||
|         Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string()) |         Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string()) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     #[allow(dead_code)] |  | ||||||
|     pub fn sign_stream_auth(&self, contracts: Vec<String>) -> Result<snp_proto::DaemonStreamAuth> { |     pub fn sign_stream_auth(&self, contracts: Vec<String>) -> Result<snp_proto::DaemonStreamAuth> { | ||||||
|         let pubkey = self.pubkey.clone(); |         let pubkey = self.pubkey.clone(); | ||||||
|         let timestamp = chrono::Utc::now().to_rfc3339(); |         let timestamp = chrono::Utc::now().to_rfc3339(); | ||||||
|  | |||||||
							
								
								
									
										139
									
								
								tests/common/vm_daemon_utils.rs
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										139
									
								
								tests/common/vm_daemon_utils.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,139 @@ | |||||||
|  | use super::prepare_test_env::{connect_stream_client_channel, run_service_for_stream_server}; | ||||||
|  | use super::test_utils::Key; | ||||||
|  | use detee_shared::vm_proto; | ||||||
|  | use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; | ||||||
|  | use detee_shared::vm_proto::RegisterVmNodeReq; | ||||||
|  | use futures::StreamExt; | ||||||
|  | use tokio::sync::mpsc; | ||||||
|  | use tokio_stream::wrappers::ReceiverStream; | ||||||
|  | use tonic::transport::Channel; | ||||||
|  | 
 | ||||||
|  | pub async fn mock_vm_daemon() -> (Channel, String) { | ||||||
|  |     let (tokio_duplex, addr) = run_service_for_stream_server().await; | ||||||
|  | 
 | ||||||
|  |     let channel = connect_stream_client_channel(tokio_duplex, addr).await; | ||||||
|  | 
 | ||||||
|  |     let daemon_client = BrainVmDaemonClient::new(channel.clone()); | ||||||
|  |     let daemon_key = Key::new(); | ||||||
|  | 
 | ||||||
|  |     register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await; | ||||||
|  | 
 | ||||||
|  |     let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); | ||||||
|  | 
 | ||||||
|  |     tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx)); | ||||||
|  | 
 | ||||||
|  |     let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); | ||||||
|  |     tokio::spawn(daemon_msg_sender( | ||||||
|  |         daemon_client.clone(), | ||||||
|  |         daemon_key.clone(), | ||||||
|  |         daemon_msg_tx.clone(), | ||||||
|  |         rx, | ||||||
|  |     )); | ||||||
|  | 
 | ||||||
|  |     tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx)); | ||||||
|  | 
 | ||||||
|  |     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; | ||||||
|  | 
 | ||||||
|  |     (channel, daemon_key.pubkey) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | pub async fn register_vm_node( | ||||||
|  |     mut client: BrainVmDaemonClient<Channel>, | ||||||
|  |     key: Key, | ||||||
|  |     operator_wallet: String, | ||||||
|  | ) -> Vec<vm_proto::VmContract> { | ||||||
|  |     let node_pubkey = key.pubkey.clone(); | ||||||
|  | 
 | ||||||
|  |     let req = RegisterVmNodeReq { | ||||||
|  |         node_pubkey, | ||||||
|  |         operator_wallet, | ||||||
|  |         main_ip: String::from("185.243.218.213"), | ||||||
|  |         city: String::from("Oslo"), | ||||||
|  |         country: String::from("Norway"), | ||||||
|  |         region: String::from("EU"), | ||||||
|  |         price: 1200, | ||||||
|  |     }; | ||||||
|  | 
 | ||||||
|  |     let mut grpc_stream = | ||||||
|  |         client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner(); | ||||||
|  | 
 | ||||||
|  |     let mut vm_contracts = Vec::new(); | ||||||
|  |     while let Some(stream_update) = grpc_stream.next().await { | ||||||
|  |         match stream_update { | ||||||
|  |             Ok(vm_c) => { | ||||||
|  |                 vm_contracts.push(vm_c); | ||||||
|  |             } | ||||||
|  |             Err(e) => { | ||||||
|  |                 panic!("Received error instead of vm_contracts: {e:?}"); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |     vm_contracts | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | pub async fn daemon_listener( | ||||||
|  |     mut client: BrainVmDaemonClient<Channel>, | ||||||
|  |     key: Key, | ||||||
|  |     tx: mpsc::Sender<vm_proto::BrainVmMessage>, | ||||||
|  | ) { | ||||||
|  |     let mut grpc_stream = | ||||||
|  |         client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner(); | ||||||
|  | 
 | ||||||
|  |     while let Some(Ok(stream_update)) = grpc_stream.next().await { | ||||||
|  |         log::info!("vm deamon got notified: {:?}", &stream_update); | ||||||
|  |         let _ = tx.send(stream_update).await; | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | pub async fn daemon_msg_sender( | ||||||
|  |     mut client: BrainVmDaemonClient<Channel>, | ||||||
|  |     key: Key, | ||||||
|  |     tx: mpsc::Sender<vm_proto::VmDaemonMessage>, | ||||||
|  |     rx: mpsc::Receiver<vm_proto::VmDaemonMessage>, | ||||||
|  | ) { | ||||||
|  |     let rx_stream = ReceiverStream::new(rx); | ||||||
|  |     tx.send(vm_proto::VmDaemonMessage { | ||||||
|  |         msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())), | ||||||
|  |     }) | ||||||
|  |     .await | ||||||
|  |     .unwrap(); | ||||||
|  |     client.daemon_messages(rx_stream).await.unwrap(); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | pub async fn daemon_engine( | ||||||
|  |     tx: mpsc::Sender<vm_proto::VmDaemonMessage>, | ||||||
|  |     mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>, | ||||||
|  | ) { | ||||||
|  |     while let Some(brain_msg) = rx.recv().await { | ||||||
|  |         match brain_msg.msg { | ||||||
|  |             Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { | ||||||
|  |                 let args = Some(vm_proto::MeasurementArgs { | ||||||
|  |                     dtrfs_api_endpoint: String::from("184.107.169.199:48865"), | ||||||
|  |                     exposed_ports: new_vm_req.extra_ports, | ||||||
|  |                     ovmf_hash: String::from( | ||||||
|  |                         "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", | ||||||
|  |                     ), | ||||||
|  |                     ips: vec![], | ||||||
|  |                 }); | ||||||
|  | 
 | ||||||
|  |                 let new_vm_resp = vm_proto::NewVmResp { | ||||||
|  |                     uuid: new_vm_req.uuid.clone(), | ||||||
|  |                     args, | ||||||
|  |                     error: String::new(), | ||||||
|  |                 }; | ||||||
|  | 
 | ||||||
|  |                 let res_data = vm_proto::VmDaemonMessage { | ||||||
|  |                     msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), | ||||||
|  |                 }; | ||||||
|  |                 tx.send(res_data).await.unwrap(); | ||||||
|  |             } | ||||||
|  |             Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { | ||||||
|  |                 todo!() | ||||||
|  |             } | ||||||
|  |             Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => { | ||||||
|  |                 todo!() | ||||||
|  |             } | ||||||
|  |             None => todo!(), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
| @ -1,18 +1,11 @@ | |||||||
| use common::{ | use common::{ | ||||||
|     prepare_test_env::{ |     prepare_test_env::{prepare_test_db, run_service_in_background}, | ||||||
|         connect_stream_client_channel, prepare_test_db, run_service_for_stream_server, |  | ||||||
|         run_service_in_background, |  | ||||||
|     }, |  | ||||||
|     test_utils::Key, |     test_utils::Key, | ||||||
|  |     vm_daemon_utils::{mock_vm_daemon, register_vm_node}, | ||||||
| }; | }; | ||||||
| use detee_shared::vm_proto; | use detee_shared::vm_proto; | ||||||
| use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | ||||||
| use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; | use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; | ||||||
| use detee_shared::vm_proto::RegisterVmNodeReq; |  | ||||||
| use futures::StreamExt; |  | ||||||
| use tokio::sync::mpsc; |  | ||||||
| use tokio_stream::wrappers::ReceiverStream; |  | ||||||
| use tonic::transport::Channel; |  | ||||||
| 
 | 
 | ||||||
| mod common; | mod common; | ||||||
| 
 | 
 | ||||||
| @ -32,78 +25,22 @@ async fn test_reg_vm_node() { | |||||||
|     assert!(vm_contracts.is_empty()) |     assert!(vm_contracts.is_empty()) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| async fn register_vm_node( |  | ||||||
|     mut client: BrainVmDaemonClient<Channel>, |  | ||||||
|     key: Key, |  | ||||||
|     operator_wallet: String, |  | ||||||
| ) -> Vec<vm_proto::VmContract> { |  | ||||||
|     let node_pubkey = key.pubkey.clone(); |  | ||||||
| 
 |  | ||||||
|     let req = RegisterVmNodeReq { |  | ||||||
|         node_pubkey, |  | ||||||
|         operator_wallet, |  | ||||||
|         main_ip: String::from("185.243.218.213"), |  | ||||||
|         city: String::from("Oslo"), |  | ||||||
|         country: String::from("Norway"), |  | ||||||
|         region: String::from("EU"), |  | ||||||
|         price: 1200, |  | ||||||
|     }; |  | ||||||
| 
 |  | ||||||
|     let mut grpc_stream = |  | ||||||
|         client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner(); |  | ||||||
| 
 |  | ||||||
|     let mut vm_contracts = Vec::new(); |  | ||||||
|     while let Some(stream_update) = grpc_stream.next().await { |  | ||||||
|         match stream_update { |  | ||||||
|             Ok(vm_c) => { |  | ||||||
|                 vm_contracts.push(vm_c); |  | ||||||
|             } |  | ||||||
|             Err(e) => { |  | ||||||
|                 panic!("Received error instead of vm_contracts: {e:?}"); |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|     vm_contracts |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| #[tokio::test] | #[tokio::test] | ||||||
| async fn test_brain_message() { | async fn test_brain_message() { | ||||||
|     env_logger::builder().filter_level(log::LevelFilter::Info).init(); |     env_logger::builder().filter_level(log::LevelFilter::Info).init(); | ||||||
|     let _ = prepare_test_db().await; |     let _ = prepare_test_db().await; | ||||||
| 
 | 
 | ||||||
|     let (tokio_duplex, addr) = run_service_for_stream_server().await; |  | ||||||
| 
 |  | ||||||
|     let channel = connect_stream_client_channel(tokio_duplex, addr).await; |  | ||||||
| 
 |  | ||||||
|     let daemon_client = BrainVmDaemonClient::new(channel.clone()); |  | ||||||
| 
 |  | ||||||
|     let daemon_key = Key::new(); |  | ||||||
| 
 |  | ||||||
|     register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await; |  | ||||||
| 
 |  | ||||||
|     let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); |  | ||||||
| 
 |  | ||||||
|     tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx)); |  | ||||||
| 
 |  | ||||||
|     let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); |  | ||||||
|     tokio::spawn(daemon_msg_sender( |  | ||||||
|         daemon_client.clone(), |  | ||||||
|         daemon_key.clone(), |  | ||||||
|         daemon_msg_tx.clone(), |  | ||||||
|         rx, |  | ||||||
|     )); |  | ||||||
| 
 |  | ||||||
|     tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx)); |  | ||||||
| 
 |  | ||||||
|     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; |     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; | ||||||
| 
 | 
 | ||||||
|  |     let (channel, daemon_key) = mock_vm_daemon().await; | ||||||
|  | 
 | ||||||
|     let mut cli_client = BrainVmCliClient::new(channel); |     let mut cli_client = BrainVmCliClient::new(channel); | ||||||
| 
 | 
 | ||||||
|     let cli_key = Key::new(); |     let cli_key = Key::new(); | ||||||
| 
 | 
 | ||||||
|     let req = vm_proto::NewVmReq { |     let req = vm_proto::NewVmReq { | ||||||
|         admin_pubkey: cli_key.pubkey.clone(), |         admin_pubkey: cli_key.pubkey.clone(), | ||||||
|         node_pubkey: daemon_key.pubkey.clone(), |         node_pubkey: daemon_key, | ||||||
|         price_per_unit: 1200, |         price_per_unit: 1200, | ||||||
|         extra_ports: vec![8080, 8081], |         extra_ports: vec![8080, 8081], | ||||||
|         locked_nano: 0, |         locked_nano: 0, | ||||||
| @ -121,69 +58,3 @@ async fn test_brain_message() { | |||||||
| 
 | 
 | ||||||
|     assert_eq!(data_in_db, new_vm_resp.args.unwrap()); |     assert_eq!(data_in_db, new_vm_resp.args.unwrap()); | ||||||
| } | } | ||||||
| 
 |  | ||||||
| async fn daemon_listener( |  | ||||||
|     mut client: BrainVmDaemonClient<Channel>, |  | ||||||
|     key: Key, |  | ||||||
|     tx: mpsc::Sender<vm_proto::BrainVmMessage>, |  | ||||||
| ) { |  | ||||||
|     let mut grpc_stream = |  | ||||||
|         client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner(); |  | ||||||
| 
 |  | ||||||
|     while let Some(Ok(stream_update)) = grpc_stream.next().await { |  | ||||||
|         log::info!("vm deamon got notified: {:?}", &stream_update); |  | ||||||
|         let _ = tx.send(stream_update).await; |  | ||||||
|     } |  | ||||||
| } |  | ||||||
| async fn daemon_msg_sender( |  | ||||||
|     mut client: BrainVmDaemonClient<Channel>, |  | ||||||
|     key: Key, |  | ||||||
|     tx: mpsc::Sender<vm_proto::VmDaemonMessage>, |  | ||||||
|     rx: mpsc::Receiver<vm_proto::VmDaemonMessage>, |  | ||||||
| ) { |  | ||||||
|     let rx_stream = ReceiverStream::new(rx); |  | ||||||
|     tx.send(vm_proto::VmDaemonMessage { |  | ||||||
|         msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())), |  | ||||||
|     }) |  | ||||||
|     .await |  | ||||||
|     .unwrap(); |  | ||||||
|     client.daemon_messages(rx_stream).await.unwrap(); |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| async fn daemon_engine( |  | ||||||
|     tx: mpsc::Sender<vm_proto::VmDaemonMessage>, |  | ||||||
|     mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>, |  | ||||||
| ) { |  | ||||||
|     while let Some(brain_msg) = rx.recv().await { |  | ||||||
|         match brain_msg.msg { |  | ||||||
|             Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { |  | ||||||
|                 let args = Some(vm_proto::MeasurementArgs { |  | ||||||
|                     dtrfs_api_endpoint: String::from("184.107.169.199:48865"), |  | ||||||
|                     exposed_ports: new_vm_req.extra_ports, |  | ||||||
|                     ovmf_hash: String::from( |  | ||||||
|                         "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", |  | ||||||
|                     ), |  | ||||||
|                     ips: vec![], |  | ||||||
|                 }); |  | ||||||
| 
 |  | ||||||
|                 let new_vm_resp = vm_proto::NewVmResp { |  | ||||||
|                     uuid: new_vm_req.uuid.clone(), |  | ||||||
|                     args, |  | ||||||
|                     error: String::new(), |  | ||||||
|                 }; |  | ||||||
| 
 |  | ||||||
|                 let res_data = vm_proto::VmDaemonMessage { |  | ||||||
|                     msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), |  | ||||||
|                 }; |  | ||||||
|                 tx.send(res_data).await.unwrap(); |  | ||||||
|             } |  | ||||||
|             Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { |  | ||||||
|                 todo!() |  | ||||||
|             } |  | ||||||
|             Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => { |  | ||||||
|                 todo!() |  | ||||||
|             } |  | ||||||
|             None => todo!(), |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user