Compare commits

..

2 Commits

Author SHA1 Message Date
815701184f
adding payments for VM contracts 2025-05-08 13:08:40 +03:00
af18e4ee77
fixes and tests
admin keys from env
test operator inspection
test vm creation timeout
extensive tests on airdrop
refactor db module imports
fix register vm_node creates operator account in db
modularised test into its module and reusable method
fix test brain message add ssh port on mock daemon while new vm
improved error handling on tests unwraping only on top level method
test utils methods accepts refs to remove clone() on top level methods
2025-05-08 15:12:58 +05:30
15 changed files with 469 additions and 260 deletions

1
.env

@ -5,3 +5,4 @@ DB_NAMESPACE = "brain"
DB_NAME = "migration" DB_NAME = "migration"
CERT_PATH = "./tmp/brain-crt.pem" CERT_PATH = "./tmp/brain-crt.pem"
CERT_KEY_PATH = "./tmp/brain-key.pem" CERT_KEY_PATH = "./tmp/brain-key.pem"
# ADMIN_PUB_KEYS = "admin_key01, admin_key02, admin_key03"

10
Cargo.lock generated

@ -1972,6 +1972,15 @@ dependencies = [
"either", "either",
] ]
[[package]]
name = "itertools"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
dependencies = [
"either",
]
[[package]] [[package]]
name = "itoa" name = "itoa"
version = "1.0.15" version = "1.0.15"
@ -3788,6 +3797,7 @@ dependencies = [
"env_logger", "env_logger",
"futures", "futures",
"hyper-util", "hyper-util",
"itertools 0.14.0",
"log", "log",
"nanoid", "nanoid",
"rand 0.8.5", "rand 0.8.5",

@ -35,5 +35,6 @@ anyhow = "1.0.98"
bs58 = "0.5.1" bs58 = "0.5.1"
ed25519-dalek = { version = "2.1.1", features = ["rand_core"] } ed25519-dalek = { version = "2.1.1", features = ["rand_core"] }
hyper-util = "0.1.11" hyper-util = "0.1.11"
itertools = "0.14.0"
rand = "0.8" rand = "0.8"
tower = "0.5.2" tower = "0.5.2"

@ -1,3 +1,5 @@
use std::sync::LazyLock;
pub const BRAIN_GRPC_ADDR: &str = "0.0.0.0:31337"; pub const BRAIN_GRPC_ADDR: &str = "0.0.0.0:31337";
pub const CERT_PATH: &str = "/etc/detee/brain/brain-crt.pem"; pub const CERT_PATH: &str = "/etc/detee/brain/brain-crt.pem";
pub const CERT_KEY_PATH: &str = "/etc/detee/brain/brain-key.pem"; pub const CERT_KEY_PATH: &str = "/etc/detee/brain/brain-key.pem";
@ -5,12 +7,19 @@ pub const CONFIG_PATH: &str = "/etc/detee/brain/config.ini";
pub const DB_SCHEMA_FILE: &str = "interim_tables.surql"; pub const DB_SCHEMA_FILE: &str = "interim_tables.surql";
pub const ADMIN_ACCOUNTS: &[&str] = &[ pub static ADMIN_ACCOUNTS: LazyLock<Vec<String>> = LazyLock::new(|| {
"x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK", let default_admin_keys = vec![
"FHuecMbeC1PfjkW2JKyoicJAuiU7khgQT16QUB3Q1XdL", "x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK".to_string(),
"H21Shi4iE7vgfjWEQNvzmpmBMJSaiZ17PYUcdNoAoKNc", "FHuecMbeC1PfjkW2JKyoicJAuiU7khgQT16QUB3Q1XdL".to_string(),
"H21Shi4iE7vgfjWEQNvzmpmBMJSaiZ17PYUcdNoAoKNc".to_string(),
]; ];
std::env::var("ADMIN_PUB_KEYS")
.ok()
.map(|keys| keys.split(',').map(|key| key.trim().to_string()).collect::<Vec<String>>())
.unwrap_or(default_admin_keys)
});
pub const OLD_BRAIN_DATA_PATH: &str = "./saved_data.yaml"; pub const OLD_BRAIN_DATA_PATH: &str = "./saved_data.yaml";
pub const ACCOUNT: &str = "account"; pub const ACCOUNT: &str = "account";

@ -30,6 +30,18 @@ impl Account {
Ok(account) Ok(account)
} }
pub async fn get_or_create(db: &Surreal<Client>, address: &str) -> Result<Self, Error> {
let id = (ACCOUNT, address);
match db.select(id).await? {
Some(account) => Ok(account),
None => {
let account: Option<Self> = db.create(id).await?;
account.ok_or(Error::FailedToCreateDBEntry)
}
}
}
pub async fn airdrop(db: &Surreal<Client>, account: &str, tokens: u64) -> Result<(), Error> { pub async fn airdrop(db: &Surreal<Client>, account: &str, tokens: u64) -> Result<(), Error> {
let tokens = tokens.saturating_mul(1_000_000_000); let tokens = tokens.saturating_mul(1_000_000_000);
let _ = db let _ = db

@ -22,6 +22,8 @@ pub enum Error {
StdIo(#[from] std::io::Error), StdIo(#[from] std::io::Error),
#[error(transparent)] #[error(transparent)]
TimeOut(#[from] tokio::time::error::Elapsed), TimeOut(#[from] tokio::time::error::Elapsed),
#[error("Failed to create account")]
FailedToCreateDBEntry,
} }
pub mod prelude { pub mod prelude {
@ -29,7 +31,6 @@ pub mod prelude {
pub use super::general::*; pub use super::general::*;
pub use super::vm::*; pub use super::vm::*;
pub use super::*; pub use super::*;
pub use detee_shared::snp::pb::vm_proto::{MeasurementArgs, VmNodeFilters};
} }
pub async fn db_connection( pub async fn db_connection(

@ -5,8 +5,9 @@ use super::Error;
use crate::constants::{ use crate::constants::{
ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT, ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT,
}; };
use crate::db::{MeasurementArgs, Report, VmNodeFilters}; use crate::db::{Account, Report};
use crate::old_brain; use crate::old_brain;
use detee_shared::vm_proto;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use surrealdb::engine::remote::ws::Client; use surrealdb::engine::remote::ws::Client;
use surrealdb::sql::Datetime; use surrealdb::sql::Datetime;
@ -52,6 +53,7 @@ impl VmNodeResources {
impl VmNode { impl VmNode {
pub async fn register(self, db: &Surreal<Client>) -> Result<(), Error> { pub async fn register(self, db: &Surreal<Client>) -> Result<(), Error> {
Account::get_or_create(db, &self.operator.key().to_string()).await?;
let _: Option<VmNode> = db.upsert(self.id.clone()).content(self).await?; let _: Option<VmNode> = db.upsert(self.id.clone()).content(self).await?;
Ok(()) Ok(())
} }
@ -82,7 +84,7 @@ impl VmNodeWithReports {
// https://en.wikipedia.org/wiki/Dependency_inversion_principle // https://en.wikipedia.org/wiki/Dependency_inversion_principle
pub async fn find_by_filters( pub async fn find_by_filters(
db: &Surreal<Client>, db: &Surreal<Client>,
filters: VmNodeFilters, filters: vm_proto::VmNodeFilters,
) -> Result<Vec<Self>, Error> { ) -> Result<Vec<Self>, Error> {
let mut query = format!( let mut query = format!(
"select *, <-report.* as reports from {VM_NODE} where "select *, <-report.* as reports from {VM_NODE} where
@ -235,7 +237,7 @@ impl NewVmReq {
/// first string is the vm_id /// first string is the vm_id
pub enum WrappedMeasurement { pub enum WrappedMeasurement {
Args(String, MeasurementArgs), Args(String, vm_proto::MeasurementArgs),
Error(String, String), Error(String, String),
} }
@ -262,9 +264,10 @@ impl WrappedMeasurement {
)) ))
.await?; .await?;
let mut error_stream = resp.stream::<Notification<ErrorMessage>>(0)?; let mut error_stream = resp.stream::<Notification<ErrorMessage>>(0)?;
let mut args_stream = resp.stream::<Notification<MeasurementArgs>>(1)?; let mut args_stream = resp.stream::<Notification<vm_proto::MeasurementArgs>>(1)?;
let args: Option<MeasurementArgs> = db.delete(("measurement_args", vm_id)).await?; let args: Option<vm_proto::MeasurementArgs> =
db.delete(("measurement_args", vm_id)).await?;
if let Some(args) = args { if let Some(args) = args {
return Ok(Self::Args(vm_id.to_string(), args)); return Ok(Self::Args(vm_id.to_string(), args));
} }
@ -293,7 +296,7 @@ impl WrappedMeasurement {
match args_notif { match args_notif {
Ok(args_notif) => { Ok(args_notif) => {
if args_notif.action == surrealdb::Action::Create { if args_notif.action == surrealdb::Action::Create {
let _: Option<MeasurementArgs> = db.delete(("measurement_args", vm_id)).await?; let _: Option<vm_proto::MeasurementArgs> = db.delete(("measurement_args", vm_id)).await?;
return Ok(Self::Args(vm_id.to_string(), args_notif.data)); return Ok(Self::Args(vm_id.to_string(), args_notif.data));
}; };
}, },
@ -356,7 +359,7 @@ impl ActiveVm {
pub async fn activate( pub async fn activate(
db: &Surreal<Client>, db: &Surreal<Client>,
id: &str, id: &str,
args: MeasurementArgs, args: vm_proto::MeasurementArgs,
) -> Result<(), Error> { ) -> Result<(), Error> {
let new_vm_req = match NewVmReq::get(db, id).await? { let new_vm_req = match NewVmReq::get(db, id).await? {
Some(r) => r, Some(r) => r,

@ -166,7 +166,8 @@ pub fn check_admin_key<T>(req: &Request<T>) -> Result<(), Status> {
}; };
let pubkey = pubkey let pubkey = pubkey
.to_str() .to_str()
.map_err(|_| Status::unauthenticated("could not parse pubkey metadata to str"))?; .map_err(|_| Status::unauthenticated("could not parse pubkey metadata to str"))?
.to_owned();
if !ADMIN_ACCOUNTS.contains(&pubkey) { if !ADMIN_ACCOUNTS.contains(&pubkey) {
return Err(Status::unauthenticated("This operation is reserved to admin accounts")); return Err(Status::unauthenticated("This operation is reserved to admin accounts"));

@ -1,3 +1,4 @@
use anyhow::{anyhow, Result};
use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer; use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer;
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer;
use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer;
@ -15,9 +16,9 @@ use tokio::sync::OnceCell;
use tonic::transport::{Channel, Endpoint, Server, Uri}; use tonic::transport::{Channel, Endpoint, Server, Uri};
use tower::service_fn; use tower::service_fn;
pub static DB_STATE: OnceCell<()> = OnceCell::const_new(); pub static DB_STATE: OnceCell<Result<()>> = OnceCell::const_new();
pub async fn prepare_test_db() -> Surreal<Client> { pub async fn prepare_test_db() -> Result<Surreal<Client>> {
dotenv().ok(); dotenv().ok();
let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env");
@ -26,24 +27,25 @@ pub async fn prepare_test_db() -> Surreal<Client> {
let db_ns = "test_brain"; let db_ns = "test_brain";
let db_name = "test_migration_db"; let db_name = "test_migration_db";
let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?;
.await
.unwrap();
DB_STATE DB_STATE
.get_or_init(|| async { .get_or_init(|| async {
let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk().unwrap(); let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk()
db.query(format!("REMOVE DATABASE {db_name}")).await.unwrap(); .map_err(|e| anyhow!(e.to_string()))?;
db.query(std::fs::read_to_string("interim_tables.surql").unwrap()).await.unwrap();
surreal_brain::db::migration0(&db, &old_brain_data).await.unwrap(); db.query(format!("REMOVE DATABASE {db_name}")).await?;
db.query(std::fs::read_to_string("interim_tables.surql")?).await?;
surreal_brain::db::migration0(&db, &old_brain_data).await?;
Ok::<(), anyhow::Error>(())
}) })
.await; .await;
db Ok(db)
} }
pub async fn run_service_in_background() -> SocketAddr { pub async fn run_service_in_background() -> Result<SocketAddr> {
dotenv().ok(); dotenv().ok();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr()?;
let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env"); let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env");
let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env"); let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env");
@ -52,9 +54,8 @@ pub async fn run_service_in_background() -> SocketAddr {
let db_name = "test_migration_db"; let db_name = "test_migration_db";
tokio::spawn(async move { tokio::spawn(async move {
let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) let db =
.await surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?;
.unwrap();
let db_arc = Arc::new(db); let db_arc = Arc::new(db);
Server::builder() Server::builder()
@ -62,13 +63,14 @@ pub async fn run_service_in_background() -> SocketAddr {
.add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone())))
.add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone())))
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
.await .await?;
.unwrap();
Ok::<(), anyhow::Error>(())
}); });
tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
addr Ok(addr)
} }
pub async fn run_service_for_stream_server() -> DuplexStream { pub async fn run_service_for_stream_server() -> DuplexStream {
@ -82,9 +84,8 @@ pub async fn run_service_for_stream_server() -> DuplexStream {
let db_name = "test_migration_db"; let db_name = "test_migration_db";
tokio::spawn(async move { tokio::spawn(async move {
let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name) let db =
.await surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?;
.unwrap();
let db_arc = Arc::new(db); let db_arc = Arc::new(db);
tonic::transport::Server::builder() tonic::transport::Server::builder()
@ -92,24 +93,26 @@ pub async fn run_service_for_stream_server() -> DuplexStream {
.add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone())))
.add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone())))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await .await?;
Ok::<(), anyhow::Error>(())
}); });
client client
} }
pub async fn connect_stream_client_channel(c_stream: DuplexStream) -> Channel { pub async fn connect_stream_client_channel(c_stream: DuplexStream) -> Result<Channel> {
let mut client = Some(c_stream); let mut client = Some(c_stream);
Endpoint::from_static("http://127.0.0.1:0") Ok(Endpoint::from_static("http://127.0.0.1:0")
.connect_with_connector(service_fn(move |_: Uri| { .connect_with_connector(service_fn(move |_: Uri| {
let client = client.take().unwrap(); let client = client.take().unwrap();
async move { Ok::<TokioIo<DuplexStream>, std::io::Error>(TokioIo::new(client)) } async move { Ok::<TokioIo<DuplexStream>, std::io::Error>(TokioIo::new(client)) }
})) }))
.await .await?)
.unwrap()
} }
pub async fn run_service_for_stream() -> Channel { pub async fn run_service_for_stream() -> Result<Channel> {
let client = run_service_for_stream_server().await; let client = run_service_for_stream_server().await;
connect_stream_client_channel(client).await connect_stream_client_channel(client).await
} }

@ -1,4 +1,8 @@
use super::test_utils::Key; use super::test_utils::Key;
use anyhow::{anyhow, Result};
use detee_shared::common_proto::Empty;
use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient;
use detee_shared::general_proto::{AirdropReq, ReportNodeReq};
use detee_shared::vm_proto; use detee_shared::vm_proto;
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ}; use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ};
@ -7,15 +11,27 @@ use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal; use surrealdb::Surreal;
use tonic::transport::Channel; use tonic::transport::Channel;
async fn airdrop(brain_channel: &Channel, wallet: &str, amount: u64) -> Result<()> {
let mut client = BrainGeneralCliClient::new(brain_channel.clone());
let airdrop_req = AirdropReq { pubkey: wallet.to_string(), tokens: amount };
let admin_key = Key::new();
std::env::set_var("ADMIN_PUB_KEYS", &admin_key.pubkey);
client.airdrop(admin_key.sign_request(airdrop_req.clone())?).await?;
Ok(())
}
pub async fn create_new_vm( pub async fn create_new_vm(
db: &Surreal<Client>, db: &Surreal<Client>,
key: Key, key: &Key,
node_pubkey: String, node_pubkey: &str,
brain_channel: Channel, brain_channel: &Channel,
) -> String { ) -> Result<String> {
let new_vm_req = vm_proto::NewVmReq { let new_vm_req = vm_proto::NewVmReq {
admin_pubkey: key.pubkey.clone(), admin_pubkey: key.pubkey.clone(),
node_pubkey, node_pubkey: node_pubkey.to_string(),
price_per_unit: 1200, price_per_unit: 1200,
extra_ports: vec![8080, 8081], extra_ports: vec![8080, 8081],
locked_nano: 0, locked_nano: 0,
@ -23,8 +39,7 @@ pub async fn create_new_vm(
}; };
let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone()); let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone());
let new_vm_resp = let new_vm_resp = client_vm_cli.new_vm(key.sign_request(new_vm_req)?).await?.into_inner();
client_vm_cli.new_vm(key.sign_request(new_vm_req).unwrap()).await.unwrap().into_inner();
assert!(new_vm_resp.error.is_empty()); assert!(new_vm_resp.error.is_empty());
assert!(new_vm_resp.uuid.len() == 40); assert!(new_vm_resp.uuid.len() == 40);
@ -32,16 +47,34 @@ pub async fn create_new_vm(
// wait for update db // wait for update db
tokio::time::sleep(tokio::time::Duration::from_millis(700)).await; tokio::time::sleep(tokio::time::Duration::from_millis(700)).await;
let vm_req_db: Option<db::NewVmReq> = let vm_req_db: Option<db::NewVmReq> = db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await?;
db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await.unwrap();
if let Some(new_vm_req) = vm_req_db { if let Some(new_vm_req) = vm_req_db {
panic!("New VM request found in DB: {:?}", new_vm_req); panic!("New VM request found in DB: {:?}", new_vm_req);
} }
let active_vm_op: Option<db::ActiveVm> = let active_vm_op: Option<db::ActiveVm> =
db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await.unwrap(); db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await?;
let active_vm = active_vm_op.unwrap(); let active_vm = active_vm_op.ok_or(anyhow!("Not found active vm in db"))?;
active_vm.id.key().to_string() Ok(active_vm.id.key().to_string())
}
pub async fn report_node(
key: &Key,
brain_channel: &Channel,
node_pubkey: &str,
contract: &str,
reason: &str,
) -> Result<tonic::Response<Empty>> {
let mut client_gen_cli = BrainGeneralCliClient::new(brain_channel.clone());
let report_req = ReportNodeReq {
admin_pubkey: key.pubkey.clone(),
node_pubkey: node_pubkey.to_string(),
contract: contract.to_string(),
reason: reason.to_string(),
};
Ok(client_gen_cli.report_node(key.sign_request(report_req)?).await?)
} }

@ -1,4 +1,5 @@
use super::test_utils::Key; use super::test_utils::Key;
use anyhow::Result;
use detee_shared::vm_proto; use detee_shared::vm_proto;
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
use detee_shared::vm_proto::RegisterVmNodeReq; use detee_shared::vm_proto::RegisterVmNodeReq;
@ -7,11 +8,11 @@ use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Channel; use tonic::transport::Channel;
pub async fn mock_vm_daemon(brain_channel: Channel) -> String { pub async fn mock_vm_daemon(brain_channel: &Channel) -> Result<String> {
let daemon_client = BrainVmDaemonClient::new(brain_channel); let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone());
let daemon_key = Key::new(); let daemon_key = Key::new();
register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await; register_vm_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?;
let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1); let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1);
@ -29,20 +30,20 @@ pub async fn mock_vm_daemon(brain_channel: Channel) -> String {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
daemon_key.pubkey Ok(daemon_key.pubkey)
} }
pub async fn register_vm_node( pub async fn register_vm_node(
mut client: BrainVmDaemonClient<Channel>, client: &mut BrainVmDaemonClient<Channel>,
key: Key, key: &Key,
operator_wallet: String, operator_wallet: &str,
) -> Vec<vm_proto::VmContract> { ) -> Result<Vec<vm_proto::VmContract>> {
log::info!("Registering vm_node: {}", key.pubkey); log::info!("Registering vm_node: {}", key.pubkey);
let node_pubkey = key.pubkey.clone(); let node_pubkey = key.pubkey.clone();
let req = RegisterVmNodeReq { let req = RegisterVmNodeReq {
node_pubkey, node_pubkey,
operator_wallet, operator_wallet: operator_wallet.to_string(),
main_ip: String::from("185.243.218.213"), main_ip: String::from("185.243.218.213"),
city: String::from("Oslo"), city: String::from("Oslo"),
country: String::from("Norway"), country: String::from("Norway"),
@ -50,8 +51,7 @@ pub async fn register_vm_node(
price: 1200, price: 1200,
}; };
let mut grpc_stream = let mut grpc_stream = client.register_vm_node(key.sign_request(req)?).await?.into_inner();
client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner();
let mut vm_contracts = Vec::new(); let mut vm_contracts = Vec::new();
while let Some(stream_update) = grpc_stream.next().await { while let Some(stream_update) = grpc_stream.next().await {
@ -64,22 +64,23 @@ pub async fn register_vm_node(
} }
} }
} }
vm_contracts Ok(vm_contracts)
} }
pub async fn daemon_listener( pub async fn daemon_listener(
mut client: BrainVmDaemonClient<Channel>, mut client: BrainVmDaemonClient<Channel>,
key: Key, key: Key,
tx: mpsc::Sender<vm_proto::BrainVmMessage>, tx: mpsc::Sender<vm_proto::BrainVmMessage>,
) { ) -> Result<()> {
log::info!("listening vm_daemon"); log::info!("listening vm_daemon");
let mut grpc_stream = let mut grpc_stream = client.brain_messages(key.sign_stream_auth(vec![])?).await?.into_inner();
client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner();
while let Some(Ok(stream_update)) = grpc_stream.next().await { while let Some(Ok(stream_update)) = grpc_stream.next().await {
log::info!("vm deamon got notified: {:?}", &stream_update); log::info!("vm deamon got notified: {:?}", &stream_update);
let _ = tx.send(stream_update).await; let _ = tx.send(stream_update).await;
} }
Ok(())
} }
pub async fn daemon_msg_sender( pub async fn daemon_msg_sender(
@ -87,28 +88,29 @@ pub async fn daemon_msg_sender(
key: Key, key: Key,
tx: mpsc::Sender<vm_proto::VmDaemonMessage>, tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
rx: mpsc::Receiver<vm_proto::VmDaemonMessage>, rx: mpsc::Receiver<vm_proto::VmDaemonMessage>,
) { ) -> Result<()> {
log::info!("sender vm_daemon"); log::info!("sender vm_daemon");
let rx_stream = ReceiverStream::new(rx); let rx_stream = ReceiverStream::new(rx);
tx.send(vm_proto::VmDaemonMessage { tx.send(vm_proto::VmDaemonMessage {
msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())), msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![])?)),
}) })
.await .await?;
.unwrap(); client.daemon_messages(rx_stream).await?;
client.daemon_messages(rx_stream).await.unwrap(); Ok(())
} }
pub async fn daemon_engine( pub async fn daemon_engine(
tx: mpsc::Sender<vm_proto::VmDaemonMessage>, tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>, mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>,
) { ) -> Result<()> {
log::info!("daemon engine vm_daemon"); log::info!("daemon engine vm_daemon");
while let Some(brain_msg) = rx.recv().await { while let Some(brain_msg) = rx.recv().await {
match brain_msg.msg { match brain_msg.msg {
Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => {
let exposed_ports = [vec![22], new_vm_req.extra_ports].concat();
let args = Some(vm_proto::MeasurementArgs { let args = Some(vm_proto::MeasurementArgs {
dtrfs_api_endpoint: String::from("184.107.169.199:48865"), dtrfs_api_endpoint: String::from("184.107.169.199:48865"),
exposed_ports: new_vm_req.extra_ports, exposed_ports,
ovmf_hash: String::from( ovmf_hash: String::from(
"0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76", "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76",
), ),
@ -124,7 +126,7 @@ pub async fn daemon_engine(
let res_data = vm_proto::VmDaemonMessage { let res_data = vm_proto::VmDaemonMessage {
msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)), msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)),
}; };
tx.send(res_data).await.unwrap(); tx.send(res_data).await?;
} }
Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => { Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => {
todo!() todo!()
@ -135,4 +137,6 @@ pub async fn daemon_engine(
None => todo!(), None => todo!(),
} }
} }
Ok(())
} }

213
tests/grpc_general_test.rs Normal file

@ -0,0 +1,213 @@
use common::prepare_test_env::{
prepare_test_db, run_service_for_stream, run_service_in_background,
};
use common::test_utils::Key;
use common::vm_cli_utils::{create_new_vm, report_node};
use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node};
use detee_shared::common_proto::{Empty, Pubkey};
use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient;
use detee_shared::general_proto::AirdropReq;
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
use futures::StreamExt;
use itertools::Itertools;
use std::vec;
use surreal_brain::constants::VM_NODE;
use surreal_brain::db::vm::VmNodeWithReports;
mod common;
#[tokio::test]
async fn test_general_balance() {
// env_logger::builder().filter_level(log::LevelFilter::Trace).init();
prepare_test_db().await.unwrap();
let addr = run_service_in_background().await.unwrap();
let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap();
let key = Key::new();
let pubkey = key.pubkey.clone();
let req_data = Pubkey { pubkey };
let req = key.sign_request(req_data).unwrap();
let acc_bal = client.get_balance(req).await.unwrap().into_inner();
assert_eq!(acc_bal.balance, 0);
assert_eq!(acc_bal.tmp_locked, 0);
}
#[tokio::test]
async fn test_general_airdrop() {
// env_logger::builder().filter_level(log::LevelFilter::Trace).init();
prepare_test_db().await.unwrap();
const AIRDROP_MULTIPLE: u64 = 1_000_000_000;
let airdrop_amount = 10;
let addr = run_service_in_background().await.unwrap();
let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap();
let admin_keys = vec![Key::new(), Key::new(), Key::new()];
let admin_pub_keys = admin_keys.iter().map(|k| k.pubkey.clone()).join(", ");
std::env::set_var("ADMIN_PUB_KEYS", admin_pub_keys);
let user_01_key = Key::new();
let user_01_pubkey = user_01_key.pubkey.clone();
let airdrop_req = AirdropReq { pubkey: user_01_pubkey.clone(), tokens: airdrop_amount };
// user airdroping himself
let err =
client.airdrop(user_01_key.sign_request(airdrop_req.clone()).unwrap()).await.err().unwrap();
assert_eq!(err.message(), "This operation is reserved to admin accounts");
// other user airdroping
let err =
client.airdrop(Key::new().sign_request(airdrop_req.clone()).unwrap()).await.err().unwrap();
assert_eq!(err.message(), "This operation is reserved to admin accounts");
let _ = client.airdrop(admin_keys[0].sign_request(airdrop_req.clone()).unwrap()).await.unwrap();
let bal_req_data = Pubkey { pubkey: user_01_pubkey };
let bal_req = user_01_key.sign_request(bal_req_data.clone()).unwrap();
let acc_bal_user_01 = client.get_balance(bal_req).await.unwrap().into_inner();
assert_eq!(acc_bal_user_01.balance, airdrop_amount * AIRDROP_MULTIPLE);
assert_eq!(acc_bal_user_01.tmp_locked, 0);
// second airdrop from same admin
let _ = client.airdrop(admin_keys[0].sign_request(airdrop_req.clone()).unwrap()).await.unwrap();
let acc_bal_user_01 = client
.get_balance(user_01_key.sign_request(bal_req_data.clone()).unwrap())
.await
.unwrap()
.into_inner();
assert_eq!(acc_bal_user_01.balance, 2 * airdrop_amount * AIRDROP_MULTIPLE);
// third airdrop from another admin
let _ = client.airdrop(admin_keys[1].sign_request(airdrop_req.clone()).unwrap()).await.unwrap();
let acc_bal_user_01 = client
.get_balance(user_01_key.sign_request(bal_req_data).unwrap())
.await
.unwrap()
.into_inner();
assert_eq!(acc_bal_user_01.balance, 3 * airdrop_amount * AIRDROP_MULTIPLE);
// self airdrop
let airdrop_req = AirdropReq { pubkey: admin_keys[2].pubkey.clone(), tokens: airdrop_amount };
let _ = client.airdrop(admin_keys[2].sign_request(airdrop_req.clone()).unwrap()).await.unwrap();
let bal_req_data = Pubkey { pubkey: admin_keys[2].pubkey.clone() };
let acc_bal_admin_3 = client
.get_balance(admin_keys[2].sign_request(bal_req_data.clone()).unwrap())
.await
.unwrap()
.into_inner();
assert_eq!(acc_bal_admin_3.balance, airdrop_amount * AIRDROP_MULTIPLE);
}
#[tokio::test]
async fn test_report_node() {
let db = prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await.unwrap();
let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap();
let key = Key::new();
let report_error =
report_node(&key, &brain_channel, &daemon_key, "uuid", "reason").await.err().unwrap();
log::info!("Report error: {:?}", report_error);
assert!(report_error.to_string().contains("No contract found by this ID."));
let active_vm_id = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap();
let reason = String::from("something went wrong on vm");
let _ = report_node(&key, &brain_channel, &daemon_key, &active_vm_id, &reason)
.await
.unwrap()
.into_inner();
let vm_nodes: Vec<VmNodeWithReports> = db
.query(format!(
"SELECT *, <-report.* as reports FROM {VM_NODE} WHERE id = {VM_NODE}:{daemon_key};"
))
.await
.unwrap()
.take(0)
.unwrap();
let vm_node_with_report = &vm_nodes[0];
assert!(vm_node_with_report.reports[0].reason == reason);
}
#[tokio::test]
// TODO: register some operators before testing this
async fn test_list_operators() {
prepare_test_db().await.unwrap();
let channel = run_service_for_stream().await.unwrap();
let mut client = BrainGeneralCliClient::new(channel);
let key = Key::new();
let mut grpc_stream =
client.list_operators(key.sign_request(Empty {}).unwrap()).await.unwrap().into_inner();
let mut operators = Vec::new();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(op) => {
operators.push(op);
}
Err(e) => {
panic!("Received error instead of operators: {e:?}");
}
}
}
assert!(!operators.is_empty())
}
#[tokio::test]
async fn test_inspect_operator() {
prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await.unwrap();
let mut cli_client = BrainGeneralCliClient::new(brain_channel.clone());
let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone());
let key = Key::new();
let daemon_key = Key::new();
let operator_key = Key::new();
let err = cli_client
.inspect_operator(key.sign_request(Pubkey { pubkey: operator_key.pubkey.clone() }).unwrap())
.await
.err()
.unwrap();
assert_eq!(err.message(), "The wallet you specified is not an operator");
// TODO: test with app node also
register_vm_node(&mut daemon_client, &daemon_key, &operator_key.pubkey).await.unwrap();
let inspect_response = cli_client
.inspect_operator(key.sign_request(Pubkey { pubkey: operator_key.pubkey.clone() }).unwrap())
.await
.unwrap()
.into_inner();
assert!(inspect_response.app_nodes.is_empty());
assert!(!inspect_response.vm_nodes.is_empty());
assert_eq!(&inspect_response.vm_nodes[0].operator, &operator_key.pubkey);
}

