refactor test vm daemon
This commit is contained in:
		
							parent
							
								
									32f6548eff
								
							
						
					
					
						commit
						92827161ad
					
				| @ -10,8 +10,9 @@ use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | |||||||
| use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; | use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; | ||||||
| use detee_shared::vm_proto::RegisterVmNodeReq; | use detee_shared::vm_proto::RegisterVmNodeReq; | ||||||
| use futures::StreamExt; | use futures::StreamExt; | ||||||
| use tokio::task::JoinSet; | use tokio::sync::mpsc; | ||||||
| use tokio_stream::wrappers::ReceiverStream; | use tokio_stream::wrappers::ReceiverStream; | ||||||
|  | use tonic::transport::Channel; | ||||||
| 
 | 
 | ||||||
| mod common; | mod common; | ||||||
| 
 | 
 | ||||||
| @ -32,7 +33,7 @@ async fn test_reg_vm_node() { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| async fn register_vm_node( | async fn register_vm_node( | ||||||
|     mut client: BrainVmDaemonClient<tonic::transport::Channel>, |     mut client: BrainVmDaemonClient<Channel>, | ||||||
|     key: Key, |     key: Key, | ||||||
|     operator_wallet: String, |     operator_wallet: String, | ||||||
| ) -> Vec<vm_proto::VmContract> { | ) -> Vec<vm_proto::VmContract> { | ||||||
| @ -67,13 +68,6 @@ async fn register_vm_node( | |||||||
| 
 | 
 | ||||||
| #[tokio::test] | #[tokio::test] | ||||||
| async fn test_brain_message() { | async fn test_brain_message() { | ||||||
|     // spawn grpc stream server
 |  | ||||||
|     // mock a daemon
 |  | ||||||
|     // mock a cli client to interact with brain
 |  | ||||||
| 
 |  | ||||||
|     // validate if something happening in "surreal_brain::db::NewVmReq", "surreal_brain::db::UpdateVmReq", "surreal_brain::db::DeletedVm" these table
 |  | ||||||
|     // mock daemon will responde to brain
 |  | ||||||
| 
 |  | ||||||
|     env_logger::builder().filter_level(log::LevelFilter::Info).init(); |     env_logger::builder().filter_level(log::LevelFilter::Info).init(); | ||||||
|     let _ = prepare_test_db().await; |     let _ = prepare_test_db().await; | ||||||
| 
 | 
 | ||||||
| @ -81,84 +75,25 @@ async fn test_brain_message() { | |||||||
| 
 | 
 | ||||||
|     let channel = connect_stream_client_channel(tokio_duplex, addr).await; |     let channel = connect_stream_client_channel(tokio_duplex, addr).await; | ||||||
| 
 | 
 | ||||||
|     let mut daemon_client = BrainVmDaemonClient::new(channel.clone()); |     let daemon_client = BrainVmDaemonClient::new(channel.clone()); | ||||||
| 
 | 
 | ||||||
|     let daemon_key = Key::new(); |     let daemon_key = Key::new(); | ||||||
| 
 | 
 | ||||||
|     register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await; |     register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await; | ||||||
| 
 | 
 | ||||||
|     let mut daemon_join_set = JoinSet::new(); |     let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); | ||||||
| 
 | 
 | ||||||
|     let (tx, mut brain_msg_rx) = tokio::sync::mpsc::channel(1); |     tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx)); | ||||||
| 
 | 
 | ||||||
|     // listen to brain
 |  | ||||||
|     let mut daemon_client_01 = daemon_client.clone(); |  | ||||||
|     let daemon_key_01 = daemon_key.clone(); |  | ||||||
|     daemon_join_set.spawn(async move { |  | ||||||
|         let mut grpc_stream = daemon_client_01 |  | ||||||
|             .brain_messages(daemon_key_01.sign_stream_auth(vec![]).unwrap()) |  | ||||||
|             .await |  | ||||||
|             .unwrap() |  | ||||||
|             .into_inner(); |  | ||||||
| 
 |  | ||||||
|         while let Some(Ok(stream_update)) = grpc_stream.next().await { |  | ||||||
|             log::info!("vm deamon got notified: {:?}", &stream_update); |  | ||||||
|             let _ = tx.send(stream_update).await; |  | ||||||
|         } |  | ||||||
|     }); |  | ||||||
| 
 |  | ||||||
|     // send to brain
 |  | ||||||
|     let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); |     let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); | ||||||
|     let daemon_msg_tx_01 = daemon_msg_tx.clone(); |     tokio::spawn(daemon_msg_sender( | ||||||
|     let daemon_key_02 = daemon_key.clone(); |         daemon_client.clone(), | ||||||
|     daemon_join_set.spawn(async move { |         daemon_key.clone(), | ||||||
|         let rx_stream = ReceiverStream::new(rx); |         daemon_msg_tx.clone(), | ||||||
|         daemon_msg_tx_01 |         rx, | ||||||
|             .send(vm_proto::VmDaemonMessage { |     )); | ||||||
|                 msg: Some(vm_proto::vm_daemon_message::Msg::Auth( |  | ||||||
|                     daemon_key_02.sign_stream_auth(vec![]).unwrap(), |  | ||||||
|                 )), |  | ||||||
|             }) |  | ||||||
|             .await |  | ||||||
|             .unwrap(); |  | ||||||
|         daemon_client.daemon_messages(rx_stream).await.unwrap(); |  | ||||||
|     }); |  | ||||||
| 
 | 
 | ||||||
|     // daemon engine
 |     tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx)); | ||||||
