test brain message
refactor test environment mocked daemon and cli to interact with brain
This commit is contained in:
		
							parent
							
								
									0359de54fe
								
							
						
					
					
						commit
						a7268792ad
					
				| @ -6,6 +6,7 @@ use detee_shared::{ | |||||||
| use hyper_util::rt::TokioIo; | use hyper_util::rt::TokioIo; | ||||||
| use std::net::SocketAddr; | use std::net::SocketAddr; | ||||||
| use surreal_brain::grpc::{BrainGeneralCliForReal, BrainVmCliForReal, BrainVmDaemonForReal}; | use surreal_brain::grpc::{BrainGeneralCliForReal, BrainVmCliForReal, BrainVmDaemonForReal}; | ||||||
|  | use tokio::io::DuplexStream; | ||||||
| use tokio::{net::TcpListener, sync::OnceCell}; | use tokio::{net::TcpListener, sync::OnceCell}; | ||||||
| use tonic::transport::{Channel, Endpoint, Server, Uri}; | use tonic::transport::{Channel, Endpoint, Server, Uri}; | ||||||
| use tower::service_fn; | use tower::service_fn; | ||||||
| @ -34,16 +35,6 @@ pub async fn prepare_test_db() { | |||||||
|         .await; |         .await; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub async fn _reset_test_db() { |  | ||||||
|     let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk().unwrap(); |  | ||||||
|     surreal_brain::db::DB.query(format!("REMOVE DATABASE {DB_NAME}")).await.unwrap(); |  | ||||||
|     surreal_brain::db::DB |  | ||||||
|         .query(std::fs::read_to_string("interim_tables.surql").unwrap()) |  | ||||||
|         .await |  | ||||||
|         .unwrap(); |  | ||||||
|     surreal_brain::db::migration0(&old_brain_data).await.unwrap(); |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| pub async fn run_service_in_background() -> SocketAddr { | pub async fn run_service_in_background() -> SocketAddr { | ||||||
|     let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); |     let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); | ||||||
|     let addr = listener.local_addr().unwrap(); |     let addr = listener.local_addr().unwrap(); | ||||||
| @ -63,12 +54,11 @@ pub async fn run_service_in_background() -> SocketAddr { | |||||||
|     addr |     addr | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub async fn run_service_for_stream() -> Channel { | pub async fn run_service_for_stream_server() -> (DuplexStream, SocketAddr) { | ||||||
|     let (client, server) = tokio::io::duplex(1024); |     let (client, server) = tokio::io::duplex(1024); | ||||||
| 
 | 
 | ||||||
|     let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); |     let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); | ||||||
|     let addr = listener.local_addr().unwrap(); |     let addr = listener.local_addr().unwrap(); | ||||||
|     let url = format!("http://{}", addr); |  | ||||||
| 
 | 
 | ||||||
|     tokio::spawn(async move { |     tokio::spawn(async move { | ||||||
|         tonic::transport::Server::builder() |         tonic::transport::Server::builder() | ||||||
| @ -78,8 +68,13 @@ pub async fn run_service_for_stream() -> Channel { | |||||||
|             .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) |             .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) | ||||||
|             .await |             .await | ||||||
|     }); |     }); | ||||||
|  |     (client, addr) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | pub async fn connect_stream_client_channel(c_stream: DuplexStream, addr: SocketAddr) -> Channel { | ||||||
|  |     let url = format!("http://{}", addr); | ||||||
|  |     let mut client = Some(c_stream); | ||||||
| 
 | 
 | ||||||
|     let mut client = Some(client); |  | ||||||
|     Endpoint::try_from(url) |     Endpoint::try_from(url) | ||||||
|         .unwrap() |         .unwrap() | ||||||
|         .connect_with_connector(service_fn(move |_: Uri| { |         .connect_with_connector(service_fn(move |_: Uri| { | ||||||
| @ -96,3 +91,8 @@ pub async fn run_service_for_stream() -> Channel { | |||||||
|         .await |         .await | ||||||
|         .unwrap() |         .unwrap() | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | pub async fn run_service_for_stream() -> Channel { | ||||||
|  |     let (client, addr) = run_service_for_stream_server().await; | ||||||
|  |     connect_stream_client_channel(client, addr).await | ||||||
|  | } | ||||||
|  | |||||||
| @ -1,9 +1,11 @@ | |||||||
| use anyhow::Result; | use anyhow::Result; | ||||||
|  | use detee_shared::vm_proto as snp_proto; | ||||||
| use ed25519_dalek::Signer; | use ed25519_dalek::Signer; | ||||||
| use ed25519_dalek::SigningKey; | use ed25519_dalek::SigningKey; | ||||||
| use tonic::metadata::AsciiMetadataValue; | use tonic::metadata::AsciiMetadataValue; | ||||||
| use tonic::Request; | use tonic::Request; | ||||||
| 
 | 
 | ||||||
|  | #[derive(Debug, Clone)] | ||||||
| pub struct Key { | pub struct Key { | ||||||
|     pub sg_key: SigningKey, |     pub sg_key: SigningKey, | ||||||
|     pub pubkey: String, |     pub pubkey: String, | ||||||
| @ -35,4 +37,12 @@ impl Key { | |||||||
|         let key = self.sg_key.clone(); |         let key = self.sg_key.clone(); | ||||||
|         Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string()) |         Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string()) | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|  |     pub fn sign_stream_auth(&self, contracts: Vec<String>) -> Result<snp_proto::DaemonStreamAuth> { | ||||||
|  |         let pubkey = self.pubkey.clone(); | ||||||
|  |         let timestamp = chrono::Utc::now().to_rfc3339(); | ||||||
|  |         let signature = | ||||||
|  |             self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?; | ||||||
|  |         Ok(snp_proto::DaemonStreamAuth { timestamp, pubkey, contracts, signature }) | ||||||
|  |     } | ||||||
| } | } | ||||||
|  | |||||||
| @ -4,14 +4,17 @@ use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | |||||||
| use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; | use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; | ||||||
| use detee_shared::vm_proto::{ListVmContractsReq, RegisterVmNodeReq}; | use detee_shared::vm_proto::{ListVmContractsReq, RegisterVmNodeReq}; | ||||||
| use detee_shared::{ | use detee_shared::{ | ||||||
|     common_proto::Pubkey, general_proto::brain_general_cli_client::BrainGeneralCliClient, |     common_proto::Pubkey, general_proto::brain_general_cli_client::BrainGeneralCliClient, vm_proto, | ||||||
| }; | }; | ||||||
| mod common; | mod common; | ||||||
| use common::prepare_test_env::{ | use common::prepare_test_env::{ | ||||||
|     prepare_test_db, run_service_for_stream, run_service_in_background, |     connect_stream_client_channel, prepare_test_db, run_service_for_stream, | ||||||
|  |     run_service_for_stream_server, run_service_in_background, | ||||||
| }; | }; | ||||||
| use common::test_utils::Key; | use common::test_utils::Key; | ||||||
| use futures::StreamExt; | use futures::StreamExt; | ||||||
|  | use tokio::task::JoinSet; | ||||||
|  | use tokio_stream::wrappers::ReceiverStream; | ||||||
| 
 | 
 | ||||||
| #[tokio::test] | #[tokio::test] | ||||||
| async fn test_reg_vm_node() { | async fn test_reg_vm_node() { | ||||||
| @ -54,6 +57,128 @@ async fn test_reg_vm_node() { | |||||||
|     assert!(vm_contracts.is_empty()) |     assert!(vm_contracts.is_empty()) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | #[tokio::test] | ||||||
|  | async fn test_brain_message() { | ||||||
|  |     // spawn grpc stream server
 | ||||||
|  |     // mock a daemon
 | ||||||
|  |     // mock a cli client to interact with brain
 | ||||||
|  | 
 | ||||||
|  |     // validate if something happening in "surreal_brain::db::NewVmReq", "surreal_brain::db::UpdateVmReq", "surreal_brain::db::DeletedVm" these table
 | ||||||
|  |     // mock daemon will responde to brain
 | ||||||
|  | 
 | ||||||
|  |     // TODO: register node before connecting to brain
 | ||||||
|  | 
 | ||||||
|  |     env_logger::builder().filter_level(log::LevelFilter::Info).init(); | ||||||
|  |     let _ = prepare_test_db().await; | ||||||
|  | 
 | ||||||
|  |     let (tokio_duplex, addr) = run_service_for_stream_server().await; | ||||||
|  | 
 | ||||||
|  |     let channel = connect_stream_client_channel(tokio_duplex, addr).await; | ||||||
|  | 
 | ||||||
|  |     let mut daemon_client = BrainVmDaemonClient::new(channel.clone()); | ||||||
|  | 
 | ||||||
|  |     let daemon_key = Key::new(); | ||||||
|  | 
 | ||||||
|  |     let mut daemon_join_set = JoinSet::new(); | ||||||
|  | 
 | ||||||
|  |     let (tx, mut brain_msg_rx) = tokio::sync::mpsc::channel(1); | ||||||
|  | 
 | ||||||
|  |     // listen to brain
 | ||||||
|  |     let mut daemon_client_01 = daemon_client.clone(); | ||||||
|  |     let daemon_key_01 = daemon_key.clone(); | ||||||
|  |     daemon_join_set.spawn(async move { | ||||||
|  |         let mut grpc_stream = daemon_client_01 | ||||||
|  |             .brain_messages(daemon_key_01.sign_stream_auth(vec![]).unwrap()) | ||||||
|  |             .await | ||||||
|  |             .unwrap() | ||||||
|  |             .into_inner(); | ||||||
|  | 
 | ||||||
|  |         while let Some(Ok(stream_update)) = grpc_stream.next().await { | ||||||
|  |             log::info!("vm deamon got notified: {:?}", &stream_update); | ||||||
|  |             let _ = tx.send(stream_update).await; | ||||||
|  |         } | ||||||
|  |     }); | ||||||
|  | 
 | ||||||
|  |     // send to brain
 | ||||||
|  |     let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1); | ||||||
|  |     let daemon_msg_tx_01 = daemon_msg_tx.clone(); | ||||||
|  |     let daemon_key_02 = daemon_key.clone(); | ||||||
|  |     daemon_join_set.spawn(async move { | ||||||
|  |         let rx_stream = ReceiverStream::new(rx); | ||||||
|  |         daemon_msg_tx_01 | ||||||
|  |             .send(vm_proto::VmDaemonMessage { | ||||||
|  |                 msg: Some(vm_proto::vm_daemon_message::Msg::Auth( | ||||||
|  |                     daemon_key_02.sign_stream_auth(vec![]).unwrap(), | ||||||
|  |                 )), | ||||||
|  |             }) | ||||||
|  |             .await | ||||||
|  |             .unwrap(); | ||||||
|  |         daemon_client.daemon_messages(rx_stream).await.unwrap(); | ||||||
|  |     }); | ||||||
|  | 
 | ||||||
|  |     // daemon engine
 | ||||||
|  |     daemon_join_set.spawn(async move { | ||||||
|  |         while let Some(brain_msg) = brain_msg_rx.recv().await { | ||||||
|  |             match brain_msg.msg { | ||||||
|  |                 Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { | ||||||
|  |                     let args = Some(vm_proto::MeasurementArgs { | ||||||
|  |                         dtrfs_api_endpoint: String::from("184.107.169.199:48865"), | ||||||
|  |                         exposed_ports: new_vm_req.extra_ports, | ||||||
|  |                         ovmf_hash: String::from( | ||||||
|  |                             "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", | ||||||
|  |                         ), | ||||||
|  |                         ips: vec![], | ||||||
|  |                     }); | ||||||
|  | 
 | ||||||
|  |                     let new_vm_resp = vm_proto::NewVmResp { | ||||||
|  |                         uuid: new_vm_req.uuid.clone(), | ||||||
|  |                         args, | ||||||
|  |                         error: String::new(), | ||||||
|  |                     }; | ||||||
|  | 
 | ||||||
|  |                     let res_data = vm_proto::VmDaemonMessage { | ||||||
|  |                         msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), | ||||||
|  |                     }; | ||||||
|  |                     daemon_msg_tx.send(res_data).await.unwrap(); | ||||||
|  |                 } | ||||||
|  |                 Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { | ||||||
|  |                     //
 | ||||||
|  |                 } | ||||||
|  |                 Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => { | ||||||
|  |                     //
 | ||||||
|  |                 } | ||||||
|  |                 None => todo!(), | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     }); | ||||||
|  | 
 | ||||||
|  |     tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; | ||||||
|  | 
 | ||||||
|  |     let mut cli_client = BrainVmCliClient::new(channel); | ||||||
|  | 
 | ||||||
|  |     let cli_key = Key::new(); | ||||||
|  | 
 | ||||||
|  |     let req = vm_proto::NewVmReq { | ||||||
|  |         admin_pubkey: cli_key.pubkey.clone(), | ||||||
|  |         node_pubkey: daemon_key.pubkey.clone(), | ||||||
|  |         price_per_unit: 1200, | ||||||
|  |         extra_ports: vec![8080, 8081], | ||||||
|  |         locked_nano: 0, | ||||||
|  |         ..Default::default() | ||||||
|  |     }; | ||||||
|  |     let new_vm_resp = | ||||||
|  |         cli_client.new_vm(cli_key.sign_request(req).unwrap()).await.unwrap().into_inner(); | ||||||
|  | 
 | ||||||
|  |     assert!(new_vm_resp.error.is_empty()); | ||||||
|  |     assert!(new_vm_resp.uuid.len() == 40); | ||||||
|  | 
 | ||||||
|  |     let id = ("measurement_args", new_vm_resp.uuid); | ||||||
|  |     let data_in_db: detee_shared::vm_proto::MeasurementArgs = | ||||||
|  |         surreal_brain::db::DB.select(id).await.unwrap().unwrap(); | ||||||
|  | 
 | ||||||
|  |     assert_eq!(data_in_db, new_vm_resp.args.unwrap()); | ||||||
|  | } | ||||||
|  | 
 | ||||||
| #[tokio::test] | #[tokio::test] | ||||||
| async fn test_general_balance() { | async fn test_general_balance() { | ||||||
|     // env_logger::builder().filter_level(log::LevelFilter::Trace).init();
 |     // env_logger::builder().filter_level(log::LevelFilter::Trace).init();
 | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user