@ -1,166 +0,0 @@
use common::prepare_test_env::{
prepare_test_db, run_service_for_stream, run_service_in_background,
};
use common::test_utils::Key;
use common::vm_cli_utils::create_new_vm;
use common::vm_daemon_utils::mock_vm_daemon;
use detee_shared::common_proto::{Empty, Pubkey};
use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient;
use detee_shared::general_proto::ReportNodeReq;
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
use detee_shared::vm_proto::ListVmContractsReq;
use futures::StreamExt;
use surreal_brain::constants::VM_NODE;
use surreal_brain::db::vm::VmNodeWithReports;
mod common;
#[tokio::test]
async fn test_general_balance() {
// env_logger::builder().filter_level(log::LevelFilter::Trace).init();
let _ = prepare_test_db().await;
let addr = run_service_in_background().await;
let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap();
let key = Key::new();
let pubkey = key.pubkey.clone();
let req_data = Pubkey { pubkey };
let req = key.sign_request(req_data).unwrap();
let acc_bal = client.get_balance(req).await.unwrap().into_inner();
assert_eq!(acc_bal.balance, 0);
assert_eq!(acc_bal.tmp_locked, 0);
}
#[tokio::test]
async fn test_vm_creation() {
let db = prepare_test_db().await;
let brain_channel = run_service_for_stream().await;
let daemon_key = mock_vm_daemon(brain_channel.clone()).await;
let key = Key::new();
let _ = create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await;
}
#[tokio::test]
async fn test_report_node() {
let db = prepare_test_db().await;
let brain_channel = run_service_for_stream().await;
let daemon_key = mock_vm_daemon(brain_channel.clone()).await;
let mut client_gen_cli = BrainGeneralCliClient::new(brain_channel.clone());
let key = Key::new();
let pubkey = key.pubkey.clone();
let report_req = ReportNodeReq {
admin_pubkey: pubkey.clone(),
node_pubkey: daemon_key.clone(),
contract: String::from("uuid"),
reason: String::from("reason"),
};
let report_error =
client_gen_cli.report_node(key.sign_request(report_req).unwrap()).await.err().unwrap();
println!("Report error: {:?}", report_error);
assert_eq!(report_error.message(), "No contract found by this ID.");
let active_vm_id =
create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await;
let reason = String::from("something went wrong on vm");
let report_req = ReportNodeReq {
admin_pubkey: pubkey,
node_pubkey: daemon_key.clone(),
contract: active_vm_id,
reason: reason.clone(),
};
let _ = client_gen_cli
.report_node(key.sign_request(report_req).unwrap())
.await
.unwrap()
.into_inner();
let vm_nodes: Vec<VmNodeWithReports> = db
.query(format!(
"SELECT *, <-report.* as reports FROM {VM_NODE} WHERE id = {VM_NODE}:{daemon_key};"
))
.await
.unwrap()
.take(0)
.unwrap();
let vm_node_with_report = vm_nodes.get(0).unwrap();
assert!(vm_node_with_report.reports[0].reason == reason);
}
#[tokio::test]
// TODO: register some operators before testing this
async fn test_list_operators() {
prepare_test_db().await;
let channel = run_service_for_stream().await;
let mut client = BrainGeneralCliClient::new(channel);
let key = Key::new();
let mut grpc_stream =
client.list_operators(key.sign_request(Empty {}).unwrap()).await.unwrap().into_inner();
let mut operators = Vec::new();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(op) => {
operators.push(op);
}
Err(e) => {
panic!("Received error instead of operators: {e:?}");
}
}
}
assert!(!operators.is_empty())
}
#[tokio::test]
// TODO: create vm for this user before testing this
async fn test_list_vm_contracts() {
prepare_test_db().await;
let channel = run_service_for_stream().await;
let mut client = BrainVmCliClient::new(channel);
let key = Key::new();
let pubkey = key.pubkey.clone();
let req_data =
ListVmContractsReq { wallet: pubkey, uuid: String::from("uuid"), as_operator: false };
let mut grpc_stream =
client.list_vm_contracts(key.sign_request(req_data).unwrap()).await.unwrap().into_inner();
let mut vm_contracts = Vec::new();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(vm_c) => {
vm_contracts.push(vm_c);
}
Err(e) => {
panic!("Received error instead of vm_contracts: {e:?}");
}
}
}
assert!(vm_contracts.is_empty())
// verify report in db
}

