diff --git a/Cargo.lock b/Cargo.lock index 2b4e107..ffa6637 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3779,6 +3779,7 @@ dependencies = [ "ed25519-dalek", "env_logger", "futures", + "hyper-util", "log", "nanoid", "serde", @@ -3789,6 +3790,7 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "tower 0.5.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 21d3efd..3e359ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,3 +33,5 @@ codegen-units = 1 anyhow = "1.0.98" bs58 = "0.5.1" ed25519-dalek = "2.1.1" +hyper-util = "0.1.11" +tower = "0.5.2" diff --git a/tests/common/prepare_test_env.rs b/tests/common/prepare_test_env.rs index c4fa642..4533d01 100644 --- a/tests/common/prepare_test_env.rs +++ b/tests/common/prepare_test_env.rs @@ -1,54 +1,93 @@ -use detee_shared::{ - general_proto::brain_general_cli_server::BrainGeneralCliServer, - vm_proto::brain_vm_cli_server::BrainVmCliServer, -}; +use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer; +use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; +use hyper_util::rt::TokioIo; +use std::net::SocketAddr; use surreal_brain::grpc::{BrainGeneralCliForReal, BrainVmCliForReal}; -use tokio::sync::OnceCell; -use tonic::transport::Channel; +use tokio::{net::TcpListener, sync::OnceCell}; +use tonic::transport::{Channel, Endpoint, Server, Uri}; +use tower::service_fn; pub const DB_URL: &str = "localhost:8000"; pub const DB_NS: &str = "test_brain"; pub const DB_NAME: &str = "test_migration_db"; -pub const GRPC_ADDR: &str = "127.0.0.1:31337"; - -pub static TEST_STATE: OnceCell = OnceCell::const_new(); +pub static DB_STATE: OnceCell<()> = OnceCell::const_new(); pub async fn prepare_test_db() { - surreal_brain::db::init(DB_URL, DB_NS, DB_NAME) - .await - .expect("Failed to initialize the database"); - - let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk().unwrap(); - let _ = surreal_brain::db::DB.query(format!("REMOVE DATABASE {DB_NAME}")).await; - surreal_brain::db::migration0(&old_brain_data).await.unwrap(); -} - -pub async fn mock_grpc_server() { - tonic::transport::Server::builder() - .add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal {})) - .add_service(BrainVmCliServer::new(BrainVmCliForReal {})) - .serve(GRPC_ADDR.parse().unwrap()) - .await - .unwrap(); -} - -pub async fn mock_grpc_client() -> Channel { - let url = format!("http://{GRPC_ADDR}"); - Channel::from_shared(url).unwrap().connect().await.unwrap() -} - -pub async fn prepare_test_setup() { - TEST_STATE + DB_STATE .get_or_init(|| async { - prepare_test_db().await; + surreal_brain::db::init(DB_URL, DB_NS, DB_NAME) + .await + .expect("Failed to initialize the database"); - tokio::spawn(async { - mock_grpc_server().await; - }); - - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - mock_grpc_client().await + let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk().unwrap(); + surreal_brain::db::DB.query(format!("REMOVE DATABASE {DB_NAME}")).await.unwrap(); + surreal_brain::db::DB + .query(std::fs::read_to_string("interim_tables.surql").unwrap()) + .await + .unwrap(); + surreal_brain::db::migration0(&old_brain_data).await.unwrap(); }) .await; } + +pub async fn _reset_test_db() { + let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk().unwrap(); + surreal_brain::db::DB.query(format!("REMOVE DATABASE {DB_NAME}")).await.unwrap(); + surreal_brain::db::DB + .query(std::fs::read_to_string("interim_tables.surql").unwrap()) + .await + .unwrap(); + surreal_brain::db::migration0(&old_brain_data).await.unwrap(); +} + +pub async fn run_service_in_background() -> SocketAddr { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + Server::builder() + .add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal {})) + .add_service(BrainVmCliServer::new(BrainVmCliForReal {})) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) + .await + .unwrap(); + }); + + tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; + + addr +} + +pub async fn run_service_for_stream() -> Channel { + let (client, server) = tokio::io::duplex(1024); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let url = format!("http://{}", addr); + + tokio::spawn(async move { + tonic::transport::Server::builder() + .add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal {})) + .add_service(BrainVmCliServer::new(BrainVmCliForReal {})) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) + .await + }); + + let mut client = Some(client); + Endpoint::try_from(url) + .unwrap() + .connect_with_connector(service_fn(move |_: Uri| { + let client = client.take(); + + async move { + if let Some(client) = client { + Ok(TokioIo::new(client)) + } else { + Err(std::io::Error::new(std::io::ErrorKind::Other, "Client already taken")) + } + } + })) + .await + .unwrap() +} diff --git a/tests/grpcs_test.rs b/tests/grpcs_test.rs index 3990b6a..14f4223 100644 --- a/tests/grpcs_test.rs +++ b/tests/grpcs_test.rs @@ -6,26 +6,26 @@ use detee_shared::{ common_proto::Pubkey, general_proto::brain_general_cli_client::BrainGeneralCliClient, }; mod common; -use common::prepare_test_env::{prepare_test_setup, TEST_STATE}; +use common::prepare_test_env::{ + prepare_test_db, run_service_for_stream, run_service_in_background, +}; use common::test_utils::{get_pub_key, sign_request}; -use tokio_stream::StreamExt; +use futures::StreamExt; #[tokio::test] async fn test_general_balance() { - prepare_test_setup().await; - let grpc_channel = TEST_STATE.get().unwrap().clone(); + // env_logger::builder().filter_level(log::LevelFilter::Trace).init(); + let _ = prepare_test_db().await; - let mut brain_general_cli_client = BrainGeneralCliClient::new(grpc_channel.clone()); + let addr = run_service_in_background().await; + let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap(); - let req_data = Pubkey { pubkey: get_pub_key().unwrap() }; + let pubkey = get_pub_key().unwrap(); + let req_data = Pubkey { pubkey: pubkey.clone() }; - let acc_bal = brain_general_cli_client - .get_balance(sign_request(req_data).unwrap()) - .await - .unwrap() - .into_inner(); + let req = sign_request(req_data).unwrap(); - // verify it in db also + let acc_bal = client.get_balance(req).await.unwrap().into_inner(); assert_eq!(acc_bal.balance, 0); assert_eq!(acc_bal.tmp_locked, 0); @@ -33,10 +33,10 @@ async fn test_general_balance() { #[tokio::test] async fn test_report_node() { - prepare_test_setup().await; - let grpc_channel = TEST_STATE.get().unwrap().clone(); + let _ = prepare_test_db().await; - let mut brain_general_cli_client = BrainGeneralCliClient::new(grpc_channel.clone()); + let addr = run_service_in_background().await; + let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap(); // TODO: create contract, node and operator in db and use it here let req_data = ReportNodeReq { @@ -46,8 +46,7 @@ async fn test_report_node() { reason: String::from("reason"), }; - let report_error = - brain_general_cli_client.report_node(sign_request(req_data).unwrap()).await.err().unwrap(); + let report_error = client.report_node(sign_request(req_data).unwrap()).await.err().unwrap(); assert_eq!(report_error.message(), "No contract found by this ID."); @@ -57,16 +56,14 @@ async fn test_report_node() { #[tokio::test] // TODO: register some operators before testing this async fn test_list_operators() { - prepare_test_setup().await; - let grpc_channel = TEST_STATE.get().unwrap().clone(); + let _ = prepare_test_db().await; - let mut brain_general_cli_client = BrainGeneralCliClient::new(grpc_channel.clone()); + let channel = run_service_for_stream().await; - let mut grpc_stream = brain_general_cli_client - .list_operators(sign_request(Empty {}).unwrap()) - .await - .unwrap() - .into_inner(); + let mut client = BrainGeneralCliClient::new(channel); + + let mut grpc_stream = + client.list_operators(sign_request(Empty {}).unwrap()).await.unwrap().into_inner(); let mut operators = Vec::new(); while let Some(stream_update) = grpc_stream.next().await { @@ -81,17 +78,15 @@ async fn test_list_operators() { } assert!(!operators.is_empty()) - - // verify report in db } #[tokio::test] // TODO: create vm for this user before testing this async fn test_list_vm_contracts() { - prepare_test_setup().await; - let grpc_channel = TEST_STATE.get().unwrap().clone(); + let _ = prepare_test_db().await; - let mut brain_general_cli_client = BrainVmCliClient::new(grpc_channel.clone()); + let channel = run_service_for_stream().await; + let mut client = BrainVmCliClient::new(channel); let req_data = ListVmContractsReq { wallet: get_pub_key().unwrap(), @@ -99,11 +94,8 @@ async fn test_list_vm_contracts() { as_operator: false, }; - let mut grpc_stream = brain_general_cli_client - .list_vm_contracts(sign_request(req_data).unwrap()) - .await - .unwrap() - .into_inner(); + let mut grpc_stream = + client.list_vm_contracts(sign_request(req_data).unwrap()).await.unwrap().into_inner(); let mut vm_contracts = Vec::new(); while let Some(stream_update) = grpc_stream.next().await {