|     daemon_join_set.spawn(async move { |  | ||||||
|         while let Some(brain_msg) = brain_msg_rx.recv().await { |  | ||||||
|             match brain_msg.msg { |  | ||||||
|                 Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { |  | ||||||
|                     let args = Some(vm_proto::MeasurementArgs { |  | ||||||
|                         dtrfs_api_endpoint: String::from("184.107.169.199:48865"), |  | ||||||
|                         exposed_ports: new_vm_req.extra_ports, |  | ||||||
|                         ovmf_hash: String::from( |  | ||||||
|                             "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", |  | ||||||
|                         ), |  | ||||||
|                         ips: vec![], |  | ||||||
|                     }); |  | ||||||
| 
 |  | ||||||
|                     let new_vm_resp = vm_proto::NewVmResp { |  | ||||||
|                         uuid: new_vm_req.uuid.clone(), |  | ||||||
|                         args, |  | ||||||
|                         error: String::new(), |  | ||||||
|                     }; |  | ||||||
| 
 |  | ||||||
|                     let res_data = vm_proto::VmDaemonMessage { |  | ||||||
|                         msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), |  | ||||||
|                     }; |  | ||||||
|                     daemon_msg_tx.send(res_data).await.unwrap(); |  | ||||||
|                 } |  | ||||||
|                 Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { |  | ||||||
|                     todo!() |  | ||||||
|                 } |  | ||||||
|                 Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => { |  | ||||||
|                     todo!() |  | ||||||
|                 } |  | ||||||
|                 None => todo!(), |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|     }); |  | ||||||
| 
 | 
 | ||||||
|     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; |     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; | ||||||
| 
 | 
 | ||||||
| @ -186,3 +121,69 @@ async fn test_brain_message() { | |||||||
| 
 | 
 | ||||||
|     assert_eq!(data_in_db, new_vm_resp.args.unwrap()); |     assert_eq!(data_in_db, new_vm_resp.args.unwrap()); | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | async fn daemon_listener( | ||||||
|  |     mut client: BrainVmDaemonClient<Channel>, | ||||||
|  |     key: Key, | ||||||
|  |     tx: mpsc::Sender<vm_proto::BrainVmMessage>, | ||||||
|  | ) { | ||||||
|  |     let mut grpc_stream = | ||||||
|  |         client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner(); | ||||||
|  | 
 | ||||||
|  |     while let Some(Ok(stream_update)) = grpc_stream.next().await { | ||||||
|  |         log::info!("vm deamon got notified: {:?}", &stream_update); | ||||||
|  |         let _ = tx.send(stream_update).await; | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | async fn daemon_msg_sender( | ||||||
|  |     mut client: BrainVmDaemonClient<Channel>, | ||||||
|  |     key: Key, | ||||||
|  |     tx: mpsc::Sender<vm_proto::VmDaemonMessage>, | ||||||
|  |     rx: mpsc::Receiver<vm_proto::VmDaemonMessage>, | ||||||
|  | ) { | ||||||
|  |     let rx_stream = ReceiverStream::new(rx); | ||||||
|  |     tx.send(vm_proto::VmDaemonMessage { | ||||||
|  |         msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())), | ||||||
|  |     }) | ||||||
|  |     .await | ||||||
|  |     .unwrap(); | ||||||
|  |     client.daemon_messages(rx_stream).await.unwrap(); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | async fn daemon_engine( | ||||||
|  |     tx: mpsc::Sender<vm_proto::VmDaemonMessage>, | ||||||
|  |     mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>, | ||||||
|  | ) { | ||||||
|  |     while let Some(brain_msg) = rx.recv().await { | ||||||
|  |         match brain_msg.msg { | ||||||
|  |             Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { | ||||||
|  |                 let args = Some(vm_proto::MeasurementArgs { | ||||||
|  |                     dtrfs_api_endpoint: String::from("184.107.169.199:48865"), | ||||||
|  |                     exposed_ports: new_vm_req.extra_ports, | ||||||
|  |                     ovmf_hash: String::from( | ||||||
|  |                         "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", | ||||||
|  |                     ), | ||||||
|  |                     ips: vec![], | ||||||
|  |                 }); | ||||||
|  | 
 | ||||||
|  |                 let new_vm_resp = vm_proto::NewVmResp { | ||||||
|  |                     uuid: new_vm_req.uuid.clone(), | ||||||
|  |                     args, | ||||||
|  |                     error: String::new(), | ||||||
|  |                 }; | ||||||
|  | 
 | ||||||
|  |                 let res_data = vm_proto::VmDaemonMessage { | ||||||
|  |                     msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), | ||||||
|  |                 }; | ||||||
|  |                 tx.send(res_data).await.unwrap(); | ||||||
|  |             } | ||||||
|  |             Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { | ||||||
|  |                 todo!() | ||||||
|  |             } | ||||||
|  |             Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => { | ||||||
|  |                 todo!() | ||||||
|  |             } | ||||||
|  |             None => todo!(), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user