refactor tests

modularizing report node into reusable method
improved error handling unwraping only on top level method
utils methods accepts refs to remove clone() on top level methods
This commit is contained in:
Noor 2025-05-07 02:57:29 +05:30 committed by ghe0
parent dcfaf298fc
commit ea50603a88
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
5 changed files with 121 additions and 114 deletions

@ -1,3 +1,4 @@
use anyhow::{anyhow, Result};
use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer;
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer;
use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer;
@ -15,9 +16,9 @@ use tokio::sync::OnceCell;
use tonic::transport::{Channel, Endpoint, Server, Uri};
use tower::service_fn;
pub static DB_STATE: OnceCell<()> = OnceCell::const_new();
pub static DB_STATE: OnceCell<Result<()>> = OnceCell::const_new();
pub async fn prepare_test_db() -> Surreal<Client> {
pub async fn prepare_test_db() -> Result<Surreal<Client>> {
dotenv().ok();
let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env");
@ -26,24 +27,25 @@ pub async fn prepare_test_db() -> Surreal<Client> {
let db_ns = "test_brain";
let db_name = "test_migration_db";
let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name)
.await
.unwrap();
let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?;
DB_STATE
.get_or_init(|| async {
let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk().unwrap();
db.query(format!("REMOVE DATABASE {db_name}")).await.unwrap();
db.query(std::fs::read_to_string("interim_tables.surql").unwrap()).await.unwrap();
surreal_brain::db::migration0(&db, &old_brain_data).await.unwrap();
let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk()
.map_err(|e| anyhow!(e.to_string()))?;
db.query(format!("REMOVE DATABASE {db_name}")).await?;
db.query(std::fs::read_to_string("interim_tables.surql")?).await?;
surreal_brain::db::migration0(&db, &old_brain_data).await?;
Ok::<(), anyhow::Error>(())
})
.await;
db
Ok(db)
}
pub async fn run_service_in_background() -> SocketAddr {
pub async fn run_service_in_background() -> Result<SocketAddr> {
dotenv().ok();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env");
let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env");
@ -52,9 +54,8 @@ pub async fn run_service_in_background() -> SocketAddr {
let db_name = "test_migration_db";
tokio::spawn(async move {
let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name)
.await
.unwrap();
let db =
surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?;
let db_arc = Arc::new(db);
Server::builder()
@ -62,13 +63,14 @@ pub async fn run_service_in_background() -> SocketAddr {
.add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone())))
.add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone())))
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
.await
.unwrap();
.await?;
Ok::<(), anyhow::Error>(())
});
tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
addr
Ok(addr)
}
pub async fn run_service_for_stream_server() -> DuplexStream {
@ -82,9 +84,8 @@ pub async fn run_service_for_stream_server() -> DuplexStream {
let db_name = "test_migration_db";
tokio::spawn(async move {
let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name)
.await
.unwrap();
let db =
surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?;
let db_arc = Arc::new(db);
tonic::transport::Server::builder()
@ -92,24 +93,26 @@ pub async fn run_service_for_stream_server() -> DuplexStream {
.add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone())))
.add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone())))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.await?;
Ok::<(), anyhow::Error>(())
});
client
}
pub async fn connect_stream_client_channel(c_stream: DuplexStream) -> Channel {
pub async fn connect_stream_client_channel(c_stream: DuplexStream) -> Result<Channel> {
let mut client = Some(c_stream);
Endpoint::from_static("http://127.0.0.1:0")
Ok(Endpoint::from_static("http://127.0.0.1:0")
.connect_with_connector(service_fn(move |_: Uri| {
let client = client.take().unwrap();
async move { Ok::<TokioIo<DuplexStream>, std::io::Error>(TokioIo::new(client)) }
}))
.await
.unwrap()
.await?)
}
pub async fn run_service_for_stream() -> Channel {
pub async fn run_service_for_stream() -> Result<Channel> {
let client = run_service_for_stream_server().await;
connect_stream_client_channel(client).await
}

@ -1,4 +1,8 @@
use super::test_utils::Key;
use anyhow::{anyhow, Result};
use detee_shared::common_proto::Empty;
use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient;
use detee_shared::general_proto::ReportNodeReq;
use detee_shared::vm_proto;
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ};
@ -9,13 +13,13 @@ use tonic::transport::Channel;
pub async fn create_new_vm(
db: &Surreal<Client>,
key: Key,
node_pubkey: String,
brain_channel: Channel,
) -> String {
key: &Key,
node_pubkey: &str,
brain_channel: &Channel,
) -> Result<String> {
let new_vm_req = vm_proto::NewVmReq {
admin_pubkey: key.pubkey.clone(),
node_pubkey,
node_pubkey: node_pubkey.to_string(),
price_per_unit: 1200,
extra_ports: vec![8080, 8081],
locked_nano: 0,
@ -23,8 +27,7 @@ pub async fn create_new_vm(
};
let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone());
let new_vm_resp =
client_vm_cli.new_vm(key.sign_request(new_vm_req).unwrap()).await.unwrap().into_inner();
let new_vm_resp = client_vm_cli.new_vm(key.sign_request(new_vm_req)?).await?.into_inner();
assert!(new_vm_resp.error.is_empty());
assert!(new_vm_resp.uuid.len() == 40);
@ -32,16 +35,34 @@ pub async fn create_new_vm(
// wait for update db
tokio::time::sleep(tokio::time::Duration::from_millis(700)).await;
let vm_req_db: Option<db::NewVmReq> =
db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await.unwrap();
let vm_req_db: Option<db::NewVmReq> = db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await?;
if let Some(new_vm_req) = vm_req_db {
panic!("New VM request found in DB: {:?}", new_vm_req);
}
let active_vm_op: Option<db::ActiveVm> =
db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await.unwrap();
let active_vm = active_vm_op.unwrap();
db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await?;
let active_vm = active_vm_op.ok_or(anyhow!("Not found active vm in db"))?;
active_vm.id.key().to_string()
Ok(active_vm.id.key().to_string())
}
pub async fn report_node(
key: &Key,
brain_channel: &Channel,
node_pubkey: &str,
contract: &str,
reason: &str,
) -> Result<tonic::Response<Empty>> {
let mut client_gen_cli = BrainGeneralCliClient::new(brain_channel.clone());
let report_req = ReportNodeReq {
admin_pubkey: key.pubkey.clone(),
node_pubkey: node_pubkey.to_string(),
contract: contract.to_string(),
reason: reason.to_string(),
};
Ok(client_gen_cli.report_node(key.sign_request(report_req)?).await?)
}

@ -1,4 +1,5 @@
use super::test_utils::Key;
use anyhow::Result;
use detee_shared::vm_proto;
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
use detee_shared::vm_proto::RegisterVmNodeReq;
@ -7,11 +8,11 @@ use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Channel;
pub async fn mock_vm_daemon(brain_channel: Channel) -> String {
let daemon_client = BrainVmDaemonClient::new(brain_channel);
pub async fn mock_vm_daemon(brain_channel: &Channel) -> Result<String> {
let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone());
let daemon_key = Key::new();
register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await;
register_vm_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?;
let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1);
@ -29,20 +30,20 @@ pub async fn mock_vm_daemon(brain_channel: Channel) -> String {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
daemon_key.pubkey
Ok(daemon_key.pubkey)
}
pub async fn register_vm_node(
mut client: BrainVmDaemonClient<Channel>,
key: Key,
operator_wallet: String,
) -> Vec<vm_proto::VmContract> {
client: &mut BrainVmDaemonClient<Channel>,
key: &Key,
operator_wallet: &str,
) -> Result<Vec<vm_proto::VmContract>> {
log::info!("Registering vm_node: {}", key.pubkey);
let node_pubkey = key.pubkey.clone();
let req = RegisterVmNodeReq {
node_pubkey,
operator_wallet,
operator_wallet: operator_wallet.to_string(),
main_ip: String::from("185.243.218.213"),
city: String::from("Oslo"),
country: String::from("Norway"),
@ -50,8 +51,7 @@ pub async fn register_vm_node(
price: 1200,
};
let mut grpc_stream =
client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner();
let mut grpc_stream = client.register_vm_node(key.sign_request(req)?).await?.into_inner();
let mut vm_contracts = Vec::new();
while let Some(stream_update) = grpc_stream.next().await {
@ -64,22 +64,23 @@ pub async fn register_vm_node(
}
}
}
vm_contracts
Ok(vm_contracts)
}
pub async fn daemon_listener(
mut client: BrainVmDaemonClient<Channel>,
key: Key,
tx: mpsc::Sender<vm_proto::BrainVmMessage>,
) {
) -> Result<()> {
log::info!("listening vm_daemon");
let mut grpc_stream =
client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner();
let mut grpc_stream = client.brain_messages(key.sign_stream_auth(vec![])?).await?.into_inner();
while let Some(Ok(stream_update)) = grpc_stream.next().await {
log::info!("vm deamon got notified: {:?}", &stream_update);
let _ = tx.send(stream_update).await;
}
Ok(())
}
pub async fn daemon_msg_sender(
@ -87,21 +88,21 @@ pub async fn daemon_msg_sender(
key: Key,
tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
rx: mpsc::Receiver<vm_proto::VmDaemonMessage>,
) {
) -> Result<()> {
log::info!("sender vm_daemon");
let rx_stream = ReceiverStream::new(rx);
tx.send(vm_proto::VmDaemonMessage {
msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())),
msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![])?)),
})
.await
.unwrap();
client.daemon_messages(rx_stream).await.unwrap();
.await?;
client.daemon_messages(rx_stream).await?;
Ok(())
}
pub async fn daemon_engine(
tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>,
) {
) -> Result<()> {
log::info!("daemon engine vm_daemon");
while let Some(brain_msg) = rx.recv().await {
match brain_msg.msg {
@ -124,7 +125,7 @@ pub async fn daemon_engine(
let res_data = vm_proto::VmDaemonMessage {
msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)),
};
tx.send(res_data).await.unwrap();
tx.send(res_data).await?;
}
Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => {
todo!()
@ -135,4 +136,6 @@ pub async fn daemon_engine(
None => todo!(),
}
}
Ok(())
}