90
tests/grpc_vm_cli_test.rs Normal file

@ -0,0 +1,90 @@
use common::prepare_test_env::{prepare_test_db, run_service_for_stream};
use common::test_utils::Key;
use common::vm_cli_utils::create_new_vm;
use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node};
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
use detee_shared::vm_proto::{ListVmContractsReq, NewVmReq};
use futures::StreamExt;
use std::vec;
mod common;
#[tokio::test]
async fn test_vm_creation() {
let db = prepare_test_db().await.unwrap();
// env_logger::builder().filter_level(log::LevelFilter::Error).init();
let brain_channel = run_service_for_stream().await.unwrap();
let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap();
let key = Key::new();
let _ = create_new_vm(&db, &key, &daemon_key, &brain_channel).await;
}
#[tokio::test]
async fn test_vm_creation_timeout() {
prepare_test_db().await.unwrap();
// env_logger::builder().filter_level(log::LevelFilter::Error).init();
let brain_channel = run_service_for_stream().await.unwrap();
let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone());
let daemon_key = Key::new();
register_vm_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await.unwrap();
let key = Key::new();
let new_vm_req = NewVmReq {
admin_pubkey: key.pubkey.clone(),
node_pubkey: daemon_key.pubkey,
price_per_unit: 1200,
extra_ports: vec![8080, 8081],
locked_nano: 0,
..Default::default()
};
let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone());
let timeout_error =
client_vm_cli.new_vm(key.sign_request(new_vm_req).unwrap()).await.err().unwrap();
assert_eq!(
timeout_error.message(),
"Network timeout. Please try again later or contact the DeTEE devs team.",
)
}
#[tokio::test]
// TODO: create vm for this user before testing this
async fn test_list_vm_contracts() {
prepare_test_db().await.unwrap();
let channel = run_service_for_stream().await.unwrap();
let mut client = BrainVmCliClient::new(channel);
let key = Key::new();
let pubkey = key.pubkey.clone();
let req_data =
ListVmContractsReq { wallet: pubkey, uuid: String::from("uuid"), as_operator: false };
let mut grpc_stream =
client.list_vm_contracts(key.sign_request(req_data).unwrap()).await.unwrap().into_inner();
let mut vm_contracts = Vec::new();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(vm_c) => {
vm_contracts.push(vm_c);
}
Err(e) => {
panic!("Received error instead of vm_contracts: {e:?}");
}
}
}
assert!(vm_contracts.is_empty())
// verify report in db
}

