Compare commits
1 Commits
815701184f
...
186146a597
Author | SHA1 | Date | |
---|---|---|---|
186146a597 |
1
.env
1
.env
@ -5,4 +5,3 @@ DB_NAMESPACE = "brain"
|
|||||||
DB_NAME = "migration"
|
DB_NAME = "migration"
|
||||||
CERT_PATH = "./tmp/brain-crt.pem"
|
CERT_PATH = "./tmp/brain-crt.pem"
|
||||||
CERT_KEY_PATH = "./tmp/brain-key.pem"
|
CERT_KEY_PATH = "./tmp/brain-key.pem"
|
||||||
# ADMIN_PUB_KEYS = "admin_key01, admin_key02, admin_key03"
|
|
||||||
|
10
Cargo.lock
generated
10
Cargo.lock
generated
@ -1972,15 +1972,6 @@ dependencies = [
|
|||||||
"either",
|
"either",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "itertools"
|
|
||||||
version = "0.14.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
|
|
||||||
dependencies = [
|
|
||||||
"either",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "itoa"
|
name = "itoa"
|
||||||
version = "1.0.15"
|
version = "1.0.15"
|
||||||
@ -3797,7 +3788,6 @@ dependencies = [
|
|||||||
"env_logger",
|
"env_logger",
|
||||||
"futures",
|
"futures",
|
||||||
"hyper-util",
|
"hyper-util",
|
||||||
"itertools 0.14.0",
|
|
||||||
"log",
|
"log",
|
||||||
"nanoid",
|
"nanoid",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
|
@ -35,6 +35,5 @@ anyhow = "1.0.98"
|
|||||||
bs58 = "0.5.1"
|
bs58 = "0.5.1"
|
||||||
ed25519-dalek = { version = "2.1.1", features = ["rand_core"] }
|
ed25519-dalek = { version = "2.1.1", features = ["rand_core"] }
|
||||||
hyper-util = "0.1.11"
|
hyper-util = "0.1.11"
|
||||||
itertools = "0.14.0"
|
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
tower = "0.5.2"
|
tower = "0.5.2"
|
||||||
|
@ -1,5 +1,3 @@
|
|||||||
use std::sync::LazyLock;
|
|
||||||
|
|
||||||
pub const BRAIN_GRPC_ADDR: &str = "0.0.0.0:31337";
|
pub const BRAIN_GRPC_ADDR: &str = "0.0.0.0:31337";
|
||||||
pub const CERT_PATH: &str = "/etc/detee/brain/brain-crt.pem";
|
pub const CERT_PATH: &str = "/etc/detee/brain/brain-crt.pem";
|
||||||
pub const CERT_KEY_PATH: &str = "/etc/detee/brain/brain-key.pem";
|
pub const CERT_KEY_PATH: &str = "/etc/detee/brain/brain-key.pem";
|
||||||
@ -7,19 +5,12 @@ pub const CONFIG_PATH: &str = "/etc/detee/brain/config.ini";
|
|||||||
|
|
||||||
pub const DB_SCHEMA_FILE: &str = "interim_tables.surql";
|
pub const DB_SCHEMA_FILE: &str = "interim_tables.surql";
|
||||||
|
|
||||||
pub static ADMIN_ACCOUNTS: LazyLock<Vec<String>> = LazyLock::new(|| {
|
pub const ADMIN_ACCOUNTS: &[&str] = &[
|
||||||
let default_admin_keys = vec![
|
"x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK",
|
||||||
"x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK".to_string(),
|
"FHuecMbeC1PfjkW2JKyoicJAuiU7khgQT16QUB3Q1XdL",
|
||||||
"FHuecMbeC1PfjkW2JKyoicJAuiU7khgQT16QUB3Q1XdL".to_string(),
|
"H21Shi4iE7vgfjWEQNvzmpmBMJSaiZ17PYUcdNoAoKNc",
|
||||||
"H21Shi4iE7vgfjWEQNvzmpmBMJSaiZ17PYUcdNoAoKNc".to_string(),
|
|
||||||
];
|
];
|
||||||
|
|
||||||
std::env::var("ADMIN_PUB_KEYS")
|
|
||||||
.ok()
|
|
||||||
.map(|keys| keys.split(',').map(|key| key.trim().to_string()).collect::<Vec<String>>())
|
|
||||||
.unwrap_or(default_admin_keys)
|
|
||||||
});
|
|
||||||
|
|
||||||
pub const OLD_BRAIN_DATA_PATH: &str = "./saved_data.yaml";
|
pub const OLD_BRAIN_DATA_PATH: &str = "./saved_data.yaml";
|
||||||
|
|
||||||
pub const ACCOUNT: &str = "account";
|
pub const ACCOUNT: &str = "account";
|
||||||
|
@ -30,18 +30,6 @@ impl Account {
|
|||||||
Ok(account)
|
Ok(account)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_or_create(db: &Surreal<Client>, address: &str) -> Result<Self, Error> {
|
|
||||||
let id = (ACCOUNT, address);
|
|
||||||
|
|
||||||
match db.select(id).await? {
|
|
||||||
Some(account) => Ok(account),
|
|
||||||
None => {
|
|
||||||
let account: Option<Self> = db.create(id).await?;
|
|
||||||
account.ok_or(Error::FailedToCreateDBEntry)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn airdrop(db: &Surreal<Client>, account: &str, tokens: u64) -> Result<(), Error> {
|
pub async fn airdrop(db: &Surreal<Client>, account: &str, tokens: u64) -> Result<(), Error> {
|
||||||
let tokens = tokens.saturating_mul(1_000_000_000);
|
let tokens = tokens.saturating_mul(1_000_000_000);
|
||||||
let _ = db
|
let _ = db
|
||||||
|
@ -22,8 +22,6 @@ pub enum Error {
|
|||||||
StdIo(#[from] std::io::Error),
|
StdIo(#[from] std::io::Error),
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
TimeOut(#[from] tokio::time::error::Elapsed),
|
TimeOut(#[from] tokio::time::error::Elapsed),
|
||||||
#[error("Failed to create account")]
|
|
||||||
FailedToCreateDBEntry,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub mod prelude {
|
pub mod prelude {
|
||||||
@ -31,6 +29,7 @@ pub mod prelude {
|
|||||||
pub use super::general::*;
|
pub use super::general::*;
|
||||||
pub use super::vm::*;
|
pub use super::vm::*;
|
||||||
pub use super::*;
|
pub use super::*;
|
||||||
|
pub use detee_shared::snp::pb::vm_proto::{MeasurementArgs, VmNodeFilters};
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn db_connection(
|
pub async fn db_connection(
|
||||||
|
17
src/db/vm.rs
17
src/db/vm.rs
@ -5,9 +5,8 @@ use super::Error;
|
|||||||
use crate::constants::{
|
use crate::constants::{
|
||||||
ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT,
|
ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT,
|
||||||
};
|
};
|
||||||
use crate::db::{Account, Report};
|
use crate::db::{MeasurementArgs, Report, VmNodeFilters};
|
||||||
use crate::old_brain;
|
use crate::old_brain;
|
||||||
use detee_shared::vm_proto;
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use surrealdb::engine::remote::ws::Client;
|
use surrealdb::engine::remote::ws::Client;
|
||||||
use surrealdb::sql::Datetime;
|
use surrealdb::sql::Datetime;
|
||||||
@ -53,7 +52,6 @@ impl VmNodeResources {
|
|||||||
|
|
||||||
impl VmNode {
|
impl VmNode {
|
||||||
pub async fn register(self, db: &Surreal<Client>) -> Result<(), Error> {
|
pub async fn register(self, db: &Surreal<Client>) -> Result<(), Error> {
|
||||||
Account::get_or_create(db, &self.operator.key().to_string()).await?;
|
|
||||||
let _: Option<VmNode> = db.upsert(self.id.clone()).content(self).await?;
|
let _: Option<VmNode> = db.upsert(self.id.clone()).content(self).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -84,7 +82,7 @@ impl VmNodeWithReports {
|
|||||||
// https://en.wikipedia.org/wiki/Dependency_inversion_principle
|
// https://en.wikipedia.org/wiki/Dependency_inversion_principle
|
||||||
pub async fn find_by_filters(
|
pub async fn find_by_filters(
|
||||||
db: &Surreal<Client>,
|
db: &Surreal<Client>,
|
||||||
filters: vm_proto::VmNodeFilters,
|
filters: VmNodeFilters,
|
||||||
) -> Result<Vec<Self>, Error> {
|
) -> Result<Vec<Self>, Error> {
|
||||||
let mut query = format!(
|
let mut query = format!(
|
||||||
"select *, <-report.* as reports from {VM_NODE} where
|
"select *, <-report.* as reports from {VM_NODE} where
|
||||||
@ -237,7 +235,7 @@ impl NewVmReq {
|
|||||||
|
|
||||||
/// first string is the vm_id
|
/// first string is the vm_id
|
||||||
pub enum WrappedMeasurement {
|
pub enum WrappedMeasurement {
|
||||||
Args(String, vm_proto::MeasurementArgs),
|
Args(String, MeasurementArgs),
|
||||||
Error(String, String),
|
Error(String, String),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -264,10 +262,9 @@ impl WrappedMeasurement {
|
|||||||
))
|
))
|
||||||
.await?;
|
.await?;
|
||||||
let mut error_stream = resp.stream::<Notification<ErrorMessage>>(0)?;
|
let mut error_stream = resp.stream::<Notification<ErrorMessage>>(0)?;
|
||||||
let mut args_stream = resp.stream::<Notification<vm_proto::MeasurementArgs>>(1)?;
|
let mut args_stream = resp.stream::<Notification<MeasurementArgs>>(1)?;
|
||||||
|
|
||||||
let args: Option<vm_proto::MeasurementArgs> =
|
let args: Option<MeasurementArgs> = db.delete(("measurement_args", vm_id)).await?;
|
||||||
db.delete(("measurement_args", vm_id)).await?;
|
|
||||||
if let Some(args) = args {
|
if let Some(args) = args {
|
||||||
return Ok(Self::Args(vm_id.to_string(), args));
|
return Ok(Self::Args(vm_id.to_string(), args));
|
||||||
}
|
}
|
||||||
@ -296,7 +293,7 @@ impl WrappedMeasurement {
|
|||||||
match args_notif {
|
match args_notif {
|
||||||
Ok(args_notif) => {
|
Ok(args_notif) => {
|
||||||
if args_notif.action == surrealdb::Action::Create {
|
if args_notif.action == surrealdb::Action::Create {
|
||||||
let _: Option<vm_proto::MeasurementArgs> = db.delete(("measurement_args", vm_id)).await?;
|
let _: Option<MeasurementArgs> = db.delete(("measurement_args", vm_id)).await?;
|
||||||
return Ok(Self::Args(vm_id.to_string(), args_notif.data));
|
return Ok(Self::Args(vm_id.to_string(), args_notif.data));
|
||||||
};
|
};
|
||||||
},
|
},
|
||||||
@ -359,7 +356,7 @@ impl ActiveVm {
|
|||||||
pub async fn activate(
|
pub async fn activate(
|
||||||
db: &Surreal<Client>,
|
db: &Surreal<Client>,
|
||||||
id: &str,
|
id: &str,
|
||||||
args: vm_proto::MeasurementArgs,
|
args: MeasurementArgs,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let new_vm_req = match NewVmReq::get(db, id).await? {
|
let new_vm_req = match NewVmReq::get(db, id).await? {
|
||||||
Some(r) => r,
|
Some(r) => r,
|
||||||
|
@ -166,8 +166,7 @@ pub fn check_admin_key<T>(req: &Request<T>) -> Result<(), Status> {
|
|||||||
};
|
};
|
||||||
let pubkey = pubkey
|
let pubkey = pubkey
|
||||||
.to_str()
|
.to_str()
|
||||||
.map_err(|_| Status::unauthenticated("could not parse pubkey metadata to str"))?
|
.map_err(|_| Status::unauthenticated("could not parse pubkey metadata to str"))?;
|
||||||
.to_owned();
|
|
||||||
|
|
||||||
if !ADMIN_ACCOUNTS.contains(&pubkey) {
|
if !ADMIN_ACCOUNTS.contains(&pubkey) {
|
||||||
return Err(Status::unauthenticated("This operation is reserved to admin accounts"));
|
return Err(Status::unauthenticated("This operation is reserved to admin accounts"));
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
use anyhow::{anyhow, Result};
|
|
||||||
use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer;
|
use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer;
|
||||||
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer;
|
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer;
|
||||||
use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer;
|
use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer;
|
||||||
@ -16,9 +15,9 @@ use tokio::sync::OnceCell;
|
|||||||
use tonic::transport::{Channel, Endpoint, Server, Uri};
|
use tonic::transport::{Channel, Endpoint, Server, Uri};
|
||||||
use tower::service_fn;
|
use tower::service_fn;
|
||||||
|
|
||||||
pub static DB_STATE: OnceCell<Result<()>> = OnceCell::const_new();
|
pub static DB_STATE: OnceCell<()> = OnceCell::const_new();
|
||||||
|
|
||||||
pub async fn prepare_test_db() -> Result<Surreal<Client>> {
|
pub async fn prepare_test_db() -> Surreal<Client> {
|
||||||
dotenv().ok();
|
dotenv().ok();
|
||||||
|
|
||||||
let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env");
|
let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env");
|
||||||
@ -27,25 +26,24 @@ pub async fn prepare_test_db() -> Result<Surreal<Client>> {
|
|||||||
let db_ns = "test_brain";
|
let db_ns = "test_brain";
|
||||||
let db_name = "test_migration_db";
|
let db_name = "test_migration_db";
|
||||||
|
|
||||||
let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?;
|
let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
DB_STATE
|
DB_STATE
|
||||||
.get_or_init(|| async {
|
.get_or_init(|| async {
|
||||||
let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk()
|
let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk().unwrap();
|
||||||
.map_err(|e| anyhow!(e.to_string()))?;
|
db.query(format!("REMOVE DATABASE {db_name}")).await.unwrap();
|
||||||
|
db.query(std::fs::read_to_string("interim_tables.surql").unwrap()).await.unwrap();
|
||||||
db.query(format!("REMOVE DATABASE {db_name}")).await?;
|
surreal_brain::db::migration0(&db, &old_brain_data).await.unwrap();
|
||||||
db.query(std::fs::read_to_string("interim_tables.surql")?).await?;
|
|
||||||
surreal_brain::db::migration0(&db, &old_brain_data).await?;
|
|
||||||
Ok::<(), anyhow::Error>(())
|
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
Ok(db)
|
db
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run_service_in_background() -> Result<SocketAddr> {
|
pub async fn run_service_in_background() -> SocketAddr {
|
||||||
dotenv().ok();
|
dotenv().ok();
|
||||||
let listener = TcpListener::bind("127.0.0.1:0").await?;
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
let addr = listener.local_addr()?;
|
let addr = listener.local_addr().unwrap();
|
||||||
|
|
||||||
let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env");
|
let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env");
|
||||||
let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env");
|
let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env");
|
||||||
@ -54,8 +52,9 @@ pub async fn run_service_in_background() -> Result<SocketAddr> {
|
|||||||
let db_name = "test_migration_db";
|
let db_name = "test_migration_db";
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let db =
|
let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name)
|
||||||
surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?;
|
.await
|
||||||
|
.unwrap();
|
||||||
let db_arc = Arc::new(db);
|
let db_arc = Arc::new(db);
|
||||||
|
|
||||||
Server::builder()
|
Server::builder()
|
||||||
@ -63,14 +62,13 @@ pub async fn run_service_in_background() -> Result<SocketAddr> {
|
|||||||
.add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone())))
|
.add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone())))
|
||||||
.add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone())))
|
.add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone())))
|
||||||
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
|
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
|
||||||
.await?;
|
.await
|
||||||
|
.unwrap();
|
||||||
Ok::<(), anyhow::Error>(())
|
|
||||||
});
|
});
|
||||||
|
|
||||||
tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
|
tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
|
||||||
|
|
||||||
Ok(addr)
|
addr
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run_service_for_stream_server() -> DuplexStream {
|
pub async fn run_service_for_stream_server() -> DuplexStream {
|
||||||
@ -84,8 +82,9 @@ pub async fn run_service_for_stream_server() -> DuplexStream {
|
|||||||
let db_name = "test_migration_db";
|
let db_name = "test_migration_db";
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let db =
|
let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name)
|
||||||
surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, db_ns, db_name).await?;
|
.await
|
||||||
|
.unwrap();
|
||||||
let db_arc = Arc::new(db);
|
let db_arc = Arc::new(db);
|
||||||
|
|
||||||
tonic::transport::Server::builder()
|
tonic::transport::Server::builder()
|
||||||
@ -93,26 +92,24 @@ pub async fn run_service_for_stream_server() -> DuplexStream {
|
|||||||
.add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone())))
|
.add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone())))
|
||||||
.add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone())))
|
.add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone())))
|
||||||
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
|
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
|
||||||
.await?;
|
.await
|
||||||
|
|
||||||
Ok::<(), anyhow::Error>(())
|
|
||||||
});
|
});
|
||||||
client
|
client
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn connect_stream_client_channel(c_stream: DuplexStream) -> Result<Channel> {
|
pub async fn connect_stream_client_channel(c_stream: DuplexStream) -> Channel {
|
||||||
let mut client = Some(c_stream);
|
let mut client = Some(c_stream);
|
||||||
|
|
||||||
Ok(Endpoint::from_static("http://127.0.0.1:0")
|
Endpoint::from_static("http://127.0.0.1:0")
|
||||||
.connect_with_connector(service_fn(move |_: Uri| {
|
.connect_with_connector(service_fn(move |_: Uri| {
|
||||||
let client = client.take().unwrap();
|
let client = client.take().unwrap();
|
||||||
async move { Ok::<TokioIo<DuplexStream>, std::io::Error>(TokioIo::new(client)) }
|
async move { Ok::<TokioIo<DuplexStream>, std::io::Error>(TokioIo::new(client)) }
|
||||||
}))
|
}))
|
||||||
.await?)
|
.await
|
||||||
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run_service_for_stream() -> Result<Channel> {
|
pub async fn run_service_for_stream() -> Channel {
|
||||||
let client = run_service_for_stream_server().await;
|
let client = run_service_for_stream_server().await;
|
||||||
|
|
||||||
connect_stream_client_channel(client).await
|
connect_stream_client_channel(client).await
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,4 @@
|
|||||||
use super::test_utils::Key;
|
use super::test_utils::Key;
|
||||||
use anyhow::{anyhow, Result};
|
|
||||||
use detee_shared::common_proto::Empty;
|
|
||||||
use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient;
|
|
||||||
use detee_shared::general_proto::{AirdropReq, ReportNodeReq};
|
|
||||||
use detee_shared::vm_proto;
|
use detee_shared::vm_proto;
|
||||||
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
|
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
|
||||||
use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ};
|
use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ};
|
||||||
@ -11,27 +7,15 @@ use surrealdb::engine::remote::ws::Client;
|
|||||||
use surrealdb::Surreal;
|
use surrealdb::Surreal;
|
||||||
use tonic::transport::Channel;
|
use tonic::transport::Channel;
|
||||||
|
|
||||||
async fn airdrop(brain_channel: &Channel, wallet: &str, amount: u64) -> Result<()> {
|
|
||||||
let mut client = BrainGeneralCliClient::new(brain_channel.clone());
|
|
||||||
let airdrop_req = AirdropReq { pubkey: wallet.to_string(), tokens: amount };
|
|
||||||
|
|
||||||
let admin_key = Key::new();
|
|
||||||
std::env::set_var("ADMIN_PUB_KEYS", &admin_key.pubkey);
|
|
||||||
|
|
||||||
client.airdrop(admin_key.sign_request(airdrop_req.clone())?).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn create_new_vm(
|
pub async fn create_new_vm(
|
||||||
db: &Surreal<Client>,
|
db: &Surreal<Client>,
|
||||||
key: &Key,
|
key: Key,
|
||||||
node_pubkey: &str,
|
node_pubkey: String,
|
||||||
brain_channel: &Channel,
|
brain_channel: Channel,
|
||||||
) -> Result<String> {
|
) -> String {
|
||||||
let new_vm_req = vm_proto::NewVmReq {
|
let new_vm_req = vm_proto::NewVmReq {
|
||||||
admin_pubkey: key.pubkey.clone(),
|
admin_pubkey: key.pubkey.clone(),
|
||||||
node_pubkey: node_pubkey.to_string(),
|
node_pubkey,
|
||||||
price_per_unit: 1200,
|
price_per_unit: 1200,
|
||||||
extra_ports: vec![8080, 8081],
|
extra_ports: vec![8080, 8081],
|
||||||
locked_nano: 0,
|
locked_nano: 0,
|
||||||
@ -39,7 +23,8 @@ pub async fn create_new_vm(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone());
|
let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone());
|
||||||
let new_vm_resp = client_vm_cli.new_vm(key.sign_request(new_vm_req)?).await?.into_inner();
|
let new_vm_resp =
|
||||||
|
client_vm_cli.new_vm(key.sign_request(new_vm_req).unwrap()).await.unwrap().into_inner();
|
||||||
|
|
||||||
assert!(new_vm_resp.error.is_empty());
|
assert!(new_vm_resp.error.is_empty());
|
||||||
assert!(new_vm_resp.uuid.len() == 40);
|
assert!(new_vm_resp.uuid.len() == 40);
|
||||||
@ -47,34 +32,16 @@ pub async fn create_new_vm(
|
|||||||
// wait for update db
|
// wait for update db
|
||||||
tokio::time::sleep(tokio::time::Duration::from_millis(700)).await;
|
tokio::time::sleep(tokio::time::Duration::from_millis(700)).await;
|
||||||
|
|
||||||
let vm_req_db: Option<db::NewVmReq> = db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await?;
|
let vm_req_db: Option<db::NewVmReq> =
|
||||||
|
db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await.unwrap();
|
||||||
|
|
||||||
if let Some(new_vm_req) = vm_req_db {
|
if let Some(new_vm_req) = vm_req_db {
|
||||||
panic!("New VM request found in DB: {:?}", new_vm_req);
|
panic!("New VM request found in DB: {:?}", new_vm_req);
|
||||||
}
|
}
|
||||||
|
|
||||||
let active_vm_op: Option<db::ActiveVm> =
|
let active_vm_op: Option<db::ActiveVm> =
|
||||||
db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await?;
|
db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await.unwrap();
|
||||||
let active_vm = active_vm_op.ok_or(anyhow!("Not found active vm in db"))?;
|
let active_vm = active_vm_op.unwrap();
|
||||||
|
|
||||||
Ok(active_vm.id.key().to_string())
|
active_vm.id.key().to_string()
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn report_node(
|
|
||||||
key: &Key,
|
|
||||||
brain_channel: &Channel,
|
|
||||||
node_pubkey: &str,
|
|
||||||
contract: &str,
|
|
||||||
reason: &str,
|
|
||||||
) -> Result<tonic::Response<Empty>> {
|
|
||||||
let mut client_gen_cli = BrainGeneralCliClient::new(brain_channel.clone());
|
|
||||||
|
|
||||||
let report_req = ReportNodeReq {
|
|
||||||
admin_pubkey: key.pubkey.clone(),
|
|
||||||
node_pubkey: node_pubkey.to_string(),
|
|
||||||
contract: contract.to_string(),
|
|
||||||
reason: reason.to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(client_gen_cli.report_node(key.sign_request(report_req)?).await?)
|
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
use super::test_utils::Key;
|
use super::test_utils::Key;
|
||||||
use anyhow::Result;
|
|
||||||
use detee_shared::vm_proto;
|
use detee_shared::vm_proto;
|
||||||
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
|
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
|
||||||
use detee_shared::vm_proto::RegisterVmNodeReq;
|
use detee_shared::vm_proto::RegisterVmNodeReq;
|
||||||
@ -8,11 +7,11 @@ use tokio::sync::mpsc;
|
|||||||
use tokio_stream::wrappers::ReceiverStream;
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
use tonic::transport::Channel;
|
use tonic::transport::Channel;
|
||||||
|
|
||||||
pub async fn mock_vm_daemon(brain_channel: &Channel) -> Result<String> {
|
pub async fn mock_vm_daemon(brain_channel: Channel) -> String {
|
||||||
let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone());
|
let daemon_client = BrainVmDaemonClient::new(brain_channel);
|
||||||
let daemon_key = Key::new();
|
let daemon_key = Key::new();
|
||||||
|
|
||||||
register_vm_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?;
|
register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await;
|
||||||
|
|
||||||
let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1);
|
let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1);
|
||||||
|
|
||||||
@ -30,20 +29,20 @@ pub async fn mock_vm_daemon(brain_channel: &Channel) -> Result<String> {
|
|||||||
|
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||||
|
|
||||||
Ok(daemon_key.pubkey)
|
daemon_key.pubkey
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn register_vm_node(
|
pub async fn register_vm_node(
|
||||||
client: &mut BrainVmDaemonClient<Channel>,
|
mut client: BrainVmDaemonClient<Channel>,
|
||||||
key: &Key,
|
key: Key,
|
||||||
operator_wallet: &str,
|
operator_wallet: String,
|
||||||
) -> Result<Vec<vm_proto::VmContract>> {
|
) -> Vec<vm_proto::VmContract> {
|
||||||
log::info!("Registering vm_node: {}", key.pubkey);
|
log::info!("Registering vm_node: {}", key.pubkey);
|
||||||
let node_pubkey = key.pubkey.clone();
|
let node_pubkey = key.pubkey.clone();
|
||||||
|
|
||||||
let req = RegisterVmNodeReq {
|
let req = RegisterVmNodeReq {
|
||||||
node_pubkey,
|
node_pubkey,
|
||||||
operator_wallet: operator_wallet.to_string(),
|
operator_wallet,
|
||||||
main_ip: String::from("185.243.218.213"),
|
main_ip: String::from("185.243.218.213"),
|
||||||
city: String::from("Oslo"),
|
city: String::from("Oslo"),
|
||||||
country: String::from("Norway"),
|
country: String::from("Norway"),
|
||||||
@ -51,7 +50,8 @@ pub async fn register_vm_node(
|
|||||||
price: 1200,
|
price: 1200,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut grpc_stream = client.register_vm_node(key.sign_request(req)?).await?.into_inner();
|
let mut grpc_stream =
|
||||||
|
client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner();
|
||||||
|
|
||||||
let mut vm_contracts = Vec::new();
|
let mut vm_contracts = Vec::new();
|
||||||
while let Some(stream_update) = grpc_stream.next().await {
|
while let Some(stream_update) = grpc_stream.next().await {
|
||||||
@ -64,23 +64,22 @@ pub async fn register_vm_node(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(vm_contracts)
|
vm_contracts
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn daemon_listener(
|
pub async fn daemon_listener(
|
||||||
mut client: BrainVmDaemonClient<Channel>,
|
mut client: BrainVmDaemonClient<Channel>,
|
||||||
key: Key,
|
key: Key,
|
||||||
tx: mpsc::Sender<vm_proto::BrainVmMessage>,
|
tx: mpsc::Sender<vm_proto::BrainVmMessage>,
|
||||||
) -> Result<()> {
|
) {
|
||||||
log::info!("listening vm_daemon");
|
log::info!("listening vm_daemon");
|
||||||
let mut grpc_stream = client.brain_messages(key.sign_stream_auth(vec![])?).await?.into_inner();
|
let mut grpc_stream =
|
||||||
|
client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner();
|
||||||
|
|
||||||
while let Some(Ok(stream_update)) = grpc_stream.next().await {
|
while let Some(Ok(stream_update)) = grpc_stream.next().await {
|
||||||
log::info!("vm deamon got notified: {:?}", &stream_update);
|
log::info!("vm deamon got notified: {:?}", &stream_update);
|
||||||
let _ = tx.send(stream_update).await;
|
let _ = tx.send(stream_update).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn daemon_msg_sender(
|
pub async fn daemon_msg_sender(
|
||||||
@ -88,29 +87,28 @@ pub async fn daemon_msg_sender(
|
|||||||
key: Key,
|
key: Key,
|
||||||
tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
|
tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
|
||||||
rx: mpsc::Receiver<vm_proto::VmDaemonMessage>,
|
rx: mpsc::Receiver<vm_proto::VmDaemonMessage>,
|
||||||
) -> Result<()> {
|
) {
|
||||||
log::info!("sender vm_daemon");
|
log::info!("sender vm_daemon");
|
||||||
let rx_stream = ReceiverStream::new(rx);
|
let rx_stream = ReceiverStream::new(rx);
|
||||||
tx.send(vm_proto::VmDaemonMessage {
|
tx.send(vm_proto::VmDaemonMessage {
|
||||||
msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![])?)),
|
msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())),
|
||||||
})
|
})
|
||||||
.await?;
|
.await
|
||||||
client.daemon_messages(rx_stream).await?;
|
.unwrap();
|
||||||
Ok(())
|
client.daemon_messages(rx_stream).await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn daemon_engine(
|
pub async fn daemon_engine(
|
||||||
tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
|
tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
|
||||||
mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>,
|
mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>,
|
||||||
) -> Result<()> {
|
) {
|
||||||
log::info!("daemon engine vm_daemon");
|
log::info!("daemon engine vm_daemon");
|
||||||
while let Some(brain_msg) = rx.recv().await {
|
while let Some(brain_msg) = rx.recv().await {
|
||||||
match brain_msg.msg {
|
match brain_msg.msg {
|
||||||
Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => {
|
Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => {
|
||||||
let exposed_ports = [vec![22], new_vm_req.extra_ports].concat();
|
|
||||||
let args = Some(vm_proto::MeasurementArgs {
|
let args = Some(vm_proto::MeasurementArgs {
|
||||||
dtrfs_api_endpoint: String::from("184.107.169.199:48865"),
|
dtrfs_api_endpoint: String::from("184.107.169.199:48865"),
|
||||||
exposed_ports,
|
exposed_ports: new_vm_req.extra_ports,
|
||||||
ovmf_hash: String::from(
|
ovmf_hash: String::from(
|
||||||
"0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76",
|
"0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76",
|
||||||
),
|
),
|
||||||
@ -126,7 +124,7 @@ pub async fn daemon_engine(
|
|||||||
let res_data = vm_proto::VmDaemonMessage {
|
let res_data = vm_proto::VmDaemonMessage {
|
||||||
msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)),
|
msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)),
|
||||||
};
|
};
|
||||||
tx.send(res_data).await?;
|
tx.send(res_data).await.unwrap();
|
||||||
}
|
}
|
||||||
Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => {
|
Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => {
|
||||||
todo!()
|
todo!()
|
||||||
@ -137,6 +135,4 @@ pub async fn daemon_engine(
|
|||||||
None => todo!(),
|
None => todo!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
@ -1,213 +0,0 @@
|
|||||||
use common::prepare_test_env::{
|
|
||||||
prepare_test_db, run_service_for_stream, run_service_in_background,
|
|
||||||
};
|
|
||||||
use common::test_utils::Key;
|
|
||||||
use common::vm_cli_utils::{create_new_vm, report_node};
|
|
||||||
use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node};
|
|
||||||
use detee_shared::common_proto::{Empty, Pubkey};
|
|
||||||
use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient;
|
|
||||||
use detee_shared::general_proto::AirdropReq;
|
|
||||||
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
|
|
||||||
use futures::StreamExt;
|
|
||||||
use itertools::Itertools;
|
|
||||||
use std::vec;
|
|
||||||
use surreal_brain::constants::VM_NODE;
|
|
||||||
use surreal_brain::db::vm::VmNodeWithReports;
|
|
||||||
|
|
||||||
mod common;
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_general_balance() {
|
|
||||||
// env_logger::builder().filter_level(log::LevelFilter::Trace).init();
|
|
||||||
prepare_test_db().await.unwrap();
|
|
||||||
|
|
||||||
let addr = run_service_in_background().await.unwrap();
|
|
||||||
let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap();
|
|
||||||
|
|
||||||
let key = Key::new();
|
|
||||||
let pubkey = key.pubkey.clone();
|
|
||||||
let req_data = Pubkey { pubkey };
|
|
||||||
|
|
||||||
let req = key.sign_request(req_data).unwrap();
|
|
||||||
|
|
||||||
let acc_bal = client.get_balance(req).await.unwrap().into_inner();
|
|
||||||
|
|
||||||
assert_eq!(acc_bal.balance, 0);
|
|
||||||
assert_eq!(acc_bal.tmp_locked, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_general_airdrop() {
|
|
||||||
// env_logger::builder().filter_level(log::LevelFilter::Trace).init();
|
|
||||||
prepare_test_db().await.unwrap();
|
|
||||||
|
|
||||||
const AIRDROP_MULTIPLE: u64 = 1_000_000_000;
|
|
||||||
let airdrop_amount = 10;
|
|
||||||
|
|
||||||
let addr = run_service_in_background().await.unwrap();
|
|
||||||
let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap();
|
|
||||||
|
|
||||||
let admin_keys = vec![Key::new(), Key::new(), Key::new()];
|
|
||||||
let admin_pub_keys = admin_keys.iter().map(|k| k.pubkey.clone()).join(", ");
|
|
||||||
std::env::set_var("ADMIN_PUB_KEYS", admin_pub_keys);
|
|
||||||
|
|
||||||
let user_01_key = Key::new();
|
|
||||||
let user_01_pubkey = user_01_key.pubkey.clone();
|
|
||||||
|
|
||||||
let airdrop_req = AirdropReq { pubkey: user_01_pubkey.clone(), tokens: airdrop_amount };
|
|
||||||
|
|
||||||
// user airdroping himself
|
|
||||||
let err =
|
|
||||||
client.airdrop(user_01_key.sign_request(airdrop_req.clone()).unwrap()).await.err().unwrap();
|
|
||||||
assert_eq!(err.message(), "This operation is reserved to admin accounts");
|
|
||||||
|
|
||||||
// other user airdroping
|
|
||||||
let err =
|
|
||||||
client.airdrop(Key::new().sign_request(airdrop_req.clone()).unwrap()).await.err().unwrap();
|
|
||||||
assert_eq!(err.message(), "This operation is reserved to admin accounts");
|
|
||||||
|
|
||||||
let _ = client.airdrop(admin_keys[0].sign_request(airdrop_req.clone()).unwrap()).await.unwrap();
|
|
||||||
|
|
||||||
let bal_req_data = Pubkey { pubkey: user_01_pubkey };
|
|
||||||
let bal_req = user_01_key.sign_request(bal_req_data.clone()).unwrap();
|
|
||||||
let acc_bal_user_01 = client.get_balance(bal_req).await.unwrap().into_inner();
|
|
||||||
|
|
||||||
assert_eq!(acc_bal_user_01.balance, airdrop_amount * AIRDROP_MULTIPLE);
|
|
||||||
assert_eq!(acc_bal_user_01.tmp_locked, 0);
|
|
||||||
|
|
||||||
// second airdrop from same admin
|
|
||||||
let _ = client.airdrop(admin_keys[0].sign_request(airdrop_req.clone()).unwrap()).await.unwrap();
|
|
||||||
|
|
||||||
let acc_bal_user_01 = client
|
|
||||||
.get_balance(user_01_key.sign_request(bal_req_data.clone()).unwrap())
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
.into_inner();
|
|
||||||
|
|
||||||
assert_eq!(acc_bal_user_01.balance, 2 * airdrop_amount * AIRDROP_MULTIPLE);
|
|
||||||
|
|
||||||
// third airdrop from another admin
|
|
||||||
let _ = client.airdrop(admin_keys[1].sign_request(airdrop_req.clone()).unwrap()).await.unwrap();
|
|
||||||
|
|
||||||
let acc_bal_user_01 = client
|
|
||||||
.get_balance(user_01_key.sign_request(bal_req_data).unwrap())
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
.into_inner();
|
|
||||||
|
|
||||||
assert_eq!(acc_bal_user_01.balance, 3 * airdrop_amount * AIRDROP_MULTIPLE);
|
|
||||||
|
|
||||||
// self airdrop
|
|
||||||
let airdrop_req = AirdropReq { pubkey: admin_keys[2].pubkey.clone(), tokens: airdrop_amount };
|
|
||||||
|
|
||||||
let _ = client.airdrop(admin_keys[2].sign_request(airdrop_req.clone()).unwrap()).await.unwrap();
|
|
||||||
|
|
||||||
let bal_req_data = Pubkey { pubkey: admin_keys[2].pubkey.clone() };
|
|
||||||
let acc_bal_admin_3 = client
|
|
||||||
.get_balance(admin_keys[2].sign_request(bal_req_data.clone()).unwrap())
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
.into_inner();
|
|
||||||
|
|
||||||
assert_eq!(acc_bal_admin_3.balance, airdrop_amount * AIRDROP_MULTIPLE);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_report_node() {
|
|
||||||
let db = prepare_test_db().await.unwrap();
|
|
||||||
|
|
||||||
let brain_channel = run_service_for_stream().await.unwrap();
|
|
||||||
let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap();
|
|
||||||
|
|
||||||
let key = Key::new();
|
|
||||||
|
|
||||||
let report_error =
|
|
||||||
report_node(&key, &brain_channel, &daemon_key, "uuid", "reason").await.err().unwrap();
|
|
||||||
|
|
||||||
log::info!("Report error: {:?}", report_error);
|
|
||||||
assert!(report_error.to_string().contains("No contract found by this ID."));
|
|
||||||
|
|
||||||
let active_vm_id = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap();
|
|
||||||
|
|
||||||
let reason = String::from("something went wrong on vm");
|
|
||||||
|
|
||||||
let _ = report_node(&key, &brain_channel, &daemon_key, &active_vm_id, &reason)
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
.into_inner();
|
|
||||||
|
|
||||||
let vm_nodes: Vec<VmNodeWithReports> = db
|
|
||||||
.query(format!(
|
|
||||||
"SELECT *, <-report.* as reports FROM {VM_NODE} WHERE id = {VM_NODE}:{daemon_key};"
|
|
||||||
))
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
.take(0)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let vm_node_with_report = &vm_nodes[0];
|
|
||||||
|
|
||||||
assert!(vm_node_with_report.reports[0].reason == reason);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
// TODO: register some operators before testing this
|
|
||||||
async fn test_list_operators() {
|
|
||||||
prepare_test_db().await.unwrap();
|
|
||||||
|
|
||||||
let channel = run_service_for_stream().await.unwrap();
|
|
||||||
|
|
||||||
let mut client = BrainGeneralCliClient::new(channel);
|
|
||||||
|
|
||||||
let key = Key::new();
|
|
||||||
|
|
||||||
let mut grpc_stream =
|
|
||||||
client.list_operators(key.sign_request(Empty {}).unwrap()).await.unwrap().into_inner();
|
|
||||||
|
|
||||||
let mut operators = Vec::new();
|
|
||||||
while let Some(stream_update) = grpc_stream.next().await {
|
|
||||||
match stream_update {
|
|
||||||
Ok(op) => {
|
|
||||||
operators.push(op);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
panic!("Received error instead of operators: {e:?}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assert!(!operators.is_empty())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_inspect_operator() {
|
|
||||||
prepare_test_db().await.unwrap();
|
|
||||||
|
|
||||||
let brain_channel = run_service_for_stream().await.unwrap();
|
|
||||||
let mut cli_client = BrainGeneralCliClient::new(brain_channel.clone());
|
|
||||||
let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone());
|
|
||||||
let key = Key::new();
|
|
||||||
let daemon_key = Key::new();
|
|
||||||
let operator_key = Key::new();
|
|
||||||
|
|
||||||
let err = cli_client
|
|
||||||
.inspect_operator(key.sign_request(Pubkey { pubkey: operator_key.pubkey.clone() }).unwrap())
|
|
||||||
.await
|
|
||||||
.err()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
assert_eq!(err.message(), "The wallet you specified is not an operator");
|
|
||||||
|
|
||||||
// TODO: test with app node also
|
|
||||||
register_vm_node(&mut daemon_client, &daemon_key, &operator_key.pubkey).await.unwrap();
|
|
||||||
|
|
||||||
let inspect_response = cli_client
|
|
||||||
.inspect_operator(key.sign_request(Pubkey { pubkey: operator_key.pubkey.clone() }).unwrap())
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
.into_inner();
|
|
||||||
|
|
||||||
assert!(inspect_response.app_nodes.is_empty());
|
|
||||||
assert!(!inspect_response.vm_nodes.is_empty());
|
|
||||||
assert_eq!(&inspect_response.vm_nodes[0].operator, &operator_key.pubkey);
|
|
||||||
}
|
|
166
tests/grpc_test.rs
Normal file
166
tests/grpc_test.rs
Normal file
@ -0,0 +1,166 @@
|
|||||||
|
use common::prepare_test_env::{
|
||||||
|
prepare_test_db, run_service_for_stream, run_service_in_background,
|
||||||
|
};
|
||||||
|
use common::test_utils::Key;
|
||||||
|
use common::vm_cli_utils::create_new_vm;
|
||||||
|
use common::vm_daemon_utils::mock_vm_daemon;
|
||||||
|
use detee_shared::common_proto::{Empty, Pubkey};
|
||||||
|
use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient;
|
||||||
|
use detee_shared::general_proto::ReportNodeReq;
|
||||||
|
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
|
||||||
|
use detee_shared::vm_proto::ListVmContractsReq;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use surreal_brain::constants::VM_NODE;
|
||||||
|
use surreal_brain::db::vm::VmNodeWithReports;
|
||||||
|
|
||||||
|
mod common;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_general_balance() {
|
||||||
|
// env_logger::builder().filter_level(log::LevelFilter::Trace).init();
|
||||||
|
let _ = prepare_test_db().await;
|
||||||
|
|
||||||
|
let addr = run_service_in_background().await;
|
||||||
|
let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap();
|
||||||
|
|
||||||
|
let key = Key::new();
|
||||||
|
let pubkey = key.pubkey.clone();
|
||||||
|
let req_data = Pubkey { pubkey };
|
||||||
|
|
||||||
|
let req = key.sign_request(req_data).unwrap();
|
||||||
|
|
||||||
|
let acc_bal = client.get_balance(req).await.unwrap().into_inner();
|
||||||
|
|
||||||
|
assert_eq!(acc_bal.balance, 0);
|
||||||
|
assert_eq!(acc_bal.tmp_locked, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_vm_creation() {
|
||||||
|
let db = prepare_test_db().await;
|
||||||
|
|
||||||
|
let brain_channel = run_service_for_stream().await;
|
||||||
|
let daemon_key = mock_vm_daemon(brain_channel.clone()).await;
|
||||||
|
|
||||||
|
let key = Key::new();
|
||||||
|
|
||||||
|
let _ = create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_report_node() {
|
||||||
|
let db = prepare_test_db().await;
|
||||||
|
|
||||||
|
let brain_channel = run_service_for_stream().await;
|
||||||
|
let daemon_key = mock_vm_daemon(brain_channel.clone()).await;
|
||||||
|
let mut client_gen_cli = BrainGeneralCliClient::new(brain_channel.clone());
|
||||||
|
|
||||||
|
let key = Key::new();
|
||||||
|
let pubkey = key.pubkey.clone();
|
||||||
|
|
||||||
|
let report_req = ReportNodeReq {
|
||||||
|
admin_pubkey: pubkey.clone(),
|
||||||
|
node_pubkey: daemon_key.clone(),
|
||||||
|
contract: String::from("uuid"),
|
||||||
|
reason: String::from("reason"),
|
||||||
|
};
|
||||||
|
|
||||||
|
let report_error =
|
||||||
|
client_gen_cli.report_node(key.sign_request(report_req).unwrap()).await.err().unwrap();
|
||||||
|
|
||||||
|
println!("Report error: {:?}", report_error);
|
||||||
|
assert_eq!(report_error.message(), "No contract found by this ID.");
|
||||||
|
|
||||||
|
let active_vm_id =
|
||||||
|
create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await;
|
||||||
|
|
||||||
|
let reason = String::from("something went wrong on vm");
|
||||||
|
let report_req = ReportNodeReq {
|
||||||
|
admin_pubkey: pubkey,
|
||||||
|
node_pubkey: daemon_key.clone(),
|
||||||
|
contract: active_vm_id,
|
||||||
|
reason: reason.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let _ = client_gen_cli
|
||||||
|
.report_node(key.sign_request(report_req).unwrap())
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.into_inner();
|
||||||
|
|
||||||
|
let vm_nodes: Vec<VmNodeWithReports> = db
|
||||||
|
.query(format!(
|
||||||
|
"SELECT *, <-report.* as reports FROM {VM_NODE} WHERE id = {VM_NODE}:{daemon_key};"
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.take(0)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let vm_node_with_report = vm_nodes.get(0).unwrap();
|
||||||
|
|
||||||
|
assert!(vm_node_with_report.reports[0].reason == reason);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
// TODO: register some operators before testing this
|
||||||
|
async fn test_list_operators() {
|
||||||
|
prepare_test_db().await;
|
||||||
|
|
||||||
|
let channel = run_service_for_stream().await;
|
||||||
|
|
||||||
|
let mut client = BrainGeneralCliClient::new(channel);
|
||||||
|
|
||||||
|
let key = Key::new();
|
||||||
|
|
||||||
|
let mut grpc_stream =
|
||||||
|
client.list_operators(key.sign_request(Empty {}).unwrap()).await.unwrap().into_inner();
|
||||||
|
|
||||||
|
let mut operators = Vec::new();
|
||||||
|
while let Some(stream_update) = grpc_stream.next().await {
|
||||||
|
match stream_update {
|
||||||
|
Ok(op) => {
|
||||||
|
operators.push(op);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
panic!("Received error instead of operators: {e:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(!operators.is_empty())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
// TODO: create vm for this user before testing this
|
||||||
|
async fn test_list_vm_contracts() {
|
||||||
|
prepare_test_db().await;
|
||||||
|
|
||||||
|
let channel = run_service_for_stream().await;
|
||||||
|
let mut client = BrainVmCliClient::new(channel);
|
||||||
|
|
||||||
|
let key = Key::new();
|
||||||
|
let pubkey = key.pubkey.clone();
|
||||||
|
|
||||||
|
let req_data =
|
||||||
|
ListVmContractsReq { wallet: pubkey, uuid: String::from("uuid"), as_operator: false };
|
||||||
|
|
||||||
|
let mut grpc_stream =
|
||||||
|
client.list_vm_contracts(key.sign_request(req_data).unwrap()).await.unwrap().into_inner();
|
||||||
|
|
||||||
|
let mut vm_contracts = Vec::new();
|
||||||
|
while let Some(stream_update) = grpc_stream.next().await {
|
||||||
|
match stream_update {
|
||||||
|
Ok(vm_c) => {
|
||||||
|
vm_contracts.push(vm_c);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
panic!("Received error instead of vm_contracts: {e:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(vm_contracts.is_empty())
|
||||||
|
|
||||||
|
// verify report in db
|
||||||
|
}
|
@ -1,90 +0,0 @@
|
|||||||
use common::prepare_test_env::{prepare_test_db, run_service_for_stream};
|
|
||||||
use common::test_utils::Key;
|
|
||||||
use common::vm_cli_utils::create_new_vm;
|
|
||||||
use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node};
|
|
||||||
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
|
|
||||||
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
|
|
||||||
use detee_shared::vm_proto::{ListVmContractsReq, NewVmReq};
|
|
||||||
use futures::StreamExt;
|
|
||||||
use std::vec;
|
|
||||||
|
|
||||||
mod common;
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_vm_creation() {
|
|
||||||
let db = prepare_test_db().await.unwrap();
|
|
||||||
// env_logger::builder().filter_level(log::LevelFilter::Error).init();
|
|
||||||
|
|
||||||
let brain_channel = run_service_for_stream().await.unwrap();
|
|
||||||
let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap();
|
|
||||||
|
|
||||||
let key = Key::new();
|
|
||||||
|
|
||||||
let _ = create_new_vm(&db, &key, &daemon_key, &brain_channel).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_vm_creation_timeout() {
|
|
||||||
prepare_test_db().await.unwrap();
|
|
||||||
// env_logger::builder().filter_level(log::LevelFilter::Error).init();
|
|
||||||
|
|
||||||
let brain_channel = run_service_for_stream().await.unwrap();
|
|
||||||
let mut daemon_client = BrainVmDaemonClient::new(brain_channel.clone());
|
|
||||||
let daemon_key = Key::new();
|
|
||||||
|
|
||||||
register_vm_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await.unwrap();
|
|
||||||
|
|
||||||
let key = Key::new();
|
|
||||||
|
|
||||||
let new_vm_req = NewVmReq {
|
|
||||||
admin_pubkey: key.pubkey.clone(),
|
|
||||||
node_pubkey: daemon_key.pubkey,
|
|
||||||
price_per_unit: 1200,
|
|
||||||
extra_ports: vec![8080, 8081],
|
|
||||||
locked_nano: 0,
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone());
|
|
||||||
let timeout_error =
|
|
||||||
client_vm_cli.new_vm(key.sign_request(new_vm_req).unwrap()).await.err().unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
timeout_error.message(),
|
|
||||||
"Network timeout. Please try again later or contact the DeTEE devs team.",
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
// TODO: create vm for this user before testing this
|
|
||||||
async fn test_list_vm_contracts() {
|
|
||||||
prepare_test_db().await.unwrap();
|
|
||||||
|
|
||||||
let channel = run_service_for_stream().await.unwrap();
|
|
||||||
let mut client = BrainVmCliClient::new(channel);
|
|
||||||
|
|
||||||
let key = Key::new();
|
|
||||||
let pubkey = key.pubkey.clone();
|
|
||||||
|
|
||||||
let req_data =
|
|
||||||
ListVmContractsReq { wallet: pubkey, uuid: String::from("uuid"), as_operator: false };
|
|
||||||
|
|
||||||
let mut grpc_stream =
|
|
||||||
client.list_vm_contracts(key.sign_request(req_data).unwrap()).await.unwrap().into_inner();
|
|
||||||
|
|
||||||
let mut vm_contracts = Vec::new();
|
|
||||||
while let Some(stream_update) = grpc_stream.next().await {
|
|
||||||
match stream_update {
|
|
||||||
Ok(vm_c) => {
|
|
||||||
vm_contracts.push(vm_c);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
panic!("Received error instead of vm_contracts: {e:?}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assert!(vm_contracts.is_empty())
|
|
||||||
|
|
||||||
// verify report in db
|
|
||||||
}
|
|
@ -11,24 +11,27 @@ mod common;
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_reg_vm_node() {
|
async fn test_reg_vm_node() {
|
||||||
prepare_test_db().await.unwrap();
|
prepare_test_db().await;
|
||||||
|
|
||||||
let addr = run_service_in_background().await.unwrap();
|
let addr = run_service_in_background().await;
|
||||||
let mut client = BrainVmDaemonClient::connect(format!("http://{}", addr)).await.unwrap();
|
let client = BrainVmDaemonClient::connect(format!("http://{}", addr)).await.unwrap();
|
||||||
|
|
||||||
let vm_contracts =
|
let operator_wallet = Key::new().pubkey;
|
||||||
register_vm_node(&mut client, &Key::new(), &Key::new().pubkey).await.unwrap();
|
|
||||||
|
let key = Key::new();
|
||||||
|
|
||||||
|
let vm_contracts = register_vm_node(client, key, operator_wallet).await;
|
||||||
|
|
||||||
assert!(vm_contracts.is_empty())
|
assert!(vm_contracts.is_empty())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_brain_message() {
|
async fn test_brain_message() {
|
||||||
env_logger::builder().filter_level(log::LevelFilter::Error).init();
|
env_logger::builder().filter_level(log::LevelFilter::Info).init();
|
||||||
prepare_test_db().await.unwrap();
|
let db = prepare_test_db().await;
|
||||||
|
|
||||||
let brain_channel = run_service_for_stream().await.unwrap();
|
let brain_channel = run_service_for_stream().await;
|
||||||
let daemon_key = mock_vm_daemon(&brain_channel).await.unwrap();
|
let daemon_key = mock_vm_daemon(brain_channel.clone()).await;
|
||||||
let mut cli_client = BrainVmCliClient::new(brain_channel);
|
let mut cli_client = BrainVmCliClient::new(brain_channel);
|
||||||
|
|
||||||
let cli_key = Key::new();
|
let cli_key = Key::new();
|
||||||
@ -46,6 +49,9 @@ async fn test_brain_message() {
|
|||||||
|
|
||||||
assert!(new_vm_resp.error.is_empty());
|
assert!(new_vm_resp.error.is_empty());
|
||||||
assert!(new_vm_resp.uuid.len() == 40);
|
assert!(new_vm_resp.uuid.len() == 40);
|
||||||
assert!(new_vm_resp.args.is_some());
|
|
||||||
assert!(new_vm_resp.args.unwrap().exposed_ports.len() == 3);
|
let id = ("measurement_args", new_vm_resp.uuid);
|
||||||
|
let data_in_db: detee_shared::vm_proto::MeasurementArgs = db.select(id).await.unwrap().unwrap();
|
||||||
|
|
||||||
|
assert_eq!(data_in_db, new_vm_resp.args.unwrap());
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user