Compare commits
No commits in common. "8fa8a648621d1e012ef58e4a75bce129d5880e9f" and "32f6548eff8a1d7e2caa758444b70824677ecca8" have entirely different histories.
8fa8a64862
...
32f6548eff
5
.env
5
.env
@ -1,5 +0,0 @@
|
||||
DB_URL = "localhost:8000"
|
||||
DB_USER = "root"
|
||||
DB_PASS = "root"
|
||||
DB_NAMESPACE = "brain"
|
||||
DB_NAME = "migration"
|
7
Cargo.lock
generated
7
Cargo.lock
generated
@ -1078,12 +1078,6 @@ version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
|
||||
|
||||
[[package]]
|
||||
name = "dotenv"
|
||||
version = "0.15.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
|
||||
|
||||
[[package]]
|
||||
name = "earcutr"
|
||||
version = "0.4.3"
|
||||
@ -3783,7 +3777,6 @@ dependencies = [
|
||||
"chrono",
|
||||
"dashmap 6.1.0",
|
||||
"detee-shared",
|
||||
"dotenv",
|
||||
"ed25519-dalek",
|
||||
"env_logger",
|
||||
"futures",
|
||||
|
@ -21,7 +21,6 @@ log = "0.4.27"
|
||||
env_logger = "0.11.8"
|
||||
thiserror = "2.0.12"
|
||||
nanoid = "0.4.0"
|
||||
dotenv = "0.15.0"
|
||||
|
||||
[profile.release]
|
||||
lto = true
|
||||
|
@ -1 +0,0 @@
|
||||
# Brain Miration to SurrealDB
|
@ -131,4 +131,3 @@ DEFINE FIELD contract ON TABLE kick TYPE record<deleted_vm|deleted_app>;
|
||||
DEFINE TABLE report TYPE RELATION FROM account TO vm_node|app_node;
|
||||
DEFINE FIELD created_at ON TABLE report TYPE datetime;
|
||||
DEFINE FIELD reason ON TABLE report TYPE string;
|
||||
DEFINE FIELD contract_id ON TABLE report TYPE string;
|
||||
|
@ -1,10 +1,9 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer;
|
||||
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer;
|
||||
use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer;
|
||||
use dotenv::dotenv;
|
||||
use surreal_brain::constants::{BRAIN_GRPC_ADDR, CERT_KEY_PATH, CERT_PATH};
|
||||
use surreal_brain::constants::{
|
||||
BRAIN_GRPC_ADDR, CERT_KEY_PATH, CERT_PATH, DB_ADDRESS, DB_NAME, DB_NS,
|
||||
};
|
||||
use surreal_brain::db;
|
||||
use surreal_brain::grpc::BrainVmCliForReal;
|
||||
use surreal_brain::grpc::{BrainGeneralCliForReal, BrainVmDaemonForReal};
|
||||
@ -12,24 +11,13 @@ use tonic::transport::{Identity, Server, ServerTlsConfig};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
dotenv().ok();
|
||||
env_logger::builder().filter_level(log::LevelFilter::Debug).init();
|
||||
|
||||
let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env");
|
||||
let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env");
|
||||
let db_pass = std::env::var("DB_PASS").expect("DB_PASS not set in .env");
|
||||
let db_ns = std::env::var("DB_NAMESPACE").expect("DB_NAMESPACE not set in .env");
|
||||
let db_name = std::env::var("DB_NAME").expect("DB_NAME not set in .env");
|
||||
|
||||
let db = db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name).await.unwrap();
|
||||
let db_arc = Arc::new(db);
|
||||
|
||||
db::init(DB_ADDRESS, DB_NS, DB_NAME).await.unwrap();
|
||||
let addr = BRAIN_GRPC_ADDR.parse().unwrap();
|
||||
|
||||
let snp_daemon_server = BrainVmDaemonServer::new(BrainVmDaemonForReal::new(db_arc.clone()));
|
||||
let snp_cli_server = BrainVmCliServer::new(BrainVmCliForReal::new(db_arc.clone()));
|
||||
let general_service_server =
|
||||
BrainGeneralCliServer::new(BrainGeneralCliForReal::new(db_arc.clone()));
|
||||
let snp_daemon_server = BrainVmDaemonServer::new(BrainVmDaemonForReal {});
|
||||
let snp_cli_server = BrainVmCliServer::new(BrainVmCliForReal {});
|
||||
let general_service_server = BrainGeneralCliServer::new(BrainGeneralCliForReal {});
|
||||
|
||||
let cert = std::fs::read_to_string(CERT_PATH).unwrap();
|
||||
let key = std::fs::read_to_string(CERT_KEY_PATH).unwrap();
|
||||
|
@ -1,24 +1,20 @@
|
||||
// After deleting this migration, also delete old_brain structs
|
||||
// and dangling impls from the model
|
||||
use dotenv::dotenv;
|
||||
use std::error::Error;
|
||||
use surreal_brain::constants::{DB_ADDRESS, DB_NAME, DB_NS};
|
||||
use surreal_brain::db::init;
|
||||
use surreal_brain::{db, old_brain};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
dotenv().ok();
|
||||
let old_brain_data = old_brain::BrainData::load_from_disk()?;
|
||||
// println!("{}", serde_yaml::to_string(&old_brain_data)?);
|
||||
|
||||
let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env");
|
||||
let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env");
|
||||
let db_pass = std::env::var("DB_PASS").expect("DB_PASS not set in .env");
|
||||
let db_ns = std::env::var("DB_NAMESPACE").expect("DB_NAMESPACE not set in .env");
|
||||
let db_name = std::env::var("DB_NAME").expect("DB_NAME not set in .env");
|
||||
init(DB_ADDRESS, DB_NS, DB_NAME).await?;
|
||||
|
||||
let db = db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name).await.unwrap();
|
||||
let result = db::migration0(&old_brain_data).await?;
|
||||
|
||||
db::migration0(&db, &old_brain_data).await?;
|
||||
println!("{result:?}");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -2,7 +2,13 @@ pub const BRAIN_GRPC_ADDR: &str = "0.0.0.0:31337";
|
||||
pub const CERT_PATH: &str = "./tmp/brain-crt.pem";
|
||||
pub const CERT_KEY_PATH: &str = "./tmp/brain-key.pem";
|
||||
|
||||
pub const DB_SCHEMA_FILE: &str = "interim_tables.surql";
|
||||
pub const DB_ADDRESS: &str = "localhost:8000";
|
||||
pub const DB_NS: &str = "brain";
|
||||
pub const DB_NAME: &str = "migration";
|
||||
|
||||
// TODO: read from .env
|
||||
pub const DB_USER: &str = "root";
|
||||
pub const DB_PASS: &str = "root";
|
||||
|
||||
pub const ADMIN_ACCOUNTS: &[&str] = &[
|
||||
"x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK",
|
||||
@ -20,8 +26,6 @@ pub const UPDATE_VM_REQ: &str = "update_vm_req";
|
||||
pub const DELETED_VM: &str = "deleted_vm";
|
||||
pub const VM_CONTRACT: &str = "vm_contract";
|
||||
|
||||
pub const ACTIVE_APP: &str = "active_app";
|
||||
|
||||
pub const ID_ALPHABET: [char; 62] = [
|
||||
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
|
||||
'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B',
|
||||
|
274
src/db.rs
274
src/db.rs
@ -1,12 +1,11 @@
|
||||
use std::{str::FromStr, time::Duration};
|
||||
|
||||
pub use crate::constants::{
|
||||
ACCOUNT, ACTIVE_APP, ACTIVE_VM, DB_SCHEMA_FILE, DELETED_VM, ID_ALPHABET, NEW_VM_REQ,
|
||||
UPDATE_VM_REQ, VM_CONTRACT, VM_NODE,
|
||||
ACCOUNT, ACTIVE_VM, DB_ADDRESS, DB_NAME, DB_NS, DB_PASS, DB_USER, DELETED_VM, ID_ALPHABET,
|
||||
NEW_VM_REQ, UPDATE_VM_REQ, VM_CONTRACT, VM_NODE,
|
||||
};
|
||||
|
||||
use crate::old_brain;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{str::FromStr, sync::LazyLock};
|
||||
use surrealdb::{
|
||||
engine::remote::ws::{Client, Ws},
|
||||
opt::auth::Root,
|
||||
@ -16,65 +15,49 @@ use surrealdb::{
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio_stream::StreamExt as _;
|
||||
|
||||
pub static DB: LazyLock<Surreal<Client>> = LazyLock::new(Surreal::init);
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("Internal DB error: {0}")]
|
||||
DataBase(#[from] surrealdb::Error),
|
||||
#[error("Daemon channel got closed: {0}")]
|
||||
DaemonConnection(#[from] tokio::sync::mpsc::error::SendError<DaemonNotification>),
|
||||
#[error(transparent)]
|
||||
StdIo(#[from] std::io::Error),
|
||||
#[error(transparent)]
|
||||
TimeOut(#[from] tokio::time::error::Elapsed),
|
||||
}
|
||||
|
||||
pub async fn db_connection(
|
||||
db_address: &str,
|
||||
username: &str,
|
||||
password: &str,
|
||||
ns: &str,
|
||||
db: &str,
|
||||
) -> Result<Surreal<Client>, Error> {
|
||||
let db_connection: Surreal<Client> = Surreal::init();
|
||||
db_connection.connect::<Ws>(db_address).await?;
|
||||
pub async fn init(db_address: &str, ns: &str, db: &str) -> surrealdb::Result<()> {
|
||||
DB.connect::<Ws>(db_address).await?;
|
||||
// Sign in to the server
|
||||
db_connection.signin(Root { username, password }).await?;
|
||||
db_connection.use_ns(ns).use_db(db).await?;
|
||||
Ok(db_connection)
|
||||
DB.signin(Root { username: DB_USER, password: DB_PASS }).await?;
|
||||
DB.use_ns(ns).use_db(db).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn upsert_record<SomeRecord: Serialize + 'static>(
|
||||
db: &Surreal<Client>,
|
||||
table: &str,
|
||||
id: &str,
|
||||
my_record: SomeRecord,
|
||||
) -> Result<(), Error> {
|
||||
#[derive(Deserialize)]
|
||||
struct Wrapper {}
|
||||
let _: Option<Wrapper> = db.create((table, id)).content(my_record).await?;
|
||||
let _: Option<Wrapper> = DB.create((table, id)).content(my_record).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn migration0(
|
||||
db: &Surreal<Client>,
|
||||
old_data: &old_brain::BrainData,
|
||||
) -> Result<(), Error> {
|
||||
pub async fn migration0(old_data: &old_brain::BrainData) -> surrealdb::Result<()> {
|
||||
let accounts: Vec<Account> = old_data.into();
|
||||
let vm_nodes: Vec<VmNode> = old_data.into();
|
||||
let app_nodes: Vec<AppNode> = old_data.into();
|
||||
let vm_contracts: Vec<ActiveVm> = old_data.into();
|
||||
|
||||
let schema = std::fs::read_to_string(DB_SCHEMA_FILE)?;
|
||||
db.query(schema).await?;
|
||||
|
||||
println!("Inserting accounts...");
|
||||
let _: Vec<Account> = db.insert(()).content(accounts).await?;
|
||||
let _: Vec<Account> = DB.insert(()).content(accounts).await?;
|
||||
println!("Inserting vm nodes...");
|
||||
let _: Vec<VmNode> = db.insert(()).content(vm_nodes).await?;
|
||||
let _: Vec<VmNode> = DB.insert(()).content(vm_nodes).await?;
|
||||
println!("Inserting app nodes...");
|
||||
let _: Vec<AppNode> = db.insert(()).content(app_nodes).await?;
|
||||
let _: Vec<AppNode> = DB.insert(()).content(app_nodes).await?;
|
||||
println!("Inserting vm contracts...");
|
||||
let _: Vec<ActiveVm> = db.insert("vm_contract").relation(vm_contracts).await?;
|
||||
let _: Vec<ActiveVm> = DB.insert("vm_contract").relation(vm_contracts).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -89,9 +72,9 @@ pub struct Account {
|
||||
}
|
||||
|
||||
impl Account {
|
||||
pub async fn get(db: &Surreal<Client>, address: &str) -> Result<Self, Error> {
|
||||
pub async fn get(address: &str) -> Result<Self, Error> {
|
||||
let id = (ACCOUNT, address);
|
||||
let account: Option<Self> = db.select(id).await?;
|
||||
let account: Option<Self> = DB.select(id).await?;
|
||||
let account = match account {
|
||||
Some(account) => account,
|
||||
None => {
|
||||
@ -101,9 +84,9 @@ impl Account {
|
||||
Ok(account)
|
||||
}
|
||||
|
||||
pub async fn airdrop(db: &Surreal<Client>, account: &str, tokens: u64) -> Result<(), Error> {
|
||||
pub async fn airdrop(account: &str, tokens: u64) -> Result<(), Error> {
|
||||
let tokens = tokens.saturating_mul(1_000_000_000);
|
||||
let _ = db
|
||||
let _ = DB
|
||||
.query(format!("upsert account:{account} SET balance = (balance || 0) + {tokens};"))
|
||||
.await?;
|
||||
Ok(())
|
||||
@ -111,12 +94,8 @@ impl Account {
|
||||
}
|
||||
|
||||
impl Account {
|
||||
pub async fn is_banned_by_node(
|
||||
db: &Surreal<Client>,
|
||||
user: &str,
|
||||
node: &str,
|
||||
) -> Result<bool, Error> {
|
||||
let ban: Option<Self> = db
|
||||
pub async fn is_banned_by_node(user: &str, node: &str) -> Result<bool, Error> {
|
||||
let ban: Option<Self> = DB
|
||||
.query(format!(
|
||||
"(select operator->ban[0] as ban
|
||||
from vm_node:{node}
|
||||
@ -160,15 +139,15 @@ pub struct VmNodeResources {
|
||||
}
|
||||
|
||||
impl VmNodeResources {
|
||||
pub async fn merge(self, db: &Surreal<Client>, node_id: &str) -> Result<(), Error> {
|
||||
let _: Option<VmNode> = db.update((VM_NODE, node_id)).merge(self).await?;
|
||||
pub async fn merge(self, node_id: &str) -> Result<(), Error> {
|
||||
let _: Option<VmNode> = DB.update((VM_NODE, node_id)).merge(self).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl VmNode {
|
||||
pub async fn register(self, db: &Surreal<Client>) -> Result<(), Error> {
|
||||
let _: Option<VmNode> = db.upsert(self.id.clone()).content(self).await?;
|
||||
pub async fn register(self) -> Result<(), Error> {
|
||||
let _: Option<VmNode> = DB.upsert(self.id.clone()).content(self).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -197,7 +176,6 @@ impl VmNodeWithReports {
|
||||
// TODO: find a more elegant way to do this than importing gRPC in the DB module
|
||||
// https://en.wikipedia.org/wiki/Dependency_inversion_principle
|
||||
pub async fn find_by_filters(
|
||||
db: &Surreal<Client>,
|
||||
filters: detee_shared::snp::pb::vm_proto::VmNodeFilters,
|
||||
) -> Result<Vec<Self>, Error> {
|
||||
let mut query = format!(
|
||||
@ -230,7 +208,7 @@ impl VmNodeWithReports {
|
||||
query += &format!("&& ip = '{}' ", filters.ip);
|
||||
}
|
||||
query += ";";
|
||||
let mut result = db.query(query).await?;
|
||||
let mut result = DB.query(query).await?;
|
||||
let vm_nodes: Vec<Self> = result.take(0)?;
|
||||
Ok(vm_nodes)
|
||||
}
|
||||
@ -285,27 +263,27 @@ pub struct NewVmReq {
|
||||
}
|
||||
|
||||
impl NewVmReq {
|
||||
pub async fn get(db: &Surreal<Client>, id: &str) -> Result<Option<Self>, Error> {
|
||||
let new_vm_req: Option<Self> = db.select((NEW_VM_REQ, id)).await?;
|
||||
pub async fn get(id: &str) -> Result<Option<Self>, Error> {
|
||||
let new_vm_req: Option<Self> = DB.select((NEW_VM_REQ, id)).await?;
|
||||
Ok(new_vm_req)
|
||||
}
|
||||
|
||||
pub async fn delete(db: &Surreal<Client>, id: &str) -> Result<(), Error> {
|
||||
let _: Option<Self> = db.delete((NEW_VM_REQ, id)).await?;
|
||||
pub async fn delete(id: &str) -> Result<(), Error> {
|
||||
let _: Option<Self> = DB.delete((NEW_VM_REQ, id)).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn submit_error(db: &Surreal<Client>, id: &str, error: String) -> Result<(), Error> {
|
||||
pub async fn submit_error(id: &str, error: String) -> Result<(), Error> {
|
||||
#[derive(Serialize)]
|
||||
struct NewVmError {
|
||||
error: String,
|
||||
}
|
||||
let _: Option<Self> = db.update((NEW_VM_REQ, id)).merge(NewVmError { error }).await?;
|
||||
let _: Option<Self> = DB.update((NEW_VM_REQ, id)).merge(NewVmError { error }).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn submit(self, db: &Surreal<Client>) -> Result<(), Error> {
|
||||
let _: Vec<Self> = db.insert(NEW_VM_REQ).relation(self).await?;
|
||||
pub async fn submit(self) -> Result<(), Error> {
|
||||
let _: Vec<Self> = DB.insert(NEW_VM_REQ).relation(self).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -319,8 +297,8 @@ pub enum NewVmResp {
|
||||
}
|
||||
|
||||
impl NewVmResp {
|
||||
pub async fn listen(db: &Surreal<Client>, vm_id: &str) -> Result<NewVmResp, Error> {
|
||||
let mut resp = db
|
||||
pub async fn listen(vm_id: &str) -> Result<NewVmResp, Error> {
|
||||
let mut resp = DB
|
||||
.query(format!("live select * from {NEW_VM_REQ} where id = {NEW_VM_REQ}:{vm_id};"))
|
||||
.query(format!(
|
||||
"live select * from measurement_args where id = measurement_args:{vm_id};"
|
||||
@ -330,37 +308,43 @@ impl NewVmResp {
|
||||
let mut args_stream =
|
||||
resp.stream::<Notification<detee_shared::snp::pb::vm_proto::MeasurementArgs>>(1)?;
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(10), async {
|
||||
loop {
|
||||
tokio::select! {
|
||||
new_vm_req_notif = new_vm_stream.next() => {
|
||||
log::debug!("Got stream 1...");
|
||||
if let Some(new_vm_req_notif) = new_vm_req_notif {
|
||||
match new_vm_req_notif {
|
||||
Ok(new_vm_req_notif) => {
|
||||
if new_vm_req_notif.action == surrealdb::Action::Update && !new_vm_req_notif.data.error.is_empty() {
|
||||
return Ok::<NewVmResp, Error>(Self::Error(vm_id.to_string(), new_vm_req_notif.data.error));
|
||||
};
|
||||
},
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
loop {
|
||||
tokio::select! {
|
||||
new_vm_req_notif = new_vm_stream.next() => {
|
||||
log::debug!("Got stream 1...");
|
||||
if let Some(new_vm_req_notif) = new_vm_req_notif {
|
||||
match new_vm_req_notif {
|
||||
Ok(new_vm_req_notif) => {
|
||||
match new_vm_req_notif.action {
|
||||
surrealdb::Action::Update => {
|
||||
if !new_vm_req_notif.data.error.is_empty() {
|
||||
return Ok(Self::Error(vm_id.to_string(), new_vm_req_notif.data.error));
|
||||
}
|
||||
},
|
||||
_ => {}
|
||||
};
|
||||
},
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
args_notif = args_stream.next() => {
|
||||
if let Some(args_notif) = args_notif {
|
||||
match args_notif {
|
||||
Ok(args_notif) => {
|
||||
if args_notif.action == surrealdb::Action::Create {
|
||||
}
|
||||
args_notif = args_stream.next() => {
|
||||
if let Some(args_notif) = args_notif {
|
||||
match args_notif {
|
||||
Ok(args_notif) => {
|
||||
match args_notif.action {
|
||||
surrealdb::Action::Create => {
|
||||
return Ok(Self::Args(vm_id.to_string(), args_notif.data));
|
||||
};
|
||||
},
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
},
|
||||
_ => {}
|
||||
};
|
||||
},
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}).await?
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -388,11 +372,10 @@ pub struct ActiveVm {
|
||||
|
||||
impl ActiveVm {
|
||||
pub async fn activate(
|
||||
db: &Surreal<Client>,
|
||||
id: &str,
|
||||
args: detee_shared::vm_proto::MeasurementArgs,
|
||||
) -> Result<(), Error> {
|
||||
let new_vm_req = match NewVmReq::get(db, id).await? {
|
||||
let new_vm_req = match NewVmReq::get(id).await? {
|
||||
Some(r) => r,
|
||||
None => return Ok(()),
|
||||
};
|
||||
@ -440,9 +423,9 @@ impl ActiveVm {
|
||||
collected_at: new_vm_req.created_at,
|
||||
};
|
||||
|
||||
let _: Vec<ActiveVm> = db.insert(()).relation(active_vm).await?;
|
||||
let _: Vec<ActiveVm> = DB.insert(()).relation(active_vm).await?;
|
||||
|
||||
NewVmReq::delete(db, id).await?;
|
||||
NewVmReq::delete(id).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -469,7 +452,6 @@ pub struct UpdateVmReq {
|
||||
pub async fn listen_for_node<
|
||||
T: Into<DaemonNotification> + std::marker::Unpin + for<'de> Deserialize<'de>,
|
||||
>(
|
||||
db: &Surreal<Client>,
|
||||
node: &str,
|
||||
tx: Sender<DaemonNotification>,
|
||||
) -> Result<(), Error> {
|
||||
@ -483,15 +465,14 @@ pub async fn listen_for_node<
|
||||
}
|
||||
};
|
||||
let mut resp =
|
||||
db.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?;
|
||||
DB.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?;
|
||||
let mut live_stream = resp.stream::<Notification<T>>(0)?;
|
||||
while let Some(result) = live_stream.next().await {
|
||||
match result {
|
||||
Ok(notification) => {
|
||||
if notification.action == surrealdb::Action::Create {
|
||||
tx.send(notification.data.into()).await?
|
||||
}
|
||||
}
|
||||
Ok(notification) => match notification.action {
|
||||
surrealdb::Action::Create => tx.send(notification.data.into()).await?,
|
||||
_ => {}
|
||||
},
|
||||
Err(e) => {
|
||||
log::warn!("listen_for_deletion DB stream failed for {node}: {e}");
|
||||
return Err(Error::from(e));
|
||||
@ -523,31 +504,28 @@ pub struct DeletedVm {
|
||||
}
|
||||
|
||||
impl DeletedVm {
|
||||
pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> {
|
||||
pub async fn get_by_uuid(uuid: &str) -> Result<Option<Self>, Error> {
|
||||
let contract: Option<Self> =
|
||||
db.query(format!("select * from {DELETED_VM}:{uuid};")).await?.take(0)?;
|
||||
DB.query(format!("select * from {DELETED_VM}:{uuid};")).await?.take(0)?;
|
||||
Ok(contract)
|
||||
}
|
||||
|
||||
pub async fn list_by_admin(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> {
|
||||
pub async fn list_by_admin(admin: &str) -> Result<Vec<Self>, Error> {
|
||||
let mut result =
|
||||
db.query(format!("select * from {DELETED_VM} where in = {ACCOUNT}:{admin};")).await?;
|
||||
DB.query(format!("select * from {DELETED_VM} where in = {ACCOUNT}:{admin};")).await?;
|
||||
let contracts: Vec<Self> = result.take(0)?;
|
||||
Ok(contracts)
|
||||
}
|
||||
|
||||
pub async fn list_by_node(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> {
|
||||
pub async fn list_by_node(admin: &str) -> Result<Vec<Self>, Error> {
|
||||
let mut result =
|
||||
db.query(format!("select * from {DELETED_VM} where out = {VM_NODE}:{admin};")).await?;
|
||||
DB.query(format!("select * from {DELETED_VM} where out = {VM_NODE}:{admin};")).await?;
|
||||
let contracts: Vec<Self> = result.take(0)?;
|
||||
Ok(contracts)
|
||||
}
|
||||
|
||||
pub async fn list_by_operator(
|
||||
db: &Surreal<Client>,
|
||||
operator: &str,
|
||||
) -> Result<Vec<Self>, Error> {
|
||||
let mut result = db
|
||||
pub async fn list_by_operator(operator: &str) -> Result<Vec<Self>, Error> {
|
||||
let mut result = DB
|
||||
.query(format!(
|
||||
"select
|
||||
(select * from ->operator->vm_node<-{DELETED_VM}) as contracts
|
||||
@ -625,33 +603,30 @@ pub struct ActiveVmWithNode {
|
||||
}
|
||||
|
||||
impl ActiveVmWithNode {
|
||||
pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> {
|
||||
pub async fn get_by_uuid(uuid: &str) -> Result<Option<Self>, Error> {
|
||||
let contract: Option<Self> =
|
||||
db.query(format!("select * from {ACTIVE_VM}:{uuid} fetch out;")).await?.take(0)?;
|
||||
DB.query(format!("select * from {ACTIVE_VM}:{uuid} fetch out;")).await?.take(0)?;
|
||||
Ok(contract)
|
||||
}
|
||||
|
||||
pub async fn list_by_admin(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> {
|
||||
let mut result = db
|
||||
pub async fn list_by_admin(admin: &str) -> Result<Vec<Self>, Error> {
|
||||
let mut result = DB
|
||||
.query(format!("select * from {ACTIVE_VM} where in = {ACCOUNT}:{admin} fetch out;"))
|
||||
.await?;
|
||||
let contracts: Vec<Self> = result.take(0)?;
|
||||
Ok(contracts)
|
||||
}
|
||||
|
||||
pub async fn list_by_node(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> {
|
||||
let mut result = db
|
||||
pub async fn list_by_node(admin: &str) -> Result<Vec<Self>, Error> {
|
||||
let mut result = DB
|
||||
.query(format!("select * from {ACTIVE_VM} where out = {VM_NODE}:{admin} fetch out;"))
|
||||
.await?;
|
||||
let contracts: Vec<Self> = result.take(0)?;
|
||||
Ok(contracts)
|
||||
}
|
||||
|
||||
pub async fn list_by_operator(
|
||||
db: &Surreal<Client>,
|
||||
operator: &str,
|
||||
) -> Result<Vec<Self>, Error> {
|
||||
let mut result = db
|
||||
pub async fn list_by_operator(operator: &str) -> Result<Vec<Self>, Error> {
|
||||
let mut result = DB
|
||||
.query(format!(
|
||||
"select
|
||||
(select * from ->operator->vm_node<-{ACTIVE_VM} fetch out) as contracts
|
||||
@ -724,7 +699,7 @@ pub struct AppNodeWithReports {
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ActiveApp {
|
||||
pub struct AppContract {
|
||||
id: RecordId,
|
||||
#[serde(rename = "in")]
|
||||
admin: RecordId,
|
||||
@ -745,36 +720,6 @@ pub struct ActiveApp {
|
||||
hratls_pubkey: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ActiveAppWithNode {
|
||||
pub id: RecordId,
|
||||
#[serde(rename = "in")]
|
||||
pub admin: RecordId,
|
||||
#[serde(rename = "out")]
|
||||
pub app_node: AppNode,
|
||||
pub app_name: String,
|
||||
pub mapped_ports: Vec<(u64, u64)>,
|
||||
pub host_ipv4: String,
|
||||
pub vcpus: u64,
|
||||
pub memory_mb: u64,
|
||||
pub disk_size_gb: u64,
|
||||
pub created_at: Datetime,
|
||||
pub price_per_unit: u64,
|
||||
pub locked_nano: u64,
|
||||
pub collected_at: Datetime,
|
||||
pub mr_enclave: String,
|
||||
pub package_url: String,
|
||||
pub hratls_pubkey: String,
|
||||
}
|
||||
|
||||
impl ActiveAppWithNode {
|
||||
pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> {
|
||||
let contract: Option<Self> =
|
||||
db.query(format!("select * from {ACTIVE_APP}:{uuid} fetch out;")).await?.take(0)?;
|
||||
Ok(contract)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Ban {
|
||||
id: RecordId,
|
||||
@ -805,27 +750,18 @@ pub struct Report {
|
||||
to_node: RecordId,
|
||||
created_at: Datetime,
|
||||
pub reason: String,
|
||||
pub contract_id: String,
|
||||
}
|
||||
|
||||
impl Report {
|
||||
// TODO: test this functionality and remove this comment
|
||||
pub async fn create(
|
||||
db: &Surreal<Client>,
|
||||
from_account: RecordId,
|
||||
to_node: RecordId,
|
||||
reason: String,
|
||||
contract_id: String,
|
||||
) -> Result<(), Error> {
|
||||
let _: Vec<Self> = db
|
||||
let _: Vec<Self> = DB
|
||||
.insert("report")
|
||||
.relation(Report {
|
||||
from_account,
|
||||
to_node,
|
||||
created_at: Datetime::default(),
|
||||
reason,
|
||||
contract_id,
|
||||
})
|
||||
.relation(Report { from_account, to_node, created_at: Datetime::default(), reason })
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
@ -844,28 +780,27 @@ pub struct Operator {
|
||||
}
|
||||
|
||||
impl Operator {
|
||||
pub async fn list(db: &Surreal<Client>) -> Result<Vec<Self>, Error> {
|
||||
let mut result = db
|
||||
.query(
|
||||
pub async fn list() -> Result<Vec<Self>, Error> {
|
||||
let mut result = DB
|
||||
.query(format!(
|
||||
"array::distinct(array::flatten( [
|
||||
(select operator from vm_node group by operator).operator,
|
||||
(select operator from app_node group by operator).operator
|
||||
]));"
|
||||
.to_string(),
|
||||
)
|
||||
))
|
||||
.await?;
|
||||
let operator_accounts: Vec<RecordId> = result.take(0)?;
|
||||
let mut operators: Vec<Self> = Vec::new();
|
||||
for account in operator_accounts.iter() {
|
||||
if let Some(operator) = Self::inspect(db, &account.key().to_string()).await? {
|
||||
if let Some(operator) = Self::inspect(&account.key().to_string()).await? {
|
||||
operators.push(operator);
|
||||
}
|
||||
}
|
||||
Ok(operators)
|
||||
}
|
||||
|
||||
pub async fn inspect(db: &Surreal<Client>, account: &str) -> Result<Option<Self>, Error> {
|
||||
let mut result = db
|
||||
pub async fn inspect(account: &str) -> Result<Option<Self>, Error> {
|
||||
let mut result = DB
|
||||
.query(format!(
|
||||
"$vm_nodes = (select id from vm_node where operator = account:{account}).id;
|
||||
$app_nodes = (select id from app_node where operator = account:{account}).id;
|
||||
@ -886,11 +821,10 @@ impl Operator {
|
||||
}
|
||||
|
||||
pub async fn inspect_nodes(
|
||||
db: &Surreal<Client>,
|
||||
account: &str,
|
||||
) -> Result<(Option<Self>, Vec<VmNodeWithReports>, Vec<AppNodeWithReports>), Error> {
|
||||
let operator = Self::inspect(db, account).await?;
|
||||
let mut result = db
|
||||
let operator = Self::inspect(account).await?;
|
||||
let mut result = DB
|
||||
.query(format!(
|
||||
"select *, operator, <-report.* as reports from vm_node
|
||||
where operator = account:{account};"
|
||||
@ -941,7 +875,7 @@ impl From<&old_brain::BrainData> for Vec<ActiveVm> {
|
||||
for old_c in old_data.vm_contracts.iter() {
|
||||
let mut mapped_ports = Vec::new();
|
||||
for port in old_c.exposed_ports.iter() {
|
||||
mapped_ports.push((*port, 8080u32));
|
||||
mapped_ports.push((*port, 8080 as u32));
|
||||
}
|
||||
contracts.push(ActiveVm {
|
||||
id: RecordId::from((ACTIVE_VM, old_c.uuid.replace("-", ""))),
|
||||
|
140
src/grpc.rs
140
src/grpc.rs
@ -15,26 +15,16 @@ use detee_shared::{
|
||||
},
|
||||
};
|
||||
use nanoid::nanoid;
|
||||
use surrealdb::{engine::remote::ws::Client, Surreal};
|
||||
|
||||
use log::info;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use surrealdb::RecordId;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_stream::{Stream, StreamExt};
|
||||
use tonic::{Request, Response, Status, Streaming};
|
||||
|
||||
pub struct BrainGeneralCliForReal {
|
||||
pub db: Arc<Surreal<Client>>,
|
||||
}
|
||||
|
||||
impl BrainGeneralCliForReal {
|
||||
pub fn new(db: Arc<Surreal<Client>>) -> Self {
|
||||
Self { db }
|
||||
}
|
||||
}
|
||||
pub struct BrainGeneralCliForReal {}
|
||||
|
||||
impl From<db::Account> for AccountBalance {
|
||||
fn from(account: db::Account) -> Self {
|
||||
@ -249,15 +239,7 @@ impl From<VmNodeResources> for db::VmNodeResources {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BrainVmDaemonForReal {
|
||||
pub db: Arc<Surreal<Client>>,
|
||||
}
|
||||
|
||||
impl BrainVmDaemonForReal {
|
||||
pub fn new(db: Arc<Surreal<Client>>) -> Self {
|
||||
Self { db }
|
||||
}
|
||||
}
|
||||
pub struct BrainVmDaemonForReal {}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl BrainVmDaemon for BrainVmDaemonForReal {
|
||||
@ -285,11 +267,11 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
|
||||
max_ports_per_vm: 0,
|
||||
offline_minutes: 0,
|
||||
}
|
||||
.register(&self.db)
|
||||
.register()
|
||||
.await?;
|
||||
|
||||
info!("Sending existing contracts to {}", req.node_pubkey);
|
||||
let contracts = db::ActiveVmWithNode::list_by_node(&self.db, &req.node_pubkey).await?;
|
||||
let contracts = db::ActiveVmWithNode::list_by_node(&req.node_pubkey).await?;
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
tokio::spawn(async move {
|
||||
for contract in contracts {
|
||||
@ -317,11 +299,10 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
|
||||
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
{
|
||||
let db = self.db.clone();
|
||||
let pubkey = pubkey.clone();
|
||||
let tx = tx.clone();
|
||||
tokio::spawn(async move {
|
||||
match db::listen_for_node::<db::DeletedVm>(&db, &pubkey, tx).await {
|
||||
match db::listen_for_node::<db::DeletedVm>(&pubkey, tx).await {
|
||||
Ok(()) => log::info!("db::VmContract::listen_for_node ended for {pubkey}"),
|
||||
Err(e) => {
|
||||
log::warn!("db::VmContract::listen_for_node errored for {pubkey}: {e}")
|
||||
@ -330,19 +311,17 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
|
||||
});
|
||||
}
|
||||
{
|
||||
let db = self.db.clone();
|
||||
let pubkey = pubkey.clone();
|
||||
let tx = tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = db::listen_for_node::<db::NewVmReq>(&db, &pubkey, tx.clone()).await;
|
||||
let _ = db::listen_for_node::<db::NewVmReq>(&pubkey, tx.clone()).await;
|
||||
});
|
||||
}
|
||||
{
|
||||
let db = self.db.clone();
|
||||
let pubkey = pubkey.clone();
|
||||
let tx = tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = db::listen_for_node::<db::UpdateVmReq>(&db, &pubkey, tx.clone()).await;
|
||||
let _ = db::listen_for_node::<db::UpdateVmReq>(&pubkey, tx.clone()).await;
|
||||
});
|
||||
}
|
||||
|
||||
@ -382,32 +361,27 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
|
||||
// TODO: move new_vm_req to active_vm
|
||||
// also handle failure properly
|
||||
if !new_vm_resp.error.is_empty() {
|
||||
db::NewVmReq::submit_error(
|
||||
&self.db,
|
||||
&new_vm_resp.uuid,
|
||||
new_vm_resp.error,
|
||||
)
|
||||
.await?;
|
||||
db::NewVmReq::submit_error(&new_vm_resp.uuid, new_vm_resp.error)
|
||||
.await?;
|
||||
} else {
|
||||
db::upsert_record(
|
||||
&self.db,
|
||||
"measurement_args",
|
||||
&new_vm_resp.uuid,
|
||||
new_vm_resp.args.clone(),
|
||||
)
|
||||
.await?;
|
||||
if let Some(args) = new_vm_resp.args {
|
||||
db::ActiveVm::activate(&self.db, &new_vm_resp.uuid, args).await?;
|
||||
db::ActiveVm::activate(&new_vm_resp.uuid, args).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(vm_daemon_message::Msg::UpdateVmResp(_update_vm_resp)) => {
|
||||
Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => {
|
||||
todo!();
|
||||
// self.data.submit_updatevm_resp(update_vm_resp).await;
|
||||
}
|
||||
Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => {
|
||||
let node_resources: db::VmNodeResources = node_resources.into();
|
||||
node_resources.merge(&self.db, &pubkey).await?;
|
||||
node_resources.merge(&pubkey).await?;
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
@ -431,32 +405,24 @@ impl BrainGeneralCli for BrainGeneralCliForReal {
|
||||
|
||||
async fn get_balance(&self, req: Request<Pubkey>) -> Result<Response<AccountBalance>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
Ok(Response::new(db::Account::get(&self.db, &req.pubkey).await?.into()))
|
||||
Ok(Response::new(db::Account::get(&req.pubkey).await?.into()))
|
||||
}
|
||||
|
||||
async fn report_node(&self, req: Request<ReportNodeReq>) -> Result<Response<Empty>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
let (account, node, contract_id) =
|
||||
match db::ActiveVmWithNode::get_by_uuid(&self.db, &req.contract).await? {
|
||||
Some(vm_contract)
|
||||
if vm_contract.admin.key().to_string() == req.admin_pubkey
|
||||
&& vm_contract.vm_node.id.key().to_string() == req.node_pubkey =>
|
||||
{
|
||||
(vm_contract.admin, vm_contract.vm_node.id, vm_contract.id.to_string())
|
||||
}
|
||||
_ => match db::ActiveAppWithNode::get_by_uuid(&self.db, &req.contract).await? {
|
||||
Some(app_contract)
|
||||
if app_contract.admin.key().to_string() == req.admin_pubkey
|
||||
&& app_contract.app_node.id.key().to_string() == req.node_pubkey =>
|
||||
{
|
||||
(app_contract.admin, app_contract.app_node.id, app_contract.id.to_string())
|
||||
}
|
||||
_ => {
|
||||
return Err(Status::unauthenticated("No contract found by this ID."));
|
||||
}
|
||||
},
|
||||
};
|
||||
db::Report::create(&self.db, account, node, req.reason, contract_id).await?;
|
||||
let (account, node) = match db::ActiveVmWithNode::get_by_uuid(&req.contract).await? {
|
||||
Some(vm_contract)
|
||||
if vm_contract.admin.key().to_string() == req.admin_pubkey
|
||||
&& vm_contract.vm_node.id.key().to_string() == req.node_pubkey =>
|
||||
{
|
||||
(vm_contract.admin, vm_contract.vm_node.id)
|
||||
}
|
||||
_ => {
|
||||
// TODO: Hey, Noor! Please add app contract here.
|
||||
return Err(Status::unauthenticated("No contract found by this ID."));
|
||||
}
|
||||
};
|
||||
db::Report::create(account, node, req.reason).await?;
|
||||
Ok(Response::new(Empty {}))
|
||||
}
|
||||
|
||||
@ -465,7 +431,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal {
|
||||
req: Request<Empty>,
|
||||
) -> Result<Response<Self::ListOperatorsStream>, Status> {
|
||||
let _ = check_sig_from_req(req)?;
|
||||
let operators = db::Operator::list(&self.db).await?;
|
||||
let operators = db::Operator::list().await?;
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
tokio::spawn(async move {
|
||||
for op in operators {
|
||||
@ -480,7 +446,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal {
|
||||
&self,
|
||||
req: Request<Pubkey>,
|
||||
) -> Result<Response<InspectOperatorResp>, Status> {
|
||||
match db::Operator::inspect_nodes(&self.db, &req.into_inner().pubkey).await? {
|
||||
match db::Operator::inspect_nodes(&req.into_inner().pubkey).await? {
|
||||
(Some(op), vm_nodes, app_nodes) => Ok(Response::new(InspectOperatorResp {
|
||||
operator: Some(op.into()),
|
||||
vm_nodes: vm_nodes.into_iter().map(|n| n.into()).collect(),
|
||||
@ -524,7 +490,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal {
|
||||
async fn airdrop(&self, req: Request<AirdropReq>) -> Result<Response<Empty>, Status> {
|
||||
check_admin_key(&req)?;
|
||||
let req = check_sig_from_req(req)?;
|
||||
db::Account::airdrop(&self.db, &req.pubkey, req.tokens).await?;
|
||||
db::Account::airdrop(&req.pubkey, req.tokens).await?;
|
||||
Ok(Response::new(Empty {}))
|
||||
}
|
||||
|
||||
@ -591,15 +557,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BrainVmCliForReal {
|
||||
pub db: Arc<Surreal<Client>>,
|
||||
}
|
||||
|
||||
impl BrainVmCliForReal {
|
||||
pub fn new(db: Arc<Surreal<Client>>) -> Self {
|
||||
Self { db }
|
||||
}
|
||||
}
|
||||
pub struct BrainVmCliForReal {}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl BrainVmCli for BrainVmCliForReal {
|
||||
@ -609,23 +567,19 @@ impl BrainVmCli for BrainVmCliForReal {
|
||||
async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
info!("New VM requested via CLI: {req:?}");
|
||||
if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
|
||||
if db::Account::is_banned_by_node(&req.admin_pubkey, &req.node_pubkey).await? {
|
||||
return Err(Status::permission_denied("This operator banned you. What did you do?"));
|
||||
}
|
||||
|
||||
let new_vm_req: db::NewVmReq = req.into();
|
||||
let id = new_vm_req.id.key().to_string();
|
||||
|
||||
let db = self.db.clone();
|
||||
|
||||
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
|
||||
tokio::spawn(async move {
|
||||
let _ = oneshot_tx.send(db::NewVmResp::listen(&db, &id).await);
|
||||
let _ = oneshot_tx.send(db::NewVmResp::listen(&id).await);
|
||||
});
|
||||
new_vm_req.submit(&self.db).await?;
|
||||
new_vm_req.submit().await?;
|
||||
|
||||
match oneshot_rx.await {
|
||||
Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded("Request failed due to timeout. Please try again later or contact the DeTEE devs team.")),
|
||||
Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())),
|
||||
Err(e) => {
|
||||
log::error!("Something weird happened. Reached error {e:?}");
|
||||
@ -685,20 +639,20 @@ impl BrainVmCli for BrainVmCliForReal {
|
||||
);
|
||||
let mut contracts = Vec::new();
|
||||
if !req.uuid.is_empty() {
|
||||
if let Some(specific_contract) =
|
||||
db::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await?
|
||||
{
|
||||
if let Some(specific_contract) = db::ActiveVmWithNode::get_by_uuid(&req.uuid).await? {
|
||||
if specific_contract.admin.key().to_string() == req.wallet {
|
||||
contracts.push(specific_contract);
|
||||
contracts.push(specific_contract.into());
|
||||
}
|
||||
// TODO: allow operator to inspect contracts
|
||||
}
|
||||
} else if req.as_operator {
|
||||
contracts
|
||||
.append(&mut db::ActiveVmWithNode::list_by_operator(&self.db, &req.wallet).await?);
|
||||
} else {
|
||||
contracts
|
||||
.append(&mut db::ActiveVmWithNode::list_by_admin(&self.db, &req.wallet).await?);
|
||||
if req.as_operator {
|
||||
contracts
|
||||
.append(&mut db::ActiveVmWithNode::list_by_operator(&req.wallet).await?.into());
|
||||
} else {
|
||||
contracts
|
||||
.append(&mut db::ActiveVmWithNode::list_by_admin(&req.wallet).await?.into());
|
||||
}
|
||||
}
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
tokio::spawn(async move {
|
||||
@ -716,7 +670,7 @@ impl BrainVmCli for BrainVmCliForReal {
|
||||
) -> Result<Response<Self::ListVmNodesStream>, tonic::Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
info!("CLI requested ListVmNodesStream: {req:?}");
|
||||
let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?;
|
||||
let nodes = db::VmNodeWithReports::find_by_filters(req).await?;
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
tokio::spawn(async move {
|
||||
for node in nodes {
|
||||
@ -734,7 +688,7 @@ impl BrainVmCli for BrainVmCliForReal {
|
||||
let req = check_sig_from_req(req)?;
|
||||
info!("Unknown CLI requested ListVmNodesStream: {req:?}");
|
||||
// TODO: optimize this query so that it gets only one node
|
||||
let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?;
|
||||
let nodes = db::VmNodeWithReports::find_by_filters(req).await?;
|
||||
if let Some(node) = nodes.into_iter().next() {
|
||||
return Ok(Response::new(node.into()));
|
||||
}
|
||||
@ -800,7 +754,7 @@ fn check_sig_from_req<T: std::fmt::Debug + PubkeyGetter>(req: Request<T>) -> Res
|
||||
let parsed_time = chrono::DateTime::parse_from_rfc3339(time)
|
||||
.map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?;
|
||||
let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds();
|
||||
if !(-4..=4).contains(&seconds_elapsed) {
|
||||
if seconds_elapsed > 4 || seconds_elapsed < -4 {
|
||||
return Err(Status::unauthenticated(format!(
|
||||
"Date is not within 4 sec of the time of the server: CLI {} vs Server {}",
|
||||
parsed_time, now
|
||||
@ -841,7 +795,7 @@ fn check_sig_from_req<T: std::fmt::Debug + PubkeyGetter>(req: Request<T>) -> Res
|
||||
.verify(message.as_bytes(), &signature)
|
||||
.map_err(|_| Status::unauthenticated("the signature is not valid"))?;
|
||||
if let Some(req_pubkey) = req.get_pubkey() {
|
||||
if *pubkey_value.to_str().unwrap() != req_pubkey {
|
||||
if pubkey_value.to_str().unwrap().to_string() != req_pubkey {
|
||||
return Err(Status::unauthenticated(
|
||||
"pubkey of signature does not match pubkey of request",
|
||||
));
|
||||
@ -855,7 +809,7 @@ fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Resul
|
||||
let parsed_time = chrono::DateTime::parse_from_rfc3339(time)
|
||||
.map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?;
|
||||
let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds();
|
||||
if !(-4..=4).contains(&seconds_elapsed) {
|
||||
if seconds_elapsed > 4 || seconds_elapsed < -4 {
|
||||
return Err(Status::unauthenticated(format!(
|
||||
"Date is not within 4 sec of the time of the server: CLI {} vs Server {}",
|
||||
parsed_time, now
|
||||
|
@ -1,8 +1,2 @@
|
||||
#[allow(dead_code)]
|
||||
pub mod prepare_test_env;
|
||||
#[allow(dead_code)]
|
||||
pub mod test_utils;
|
||||
#[allow(dead_code)]
|
||||
pub mod vm_cli_utils;
|
||||
#[allow(dead_code)]
|
||||
pub mod vm_daemon_utils;
|
||||
|
@ -3,64 +3,47 @@ use detee_shared::{
|
||||
general_proto::brain_general_cli_server::BrainGeneralCliServer,
|
||||
vm_proto::brain_vm_daemon_server::BrainVmDaemonServer,
|
||||
};
|
||||
use dotenv::dotenv;
|
||||
use hyper_util::rt::TokioIo;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use surreal_brain::grpc::{BrainGeneralCliForReal, BrainVmCliForReal, BrainVmDaemonForReal};
|
||||
use surrealdb::engine::remote::ws::Client;
|
||||
use surrealdb::Surreal;
|
||||
use tokio::io::DuplexStream;
|
||||
use tokio::{net::TcpListener, sync::OnceCell};
|
||||
use tonic::transport::{Channel, Endpoint, Server, Uri};
|
||||
use tower::service_fn;
|
||||
|
||||
pub const DB_URL: &str = "localhost:8000";
|
||||
pub const DB_NS: &str = "test_brain";
|
||||
pub const DB_NAME: &str = "test_migration_db";
|
||||
|
||||
pub static DB_STATE: OnceCell<()> = OnceCell::const_new();
|
||||
|
||||
pub async fn prepare_test_db() -> Surreal<Client> {
|
||||
dotenv().ok();
|
||||
|
||||
let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env");
|
||||
let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env");
|
||||
let db_pass = std::env::var("DB_PASS").expect("DB_PASS not set in .env");
|
||||
let db_ns = "test_brain";
|
||||
let db_name = "test_migration_db";
|
||||
|
||||
let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name)
|
||||
.await
|
||||
.unwrap();
|
||||
pub async fn prepare_test_db() {
|
||||
DB_STATE
|
||||
.get_or_init(|| async {
|
||||
surreal_brain::db::init(DB_URL, DB_NS, DB_NAME)
|
||||
.await
|
||||
.expect("Failed to initialize the database");
|
||||
|
||||
let old_brain_data = surreal_brain::old_brain::BrainData::load_from_disk().unwrap();
|
||||
db.query(format!("REMOVE DATABASE {db_name}")).await.unwrap();
|
||||
db.query(std::fs::read_to_string("interim_tables.surql").unwrap()).await.unwrap();
|
||||
surreal_brain::db::migration0(&db, &old_brain_data).await.unwrap();
|
||||
surreal_brain::db::DB.query(format!("REMOVE DATABASE {DB_NAME}")).await.unwrap();
|
||||
surreal_brain::db::DB
|
||||
.query(std::fs::read_to_string("interim_tables.surql").unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
surreal_brain::db::migration0(&old_brain_data).await.unwrap();
|
||||
})
|
||||
.await;
|
||||
db
|
||||
}
|
||||
|
||||
pub async fn run_service_in_background() -> SocketAddr {
|
||||
dotenv().ok();
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
|
||||
let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env");
|
||||
let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env");
|
||||
let db_pass = std::env::var("DB_PASS").expect("DB_PASS not set in .env");
|
||||
let db_ns = "test_brain";
|
||||
let db_name = "test_migration_db";
|
||||
|
||||
tokio::spawn(async move {
|
||||
let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name)
|
||||
.await
|
||||
.unwrap();
|
||||
let db_arc = Arc::new(db);
|
||||
|
||||
Server::builder()
|
||||
.add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal::new(db_arc.clone())))
|
||||
.add_service(BrainVmCliServer::new(BrainVmCliForReal::new(db_arc.clone())))
|
||||
.add_service(BrainVmDaemonServer::new(BrainVmDaemonForReal::new(db_arc.clone())))
|
||||
.add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal {}))
|
||||
.add_service(BrainVmCliServer::new(BrainVmCliForReal {}))
|
||||
.add_service(BrainVmDaemonServer::new(BrainVmDaemonForReal {}))
|
||||
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
|
||||
.await
|
||||
.unwrap();
|
||||
@ -71,45 +54,46 @@ pub async fn run_service_in_background() -> SocketAddr {
|
||||
addr
|
||||
}
|
||||
|
||||
pub async fn run_service_for_stream_server() -> DuplexStream {
|
||||
dotenv().ok();
|
||||
pub async fn run_service_for_stream_server() -> (DuplexStream, SocketAddr) {
|
||||
let (client, server) = tokio::io::duplex(1024);
|
||||
|
||||
let db_url = std::env::var("DB_URL").expect("DB_URL not set in .env");
|
||||
let db_user = std::env::var("DB_USER").expect("DB_USER not set in .env");
|
||||
let db_pass = std::env::var("DB_PASS").expect("DB_PASS not set in .env");
|
||||
let db_ns = "test_brain";
|
||||
let db_name = "test_migration_db";
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let db = surreal_brain::db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name)
|
||||
.await
|
||||
.unwrap();
|
||||
let db_arc = Arc::new(db);
|
||||
|
||||
tonic::transport::Server::builder()
|
||||
.add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal::new(db_arc.clone())))
|
||||
.add_service(BrainVmCliServer::new(BrainVmCliForReal::new(db_arc.clone())))
|
||||
.add_service(BrainVmDaemonServer::new(BrainVmDaemonForReal::new(db_arc.clone())))
|
||||
.add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal {}))
|
||||
.add_service(BrainVmCliServer::new(BrainVmCliForReal {}))
|
||||
.add_service(BrainVmDaemonServer::new(BrainVmDaemonForReal {}))
|
||||
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
|
||||
.await
|
||||
});
|
||||
client
|
||||
(client, addr)
|
||||
}
|
||||
|
||||
pub async fn connect_stream_client_channel(c_stream: DuplexStream) -> Channel {
|
||||
pub async fn connect_stream_client_channel(c_stream: DuplexStream, addr: SocketAddr) -> Channel {
|
||||
let url = format!("http://{}", addr);
|
||||
let mut client = Some(c_stream);
|
||||
|
||||
Endpoint::from_static("http://127.0.0.1:0")
|
||||
Endpoint::try_from(url)
|
||||
.unwrap()
|
||||
.connect_with_connector(service_fn(move |_: Uri| {
|
||||
let client = client.take().unwrap();
|
||||
async move { Ok::<TokioIo<DuplexStream>, std::io::Error>(TokioIo::new(client)) }
|
||||
let client = client.take();
|
||||
|
||||
async move {
|
||||
if let Some(client) = client {
|
||||
Ok(TokioIo::new(client))
|
||||
} else {
|
||||
Err(std::io::Error::new(std::io::ErrorKind::Other, "Client already taken"))
|
||||
}
|
||||
}
|
||||
}))
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub async fn run_service_for_stream() -> Channel {
|
||||
let client = run_service_for_stream_server().await;
|
||||
connect_stream_client_channel(client).await
|
||||
let (client, addr) = run_service_for_stream_server().await;
|
||||
connect_stream_client_channel(client, addr).await
|
||||
}
|
||||
|
@ -38,6 +38,7 @@ impl Key {
|
||||
Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string())
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn sign_stream_auth(&self, contracts: Vec<String>) -> Result<snp_proto::DaemonStreamAuth> {
|
||||
let pubkey = self.pubkey.clone();
|
||||
let timestamp = chrono::Utc::now().to_rfc3339();
|
||||
|
@ -1,47 +0,0 @@
|
||||
use super::test_utils::Key;
|
||||
use detee_shared::vm_proto;
|
||||
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
|
||||
use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ};
|
||||
use surreal_brain::db;
|
||||
use surrealdb::engine::remote::ws::Client;
|
||||
use surrealdb::Surreal;
|
||||
use tonic::transport::Channel;
|
||||
|
||||
pub async fn create_new_vm(
|
||||
db: &Surreal<Client>,
|
||||
key: Key,
|
||||
node_pubkey: String,
|
||||
brain_channel: Channel,
|
||||
) -> String {
|
||||
let new_vm_req = vm_proto::NewVmReq {
|
||||
admin_pubkey: key.pubkey.clone(),
|
||||
node_pubkey,
|
||||
price_per_unit: 1200,
|
||||
extra_ports: vec![8080, 8081],
|
||||
locked_nano: 0,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone());
|
||||
let new_vm_resp =
|
||||
client_vm_cli.new_vm(key.sign_request(new_vm_req).unwrap()).await.unwrap().into_inner();
|
||||
|
||||
assert!(new_vm_resp.error.is_empty());
|
||||
assert!(new_vm_resp.uuid.len() == 40);
|
||||
|
||||
// wait for update db
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(700)).await;
|
||||
|
||||
let vm_req_db: Option<db::NewVmReq> =
|
||||
db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await.unwrap();
|
||||
|
||||
if let Some(new_vm_req) = vm_req_db {
|
||||
panic!("New VM request found in DB: {:?}", new_vm_req);
|
||||
}
|
||||
|
||||
let active_vm_op: Option<db::ActiveVm> =
|
||||
db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await.unwrap();
|
||||
let active_vm = active_vm_op.unwrap();
|
||||
|
||||
active_vm.id.key().to_string()
|
||||
}
|
@ -1,134 +0,0 @@
|
||||
use super::test_utils::Key;
|
||||
use detee_shared::vm_proto;
|
||||
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
|
||||
use detee_shared::vm_proto::RegisterVmNodeReq;
|
||||
use futures::StreamExt;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tonic::transport::Channel;
|
||||
|
||||
pub async fn mock_vm_daemon(brain_channel: Channel) -> String {
|
||||
let daemon_client = BrainVmDaemonClient::new(brain_channel);
|
||||
let daemon_key = Key::new();
|
||||
|
||||
register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await;
|
||||
|
||||
let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx));
|
||||
|
||||
let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
tokio::spawn(daemon_msg_sender(
|
||||
daemon_client.clone(),
|
||||
daemon_key.clone(),
|
||||
daemon_msg_tx.clone(),
|
||||
rx,
|
||||
));
|
||||
|
||||
tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx));
|
||||
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||
|
||||
daemon_key.pubkey
|
||||
}
|
||||
|
||||
pub async fn register_vm_node(
|
||||
mut client: BrainVmDaemonClient<Channel>,
|
||||
key: Key,
|
||||
operator_wallet: String,
|
||||
) -> Vec<vm_proto::VmContract> {
|
||||
let node_pubkey = key.pubkey.clone();
|
||||
|
||||
let req = RegisterVmNodeReq {
|
||||
node_pubkey,
|
||||
operator_wallet,
|
||||
main_ip: String::from("185.243.218.213"),
|
||||
city: String::from("Oslo"),
|
||||
country: String::from("Norway"),
|
||||
region: String::from("EU"),
|
||||
price: 1200,
|
||||
};
|
||||
|
||||
let mut grpc_stream =
|
||||
client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner();
|
||||
|
||||
let mut vm_contracts = Vec::new();
|
||||
while let Some(stream_update) = grpc_stream.next().await {
|
||||
match stream_update {
|
||||
Ok(vm_c) => {
|
||||
vm_contracts.push(vm_c);
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("Received error instead of vm_contracts: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
vm_contracts
|
||||
}
|
||||
|
||||
pub async fn daemon_listener(
|
||||
mut client: BrainVmDaemonClient<Channel>,
|
||||
key: Key,
|
||||
tx: mpsc::Sender<vm_proto::BrainVmMessage>,
|
||||
) {
|
||||
let mut grpc_stream =
|
||||
client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner();
|
||||
|
||||
while let Some(Ok(stream_update)) = grpc_stream.next().await {
|
||||
log::info!("vm deamon got notified: {:?}", &stream_update);
|
||||
let _ = tx.send(stream_update).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn daemon_msg_sender(
|
||||
mut client: BrainVmDaemonClient<Channel>,
|
||||
key: Key,
|
||||
tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
|
||||
rx: mpsc::Receiver<vm_proto::VmDaemonMessage>,
|
||||
) {
|
||||
let rx_stream = ReceiverStream::new(rx);
|
||||
tx.send(vm_proto::VmDaemonMessage {
|
||||
msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
client.daemon_messages(rx_stream).await.unwrap();
|
||||
}
|
||||
|
||||
pub async fn daemon_engine(
|
||||
tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
|
||||
mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>,
|
||||
) {
|
||||
while let Some(brain_msg) = rx.recv().await {
|
||||
match brain_msg.msg {
|
||||
Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => {
|
||||
let args = Some(vm_proto::MeasurementArgs {
|
||||
dtrfs_api_endpoint: String::from("184.107.169.199:48865"),
|
||||
exposed_ports: new_vm_req.extra_ports,
|
||||
ovmf_hash: String::from(
|
||||
"0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76",
|
||||
),
|
||||
ips: vec![],
|
||||
});
|
||||
|
||||
let new_vm_resp = vm_proto::NewVmResp {
|
||||
uuid: new_vm_req.uuid.clone(),
|
||||
args,
|
||||
error: String::new(),
|
||||
};
|
||||
|
||||
let res_data = vm_proto::VmDaemonMessage {
|
||||
msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)),
|
||||
};
|
||||
tx.send(res_data).await.unwrap();
|
||||
}
|
||||
Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => {
|
||||
todo!()
|
||||
}
|
||||
Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => {
|
||||
todo!()
|
||||
}
|
||||
None => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
@ -1,8 +1,6 @@
|
||||
use common::{
|
||||
prepare_test_env::{prepare_test_db, run_service_for_stream, run_service_in_background},
|
||||
test_utils::Key,
|
||||
vm_cli_utils::create_new_vm,
|
||||
vm_daemon_utils::mock_vm_daemon,
|
||||
};
|
||||
use detee_shared::common_proto::Empty;
|
||||
use detee_shared::general_proto::ReportNodeReq;
|
||||
@ -12,8 +10,6 @@ use detee_shared::{
|
||||
common_proto::Pubkey, general_proto::brain_general_cli_client::BrainGeneralCliClient,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use surreal_brain::constants::VM_NODE;
|
||||
use surreal_brain::db::VmNodeWithReports;
|
||||
|
||||
mod common;
|
||||
|
||||
@ -37,71 +33,30 @@ async fn test_general_balance() {
|
||||
assert_eq!(acc_bal.tmp_locked, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_vm_creation() {
|
||||
let db = prepare_test_db().await;
|
||||
|
||||
let brain_channel = run_service_for_stream().await;
|
||||
let daemon_key = mock_vm_daemon(brain_channel.clone()).await;
|
||||
|
||||
let key = Key::new();
|
||||
|
||||
let _ = create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_report_node() {
|
||||
let db = prepare_test_db().await;
|
||||
prepare_test_db().await;
|
||||
|
||||
let brain_channel = run_service_for_stream().await;
|
||||
let daemon_key = mock_vm_daemon(brain_channel.clone()).await;
|
||||
let mut client_gen_cli = BrainGeneralCliClient::new(brain_channel.clone());
|
||||
let addr = run_service_in_background().await;
|
||||
let mut client = BrainGeneralCliClient::connect(format!("http://{}", addr)).await.unwrap();
|
||||
|
||||
let key = Key::new();
|
||||
let pubkey = key.pubkey.clone();
|
||||
|
||||
let report_req = ReportNodeReq {
|
||||
admin_pubkey: pubkey.clone(),
|
||||
node_pubkey: daemon_key.clone(),
|
||||
// TODO: create contract, node and operator in db and use it here
|
||||
let req_data = ReportNodeReq {
|
||||
admin_pubkey: pubkey,
|
||||
node_pubkey: String::from("node_pubkey"),
|
||||
contract: String::from("uuid"),
|
||||
reason: String::from("reason"),
|
||||
};
|
||||
|
||||
let report_error =
|
||||
client_gen_cli.report_node(key.sign_request(report_req).unwrap()).await.err().unwrap();
|
||||
let report_error = client.report_node(key.sign_request(req_data).unwrap()).await.err().unwrap();
|
||||
|
||||
println!("Report error: {:?}", report_error);
|
||||
assert_eq!(report_error.message(), "No contract found by this ID.");
|
||||
|
||||
let active_vm_id =
|
||||
create_new_vm(&db, key.clone(), daemon_key.clone(), brain_channel.clone()).await;
|
||||
|
||||
let reason = String::from("something went wrong on vm");
|
||||
let report_req = ReportNodeReq {
|
||||
admin_pubkey: pubkey,
|
||||
node_pubkey: daemon_key.clone(),
|
||||
contract: active_vm_id,
|
||||
reason: reason.clone(),
|
||||
};
|
||||
|
||||
let _ = client_gen_cli
|
||||
.report_node(key.sign_request(report_req).unwrap())
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner();
|
||||
|
||||
let vm_nodes: Vec<VmNodeWithReports> = db
|
||||
.query(format!(
|
||||
"SELECT *, <-report.* as reports FROM {VM_NODE} WHERE id = {VM_NODE}:{daemon_key};"
|
||||
))
|
||||
.await
|
||||
.unwrap()
|
||||
.take(0)
|
||||
.unwrap();
|
||||
|
||||
let vm_node_with_report = vm_nodes.get(0).unwrap();
|
||||
|
||||
assert!(vm_node_with_report.reports[0].reason == reason);
|
||||
// verify report in db
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -1,11 +1,17 @@
|
||||
use common::{
|
||||
prepare_test_env::{prepare_test_db, run_service_for_stream, run_service_in_background},
|
||||
prepare_test_env::{
|
||||
connect_stream_client_channel, prepare_test_db, run_service_for_stream_server,
|
||||
run_service_in_background,
|
||||
},
|
||||
test_utils::Key,
|
||||
vm_daemon_utils::{mock_vm_daemon, register_vm_node},
|
||||
};
|
||||
use detee_shared::vm_proto;
|
||||
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
|
||||
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
|
||||
use detee_shared::vm_proto::RegisterVmNodeReq;
|
||||
use futures::StreamExt;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
mod common;
|
||||
|
||||
@ -25,20 +31,144 @@ async fn test_reg_vm_node() {
|
||||
assert!(vm_contracts.is_empty())
|
||||
}
|
||||
|
||||
async fn register_vm_node(
|
||||
mut client: BrainVmDaemonClient<tonic::transport::Channel>,
|
||||
key: Key,
|
||||
operator_wallet: String,
|
||||
) -> Vec<vm_proto::VmContract> {
|
||||
let node_pubkey = key.pubkey.clone();
|
||||
|
||||
let req = RegisterVmNodeReq {
|
||||
node_pubkey,
|
||||
operator_wallet,
|
||||
main_ip: String::from("185.243.218.213"),
|
||||
city: String::from("Oslo"),
|
||||
country: String::from("Norway"),
|
||||
region: String::from("EU"),
|
||||
price: 1200,
|
||||
};
|
||||
|
||||
let mut grpc_stream =
|
||||
client.register_vm_node(key.sign_request(req).unwrap()).await.unwrap().into_inner();
|
||||
|
||||
let mut vm_contracts = Vec::new();
|
||||
while let Some(stream_update) = grpc_stream.next().await {
|
||||
match stream_update {
|
||||
Ok(vm_c) => {
|
||||
vm_contracts.push(vm_c);
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("Received error instead of vm_contracts: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
vm_contracts
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_brain_message() {
|
||||
env_logger::builder().filter_level(log::LevelFilter::Info).init();
|
||||
let db = prepare_test_db().await;
|
||||
// spawn grpc stream server
|
||||
// mock a daemon
|
||||
// mock a cli client to interact with brain
|
||||
|
||||
let brain_channel = run_service_for_stream().await;
|
||||
let daemon_key = mock_vm_daemon(brain_channel.clone()).await;
|
||||
let mut cli_client = BrainVmCliClient::new(brain_channel);
|
||||
// validate if something happening in "surreal_brain::db::NewVmReq", "surreal_brain::db::UpdateVmReq", "surreal_brain::db::DeletedVm" these table
|
||||
// mock daemon will responde to brain
|
||||
|
||||
env_logger::builder().filter_level(log::LevelFilter::Info).init();
|
||||
let _ = prepare_test_db().await;
|
||||
|
||||
let (tokio_duplex, addr) = run_service_for_stream_server().await;
|
||||
|
||||
let channel = connect_stream_client_channel(tokio_duplex, addr).await;
|
||||
|
||||
let mut daemon_client = BrainVmDaemonClient::new(channel.clone());
|
||||
|
||||
let daemon_key = Key::new();
|
||||
|
||||
register_vm_node(daemon_client.clone(), daemon_key.clone(), Key::new().pubkey).await;
|
||||
|
||||
let mut daemon_join_set = JoinSet::new();
|
||||
|
||||
let (tx, mut brain_msg_rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
// listen to brain
|
||||
let mut daemon_client_01 = daemon_client.clone();
|
||||
let daemon_key_01 = daemon_key.clone();
|
||||
daemon_join_set.spawn(async move {
|
||||
let mut grpc_stream = daemon_client_01
|
||||
.brain_messages(daemon_key_01.sign_stream_auth(vec![]).unwrap())
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner();
|
||||
|
||||
while let Some(Ok(stream_update)) = grpc_stream.next().await {
|
||||
log::info!("vm deamon got notified: {:?}", &stream_update);
|
||||
let _ = tx.send(stream_update).await;
|
||||
}
|
||||
});
|
||||
|
||||
// send to brain
|
||||
let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
let daemon_msg_tx_01 = daemon_msg_tx.clone();
|
||||
let daemon_key_02 = daemon_key.clone();
|
||||
daemon_join_set.spawn(async move {
|
||||
let rx_stream = ReceiverStream::new(rx);
|
||||
daemon_msg_tx_01
|
||||
.send(vm_proto::VmDaemonMessage {
|
||||
msg: Some(vm_proto::vm_daemon_message::Msg::Auth(
|
||||
daemon_key_02.sign_stream_auth(vec![]).unwrap(),
|
||||
)),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
daemon_client.daemon_messages(rx_stream).await.unwrap();
|
||||
});
|
||||
|
||||
// daemon engine
|
||||
daemon_join_set.spawn(async move {
|
||||
while let Some(brain_msg) = brain_msg_rx.recv().await {
|
||||
match brain_msg.msg {
|
||||
Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => {
|
||||
let args = Some(vm_proto::MeasurementArgs {
|
||||
dtrfs_api_endpoint: String::from("184.107.169.199:48865"),
|
||||
exposed_ports: new_vm_req.extra_ports,
|
||||
ovmf_hash: String::from(
|
||||
"0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76",
|
||||
),
|
||||
ips: vec![],
|
||||
});
|
||||
|
||||
let new_vm_resp = vm_proto::NewVmResp {
|
||||
uuid: new_vm_req.uuid.clone(),
|
||||
args,
|
||||
error: String::new(),
|
||||
};
|
||||
|
||||
let res_data = vm_proto::VmDaemonMessage {
|
||||
msg: Some(vm_proto::vm_daemon_message::Msg::NewVmResp(new_vm_resp)),
|
||||
};
|
||||
daemon_msg_tx.send(res_data).await.unwrap();
|
||||
}
|
||||
Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => {
|
||||
todo!()
|
||||
}
|
||||
Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => {
|
||||
todo!()
|
||||
}
|
||||
None => todo!(),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||
|
||||
let mut cli_client = BrainVmCliClient::new(channel);
|
||||
|
||||
let cli_key = Key::new();
|
||||
|
||||
let req = vm_proto::NewVmReq {
|
||||
admin_pubkey: cli_key.pubkey.clone(),
|
||||
node_pubkey: daemon_key,
|
||||
node_pubkey: daemon_key.pubkey.clone(),
|
||||
price_per_unit: 1200,
|
||||
extra_ports: vec![8080, 8081],
|
||||
locked_nano: 0,
|
||||
@ -51,7 +181,8 @@ async fn test_brain_message() {
|
||||
assert!(new_vm_resp.uuid.len() == 40);
|
||||
|
||||
let id = ("measurement_args", new_vm_resp.uuid);
|
||||
let data_in_db: detee_shared::vm_proto::MeasurementArgs = db.select(id).await.unwrap().unwrap();
|
||||
let data_in_db: detee_shared::vm_proto::MeasurementArgs =
|
||||
surreal_brain::db::DB.select(id).await.unwrap().unwrap();
|
||||
|
||||
assert_eq!(data_in_db, new_vm_resp.args.unwrap());
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user