@ -11,27 +11,24 @@ mod common;
#[tokio::test] #[tokio::test]
async fn test_reg_vm_node() { async fn test_reg_vm_node() {
prepare_test_db().await; prepare_test_db().await.unwrap();
let addr = run_service_in_background().await; let addr = run_service_in_background().await.unwrap();
let client = BrainVmDaemonClient::connect(format!("http://{}", addr)).await.unwrap(); let mut client = BrainVmDaemonClient::connect(format!("http://{}", addr)).await.unwrap();
let operator_wallet = Key::new().pubkey; let vm_contracts =
register_vm_node(&mut client, &Key::new(), &Key::new().pubkey).await.unwrap();
let key = Key::new();
let vm_contracts = register_vm_node(client, key, operator_wallet).await;
assert!(vm_contracts.is_empty()) assert!(vm_contracts.is_empty())
} }
#[tokio::test] #[tokio::test]
async fn test_brain_message() { async fn test_brain_message() {
env_logger::builder().filter_level(log::LevelFilter::Info).init(); env_logger::builder().filter_level(log::LevelFilter::Error).init();
let db = prepare_test_db().await; prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await; let brain_channel = run_service_for_stream().await.unwrap();
let daemon_key = mock_vm_daemon(brain_channel.clone()).await; let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap();
let mut cli_client = BrainVmCliClient::new(brain_channel); let mut cli_client = BrainVmCliClient::new(brain_channel);
let cli_key = Key::new(); let cli_key = Key::new();
@ -49,9 +46,6 @@ async fn test_brain_message() {
assert!(new_vm_resp.error.is_empty()); assert!(new_vm_resp.error.is_empty());
assert!(new_vm_resp.uuid.len() == 40); assert!(new_vm_resp.uuid.len() == 40);
assert!(new_vm_resp.args.is_some());
let id = ("measurement_args", new_vm_resp.uuid); assert!(new_vm_resp.args.unwrap().exposed_ports.len() == 3);
let data_in_db: detee_shared::vm_proto::MeasurementArgs = db.select(id).await.unwrap().unwrap();
assert_eq!(data_in_db, new_vm_resp.args.unwrap());
} }