@ -2,11 +2,11 @@ use common::prepare_test_env::{
prepare_test_db, run_service_for_stream, run_service_in_background,
};
use common::test_utils::Key;
use common::vm_cli_utils::create_new_vm;
use common::vm_cli_utils::{create_new_vm, report_node};
use common::vm_daemon_utils::mock_vm_daemon;
use detee_shared::common_proto::{Empty, Pubkey};
use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient;
use detee_shared::general_proto::{AirdropReq, ReportNodeReq};
use detee_shared::general_proto::AirdropReq;
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
use detee_shared::vm_proto::ListVmContractsReq;
use futures::StreamExt;
@ -20,9 +20,9 @@ mod common;
#[tokio::test]
async fn test_general_balance() {
// env_logger::builder().filter_level(log::LevelFilter::Trace).init();
let _ = prepare_test_db().await;
prepare_test_db().await.unwrap();
let addr = run_service_in_background().await;
let addr = run_service_in_background().await.unwrap();
let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap();
let key = Key::new();
@ -40,12 +40,12 @@ async fn test_general_balance() {
#[tokio::test]
async fn test_general_airdrop() {
// env_logger::builder().filter_level(log::LevelFilter::Trace).init();
let _ = prepare_test_db().await;
prepare_test_db().await.unwrap();
const AIRDROP_MULTIPLE: u64 = 1_000_000_000;
let airdrop_amount = 10;
let addr = run_service_in_background().await;
let addr = run_service_in_background().await.unwrap();
let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap();
let admin_keys = vec![Key::new(), Key::new(), Key::new()];
@ -115,53 +115,36 @@ async fn test_general_airdrop() {
#[tokio::test]
async fn test_vm_creation() {
let db = prepare_test_db().await;
let db = prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await;
let daemon_key = mock_vm_daemon(brain_channel.clone()).await;
let brain_channel = run_service_for_stream().await.unwrap();
let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap();
let key = Key::new();
let _ = create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await;
let _ = create_new_vm(&db, &key, &daemon_key, &brain_channel).await;
}
#[tokio::test]
async fn test_report_node() {
let db = prepare_test_db().await;
let db = prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await;
let daemon_key = mock_vm_daemon(brain_channel.clone()).await;
let mut client_gen_cli = BrainGeneralCliClient::new(brain_channel.clone());
let brain_channel = run_service_for_stream().await.unwrap();
let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap();
let key = Key::new();
let pubkey = key.pubkey.clone();
let report_req = ReportNodeReq {
admin_pubkey: pubkey.clone(),
node_pubkey: daemon_key.clone(),
contract: String::from("uuid"),
reason: String::from("reason"),
};
let report_error =
client_gen_cli.report_node(key.sign_request(report_req).unwrap()).await.err().unwrap();
report_node(&key, &brain_channel, &daemon_key, "uuid", "reason").await.err().unwrap();
println!("Report error: {:?}", report_error);
assert_eq!(report_error.message(), "No contract found by this ID.");
log::info!("Report error: {:?}", report_error);
assert!(report_error.to_string().contains("No contract found by this ID."));
let active_vm_id =
create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await;
let active_vm_id = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap();
let reason = String::from("something went wrong on vm");
let report_req = ReportNodeReq {
admin_pubkey: pubkey,
node_pubkey: daemon_key.clone(),
contract: active_vm_id,
reason: reason.clone(),
};
let _ = client_gen_cli
.report_node(key.sign_request(report_req).unwrap())
let _ = report_node(&key, &brain_channel, &daemon_key, &active_vm_id, &reason)
.await
.unwrap()
.into_inner();
@ -175,7 +158,7 @@ async fn test_report_node() {
.take(0)
.unwrap();
let vm_node_with_report = vm_nodes.get(0).unwrap();
let vm_node_with_report = &vm_nodes[0];
assert!(vm_node_with_report.reports[0].reason == reason);
}
@ -183,9 +166,9 @@ async fn test_report_node() {
#[tokio::test]
// TODO: register some operators before testing this
async fn test_list_operators() {
prepare_test_db().await;
prepare_test_db().await.unwrap();
let channel = run_service_for_stream().await;
let channel = run_service_for_stream().await.unwrap();
let mut client = BrainGeneralCliClient::new(channel);
@ -212,9 +195,9 @@ async fn test_list_operators() {
#[tokio::test]
// TODO: create vm for this user before testing this
async fn test_list_vm_contracts() {
prepare_test_db().await;
prepare_test_db().await.unwrap();
let channel = run_service_for_stream().await;
let channel = run_service_for_stream().await.unwrap();
let mut client = BrainVmCliClient::new(channel);
let key = Key::new();

@ -11,27 +11,24 @@ mod common;
#[tokio::test]
async fn test_reg_vm_node() {
prepare_test_db().await;
prepare_test_db().await.unwrap();
let addr = run_service_in_background().await;
let client = BrainVmDaemonClient::connect(format!("http://{}", addr)).await.unwrap();
let addr = run_service_in_background().await.unwrap();
let mut client = BrainVmDaemonClient::connect(format!("http://{}", addr)).await.unwrap();
let operator_wallet = Key::new().pubkey;
let key = Key::new();
let vm_contracts = register_vm_node(client, key, operator_wallet).await;
let vm_contracts =
register_vm_node(&mut client, &Key::new(), &Key::new().pubkey).await.unwrap();
assert!(vm_contracts.is_empty())
}
#[tokio::test]
async fn test_brain_message() {
env_logger::builder().filter_level(log::LevelFilter::Info).init();
let db = prepare_test_db().await;
env_logger::builder().filter_level(log::LevelFilter::Error).init();
let db = prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await;
let daemon_key = mock_vm_daemon(brain_channel.clone()).await;
let brain_channel = run_service_for_stream().await.unwrap();
let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap();
let mut cli_client = BrainVmCliClient::new(brain_channel);
let cli_key = Key::new();