gheorghe refactored using his ideas

This commit is contained in:
ghe0 2025-05-05 18:29:09 +03:00
parent 079b4a02aa
commit d233ace7a2
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
17 changed files with 426 additions and 458 deletions

@ -1,3 +1,3 @@
reorder_impl_items = true
use_small_heuristics = "Max"
imports_granularity = "Crate"
imports_granularity = "Module"

@ -6,9 +6,8 @@ use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer;
use dotenv::dotenv;
use surreal_brain::constants::{BRAIN_GRPC_ADDR, CERT_KEY_PATH, CERT_PATH};
use surreal_brain::db;
use surreal_brain::grpc::prelude::{
BrainGeneralCliForReal, BrainVmCliForReal, BrainVmDaemonForReal,
};
use surreal_brain::grpc;
use tonic::transport::{Identity, Server, ServerTlsConfig};
#[tokio::main]
@ -27,10 +26,11 @@ async fn main() {
let addr = BRAIN_GRPC_ADDR.parse().unwrap();
let snp_daemon_server = BrainVmDaemonServer::new(BrainVmDaemonForReal::new(db_arc.clone()));
let snp_cli_server = BrainVmCliServer::new(BrainVmCliForReal::new(db_arc.clone()));
let snp_daemon_server =
BrainVmDaemonServer::new(grpc::vm::BrainDaemonServer::new(db_arc.clone()));
let snp_cli_server = BrainVmCliServer::new(grpc::vm::BrainCliServer::new(db_arc.clone()));
let general_service_server =
BrainGeneralCliServer::new(BrainGeneralCliForReal::new(db_arc.clone()));
BrainGeneralCliServer::new(grpc::general::BrainCliServer::new(db_arc.clone()));
let cert = std::fs::read_to_string(CERT_PATH).unwrap();
let key = std::fs::read_to_string(CERT_KEY_PATH).unwrap();

@ -4,10 +4,12 @@ use crate::db::general::Report;
use super::Error;
use crate::old_brain;
use serde::{Deserialize, Serialize};
use surrealdb::{engine::remote::ws::Client, sql::Datetime, RecordId, Surreal};
use surrealdb::engine::remote::ws::Client;
use surrealdb::sql::Datetime;
use surrealdb::{RecordId, Surreal};
#[derive(Debug, Serialize, Deserialize)]
pub struct AppNode {
pub struct Node {
pub id: RecordId,
pub operator: RecordId,
pub country: String,
@ -24,7 +26,7 @@ pub struct AppNode {
}
#[derive(Debug, Serialize, Deserialize)]
pub struct AppNodeWithReports {
pub struct NodeWithReports {
pub id: RecordId,
pub operator: RecordId,
pub country: String,
@ -69,7 +71,7 @@ pub struct ActiveAppWithNode {
#[serde(rename = "in")]
pub admin: RecordId,
#[serde(rename = "out")]
pub app_node: AppNode,
pub app_node: Node,
pub app_name: String,
pub mapped_ports: Vec<(u64, u64)>,
pub host_ipv4: String,
@ -93,11 +95,11 @@ impl ActiveAppWithNode {
}
}
impl From<&old_brain::BrainData> for Vec<AppNode> {
impl From<&old_brain::BrainData> for Vec<Node> {
fn from(old_data: &old_brain::BrainData) -> Self {
let mut nodes = Vec::new();
for old_node in old_data.app_nodes.iter() {
nodes.push(AppNode {
nodes.push(Node {
id: RecordId::from(("app_node", old_node.node_pubkey.clone())),
operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())),
country: old_node.country.clone(),

@ -1,114 +0,0 @@
pub use crate::constants::{
ACCOUNT, ACTIVE_APP, ACTIVE_VM, DB_SCHEMA_FILE, DELETED_VM, ID_ALPHABET, NEW_VM_REQ,
UPDATE_VM_REQ, VM_CONTRACT, VM_NODE,
};
use crate::db::prelude::*;
use crate::old_brain;
use serde::{Deserialize, Serialize};
use surrealdb::Notification;
use surrealdb::{
engine::remote::ws::{Client, Ws},
opt::auth::Root,
Surreal,
};
use tokio::sync::mpsc::Sender;
use tokio_stream::StreamExt;
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Internal DB error: {0}")]
DataBase(#[from] surrealdb::Error),
#[error("Daemon channel got closed: {0}")]
DaemonConnection(#[from] tokio::sync::mpsc::error::SendError<DaemonNotification>),
#[error(transparent)]
StdIo(#[from] std::io::Error),
#[error(transparent)]
TimeOut(#[from] tokio::time::error::Elapsed),
}
pub async fn db_connection(
db_address: &str,
username: &str,
password: &str,
ns: &str,
db: &str,
) -> Result<Surreal<Client>, Error> {
let db_connection: Surreal<Client> = Surreal::init();
db_connection.connect::<Ws>(db_address).await?;
// Sign in to the server
db_connection.signin(Root { username, password }).await?;
db_connection.use_ns(ns).use_db(db).await?;
Ok(db_connection)
}
pub async fn migration0(
db: &Surreal<Client>,
old_data: &old_brain::BrainData,
) -> Result<(), Error> {
let accounts: Vec<Account> = old_data.into();
let vm_nodes: Vec<VmNode> = old_data.into();
let app_nodes: Vec<AppNode> = old_data.into();
let vm_contracts: Vec<ActiveVm> = old_data.into();
let schema = std::fs::read_to_string(DB_SCHEMA_FILE)?;
db.query(schema).await?;
println!("Inserting accounts...");
let _: Vec<Account> = db.insert(()).content(accounts).await?;
println!("Inserting vm nodes...");
let _: Vec<VmNode> = db.insert(()).content(vm_nodes).await?;
println!("Inserting app nodes...");
let _: Vec<AppNode> = db.insert(()).content(app_nodes).await?;
println!("Inserting vm contracts...");
let _: Vec<ActiveVm> = db.insert("vm_contract").relation(vm_contracts).await?;
Ok(())
}
pub async fn upsert_record<SomeRecord: Serialize + 'static>(
db: &Surreal<Client>,
table: &str,
id: &str,
my_record: SomeRecord,
) -> Result<(), Error> {
#[derive(Deserialize)]
struct Wrapper {}
let _: Option<Wrapper> = db.create((table, id)).content(my_record).await?;
Ok(())
}
pub async fn listen_for_node<
T: Into<DaemonNotification> + std::marker::Unpin + for<'de> Deserialize<'de>,
>(
db: &Surreal<Client>,
node: &str,
tx: Sender<DaemonNotification>,
) -> Result<(), Error> {
let table_name = match std::any::type_name::<T>() {
"surreal_brain::db::NewVmReq" => NEW_VM_REQ.to_string(),
"surreal_brain::db::UpdateVmReq" => UPDATE_VM_REQ.to_string(),
"surreal_brain::db::DeletedVm" => DELETED_VM.to_string(),
wat => {
log::error!("listen_for_node: T has type {wat}");
String::from("wat")
}
};
let mut resp =
db.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?;
let mut live_stream = resp.stream::<Notification<T>>(0)?;
while let Some(result) = live_stream.next().await {
match result {
Ok(notification) => {
if notification.action == surrealdb::Action::Create {
tx.send(notification.data.into()).await?
}
}
Err(e) => {
log::warn!("listen_for_deletion DB stream failed for {node}: {e}");
return Err(Error::from(e));
}
}
}
Ok(())
}

@ -1,11 +1,13 @@
use crate::constants::ACCOUNT;
use crate::db::app::AppNodeWithReports;
use crate::db::vm::VmNodeWithReports;
use crate::db::app;
use crate::db::vm;
use super::Error;
use crate::old_brain;
use serde::{Deserialize, Serialize};
use surrealdb::{engine::remote::ws::Client, sql::Datetime, RecordId, Surreal};
use surrealdb::engine::remote::ws::Client;
use surrealdb::sql::Datetime;
use surrealdb::{RecordId, Surreal};
#[derive(Debug, Serialize, Deserialize)]
pub struct Account {
@ -191,7 +193,7 @@ impl Operator {
pub async fn inspect_nodes(
db: &Surreal<Client>,
account: &str,
) -> Result<(Option<Self>, Vec<VmNodeWithReports>, Vec<AppNodeWithReports>), Error> {
) -> Result<(Option<Self>, Vec<vm::NodeWithReports>, Vec<app::NodeWithReports>), Error> {
let operator = Self::inspect(db, account).await?;
let mut result = db
.query(format!(
@ -203,8 +205,8 @@ impl Operator {
where operator = account:{account};"
))
.await?;
let vm_nodes: Vec<VmNodeWithReports> = result.take(0)?;
let app_nodes: Vec<AppNodeWithReports> = result.take(1)?;
let vm_nodes: Vec<vm::NodeWithReports> = result.take(0)?;
let app_nodes: Vec<app::NodeWithReports> = result.take(1)?;
Ok((operator, vm_nodes, app_nodes))
}

@ -1,21 +1,110 @@
pub mod app;
pub mod common;
pub mod general;
pub mod vm;
pub use prelude::*;
pub mod prelude {
pub use super::app::{ActiveAppWithNode, AppNode, AppNodeWithReports};
pub use super::common::listen_for_node;
pub use super::common::{db_connection, migration0, upsert_record, Error};
pub use super::general::{Account, Operator, Report};
pub use super::vm::{
ActiveVm, ActiveVmWithNode, DaemonNotification, DeletedVm, NewVmReq, NewVmResp,
UpdateVmReq, VmNode, VmNodeResources, VmNodeWithReports,
};
use crate::constants::{DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ};
use crate::old_brain;
use serde::{Deserialize, Serialize};
use surrealdb::engine::remote::ws::{Client, Ws};
use surrealdb::opt::auth::Root;
use surrealdb::{Notification, Surreal};
use tokio::sync::mpsc::Sender;
use tokio_stream::StreamExt;
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Internal DB error: {0}")]
DataBase(#[from] surrealdb::Error),
#[error("Daemon channel got closed: {0}")]
VmDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<vm::DaemonNotification>),
#[error(transparent)]
StdIo(#[from] std::io::Error),
#[error(transparent)]
TimeOut(#[from] tokio::time::error::Elapsed),
}
pub use crate::constants::{
ACCOUNT, ACTIVE_APP, ACTIVE_VM, DB_SCHEMA_FILE, DELETED_VM, ID_ALPHABET, NEW_VM_REQ,
UPDATE_VM_REQ, VM_CONTRACT, VM_NODE,
pub async fn db_connection(
db_address: &str,
username: &str,
password: &str,
ns: &str,
db: &str,
) -> Result<Surreal<Client>, Error> {
let db_connection: Surreal<Client> = Surreal::init();
db_connection.connect::<Ws>(db_address).await?;
// Sign in to the server
db_connection.signin(Root { username, password }).await?;
db_connection.use_ns(ns).use_db(db).await?;
Ok(db_connection)
}
pub async fn migration0(
db: &Surreal<Client>,
old_data: &old_brain::BrainData,
) -> Result<(), Error> {
let accounts: Vec<crate::db::general::Account> = old_data.into();
let vm_nodes: Vec<crate::db::vm::Node> = old_data.into();
let app_nodes: Vec<crate::db::app::Node> = old_data.into();
let vm_contracts: Vec<crate::db::vm::ActiveVm> = old_data.into();
let schema = std::fs::read_to_string(crate::constants::DB_SCHEMA_FILE)?;
db.query(schema).await?;
println!("Inserting accounts...");
let _: Vec<crate::db::general::Account> = db.insert(()).content(accounts).await?;
println!("Inserting vm nodes...");
let _: Vec<crate::db::vm::Node> = db.insert(()).content(vm_nodes).await?;
println!("Inserting app nodes...");
let _: Vec<crate::db::app::Node> = db.insert(()).content(app_nodes).await?;
println!("Inserting vm contracts...");
let _: Vec<crate::db::vm::ActiveVm> = db.insert("vm_contract").relation(vm_contracts).await?;
Ok(())
}
pub async fn upsert_record<SomeRecord: Serialize + 'static>(
db: &Surreal<Client>,
table: &str,
id: &str,
my_record: SomeRecord,
) -> Result<(), Error> {
#[derive(Deserialize)]
struct Wrapper {}
let _: Option<Wrapper> = db.create((table, id)).content(my_record).await?;
Ok(())
}
pub async fn listen_for_vm_node<
T: Into<vm::DaemonNotification> + std::marker::Unpin + for<'de> Deserialize<'de>,
>(
db: &Surreal<Client>,
node: &str,
tx: Sender<vm::DaemonNotification>,
) -> Result<(), Error> {
let table_name = match std::any::type_name::<T>() {
"surreal_brain::db::NewVmReq" => NEW_VM_REQ.to_string(),
"surreal_brain::db::UpdateVmReq" => UPDATE_VM_REQ.to_string(),
"surreal_brain::db::DeletedVm" => DELETED_VM.to_string(),
wat => {
log::error!("listen_for_node: T has type {wat}");
String::from("wat")
}
};
let mut resp =
db.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?;
let mut live_stream = resp.stream::<Notification<T>>(0)?;
while let Some(result) = live_stream.next().await {
match result {
Ok(notification) => {
if notification.action == surrealdb::Action::Create {
tx.send(notification.data.into()).await?
}
}
Err(e) => {
log::warn!("listen_for_deletion DB stream failed for {node}: {e}");
return Err(Error::from(e));
}
}
}
Ok(())
}

@ -1,15 +1,18 @@
use std::{str::FromStr, time::Duration};
use std::str::FromStr;
use std::time::Duration;
use super::Error;
use crate::constants::{ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, VM_NODE};
use crate::db::general::Report;
use crate::old_brain;
use serde::{Deserialize, Serialize};
use surrealdb::{engine::remote::ws::Client, sql::Datetime, Notification, RecordId, Surreal};
use surrealdb::engine::remote::ws::Client;
use surrealdb::sql::Datetime;
use surrealdb::{Notification, RecordId, Surreal};
use tokio_stream::StreamExt as _;
#[derive(Debug, Serialize, Deserialize)]
pub struct VmNode {
pub struct Node {
pub id: RecordId,
pub operator: RecordId,
pub country: String,
@ -28,7 +31,7 @@ pub struct VmNode {
}
#[derive(Serialize)]
pub struct VmNodeResources {
pub struct NodeResources {
pub avail_mem_mb: u32,
pub avail_vcpus: u32,
pub avail_storage_gbs: u32,
@ -38,22 +41,22 @@ pub struct VmNodeResources {
pub max_ports_per_vm: u32,
}
impl VmNodeResources {
impl NodeResources {
pub async fn merge(self, db: &Surreal<Client>, node_id: &str) -> Result<(), Error> {
let _: Option<VmNode> = db.update((VM_NODE, node_id)).merge(self).await?;
let _: Option<Node> = db.update((VM_NODE, node_id)).merge(self).await?;
Ok(())
}
}
impl VmNode {
impl Node {
pub async fn register(self, db: &Surreal<Client>) -> Result<(), Error> {
let _: Option<VmNode> = db.upsert(self.id.clone()).content(self).await?;
let _: Option<Node> = db.upsert(self.id.clone()).content(self).await?;
Ok(())
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct VmNodeWithReports {
pub struct NodeWithReports {
pub id: RecordId,
pub operator: RecordId,
pub country: String,
@ -72,7 +75,7 @@ pub struct VmNodeWithReports {
pub reports: Vec<Report>,
}
impl VmNodeWithReports {
impl NodeWithReports {
// TODO: find a more elegant way to do this than importing gRPC in the DB module
// https://en.wikipedia.org/wiki/Dependency_inversion_principle
pub async fn find_by_filters(
@ -452,7 +455,7 @@ pub struct ActiveVmWithNode {
#[serde(rename = "in")]
pub admin: RecordId,
#[serde(rename = "out")]
pub vm_node: VmNode,
pub vm_node: Node,
pub hostname: String,
pub mapped_ports: Vec<(u32, u32)>,
pub public_ipv4: String,
@ -534,11 +537,11 @@ impl ActiveVmWithNode {
// TODO: delete all of these From implementation after migration 0 gets executed
impl From<&old_brain::BrainData> for Vec<VmNode> {
impl From<&old_brain::BrainData> for Vec<Node> {
fn from(old_data: &old_brain::BrainData) -> Self {
let mut nodes = Vec::new();
for old_node in old_data.vm_nodes.iter() {
nodes.push(VmNode {
nodes.push(Node {
id: RecordId::from((VM_NODE, old_node.public_key.clone())),
operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())),
country: old_node.country.clone(),

@ -1,173 +0,0 @@
pub mod types;
use crate::constants::ADMIN_ACCOUNTS;
use detee_shared::{
common_proto::{Empty, Pubkey},
general_proto::{AirdropReq, BanUserReq, KickReq, RegOperatorReq, ReportNodeReq, SlashReq},
vm_proto::{ListVmContractsReq, *},
};
use tonic::{Request, Status};
pub trait PubkeyGetter {
fn get_pubkey(&self) -> Option<String>;
}
macro_rules! impl_pubkey_getter {
($t:ty, $field:ident) => {
impl PubkeyGetter for $t {
fn get_pubkey(&self) -> Option<String> {
Some(self.$field.clone())
}
}
};
($t:ty) => {
impl PubkeyGetter for $t {
fn get_pubkey(&self) -> Option<String> {
None
}
}
};
}
impl_pubkey_getter!(Pubkey, pubkey);
impl_pubkey_getter!(NewVmReq, admin_pubkey);
impl_pubkey_getter!(DeleteVmReq, admin_pubkey);
impl_pubkey_getter!(UpdateVmReq, admin_pubkey);
impl_pubkey_getter!(ExtendVmReq, admin_pubkey);
impl_pubkey_getter!(ReportNodeReq, admin_pubkey);
impl_pubkey_getter!(ListVmContractsReq, wallet);
impl_pubkey_getter!(RegisterVmNodeReq, node_pubkey);
impl_pubkey_getter!(RegOperatorReq, pubkey);
impl_pubkey_getter!(KickReq, operator_wallet);
impl_pubkey_getter!(BanUserReq, operator_wallet);
impl_pubkey_getter!(VmNodeFilters);
impl_pubkey_getter!(Empty);
impl_pubkey_getter!(AirdropReq);
impl_pubkey_getter!(SlashReq);
// impl_pubkey_getter!(NewAppReq, admin_pubkey);
// impl_pubkey_getter!(DelAppReq, admin_pubkey);
// impl_pubkey_getter!(ListAppContractsReq, admin_pubkey);
//
// impl_pubkey_getter!(RegisterAppNodeReq);
// impl_pubkey_getter!(AppNodeFilters);
pub fn check_sig_from_req<T: std::fmt::Debug + PubkeyGetter>(req: Request<T>) -> Result<T, Status> {
let time = match req.metadata().get("timestamp") {
Some(t) => t.clone(),
None => return Err(Status::unauthenticated("Timestamp not found in metadata.")),
};
let time = time
.to_str()
.map_err(|_| Status::unauthenticated("Timestamp in metadata is not a string"))?;
let now = chrono::Utc::now();
let parsed_time = chrono::DateTime::parse_from_rfc3339(time)
.map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?;
let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds();
if !(-4..=4).contains(&seconds_elapsed) {
return Err(Status::unauthenticated(format!(
"Date is not within 4 sec of the time of the server: CLI {} vs Server {}",
parsed_time, now
)));
}
let signature = match req.metadata().get("request-signature") {
Some(t) => t,
None => return Err(Status::unauthenticated("signature not found in metadata.")),
};
let signature = bs58::decode(signature)
.into_vec()
.map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?;
let signature = ed25519_dalek::Signature::from_bytes(
signature
.as_slice()
.try_into()
.map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?,
);
let pubkey_value = match req.metadata().get("pubkey") {
Some(p) => p.clone(),
None => return Err(Status::unauthenticated("pubkey not found in metadata.")),
};
let pubkey = ed25519_dalek::VerifyingKey::from_bytes(
&bs58::decode(&pubkey_value)
.into_vec()
.map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))?
.try_into()
.map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?,
)
.map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?;
let req = req.into_inner();
let message = format!("{time}{req:?}");
use ed25519_dalek::Verifier;
pubkey
.verify(message.as_bytes(), &signature)
.map_err(|_| Status::unauthenticated("the signature is not valid"))?;
if let Some(req_pubkey) = req.get_pubkey() {
if *pubkey_value.to_str().unwrap() != req_pubkey {
return Err(Status::unauthenticated(
"pubkey of signature does not match pubkey of request",
));
}
}
Ok(req)
}
pub fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Result<(), Status> {
let now = chrono::Utc::now();
let parsed_time = chrono::DateTime::parse_from_rfc3339(time)
.map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?;
let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds();
if !(-4..=4).contains(&seconds_elapsed) {
return Err(Status::unauthenticated(format!(
"Date is not within 4 sec of the time of the server: CLI {} vs Server {}",
parsed_time, now
)));
}
let signature = bs58::decode(sig)
.into_vec()
.map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?;
let signature = ed25519_dalek::Signature::from_bytes(
signature
.as_slice()
.try_into()
.map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?,
);
let pubkey = ed25519_dalek::VerifyingKey::from_bytes(
&bs58::decode(&pubkey)
.into_vec()
.map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))?
.try_into()
.map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?,
)
.map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?;
let msg = time.to_string() + msg;
use ed25519_dalek::Verifier;
pubkey
.verify(msg.as_bytes(), &signature)
.map_err(|_| Status::unauthenticated("the signature is not valid"))?;
Ok(())
}
pub fn check_admin_key<T>(req: &Request<T>) -> Result<(), Status> {
let pubkey = match req.metadata().get("pubkey") {
Some(p) => p.clone(),
None => return Err(Status::unauthenticated("pubkey not found in metadata.")),
};
let pubkey = pubkey
.to_str()
.map_err(|_| Status::unauthenticated("could not parse pubkey metadata to str"))?;
if !ADMIN_ACCOUNTS.contains(&pubkey) {
return Err(Status::unauthenticated("This operation is reserved to admin accounts"));
}
Ok(())
}

@ -1,16 +1,15 @@
use crate::db;
use crate::grpc::prelude::*;
use crate::grpc::{check_admin_key, check_sig_from_req};
use detee_shared::app_proto::AppContract;
use detee_shared::{
common_proto::{Empty, Pubkey},
general_proto::{
brain_general_cli_server::BrainGeneralCli, Account, AccountBalance, AirdropReq, BanUserReq,
InspectOperatorResp, KickReq, KickResp, ListOperatorsResp, RegOperatorReq, ReportNodeReq,
SlashReq,
},
vm_proto::VmContract,
use detee_shared::common_proto::{Empty, Pubkey};
use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCli;
use detee_shared::general_proto::{
Account, AccountBalance, AirdropReq, BanUserReq, InspectOperatorResp, KickReq, KickResp,
ListOperatorsResp, RegOperatorReq, ReportNodeReq, SlashReq,
};
use surrealdb::{engine::remote::ws::Client, Surreal};
use detee_shared::vm_proto::VmContract;
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;
use std::pin::Pin;
use std::sync::Arc;
@ -19,18 +18,18 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::Stream;
use tonic::{Request, Response, Status};
pub struct BrainGeneralCliForReal {
pub struct BrainCliServer {
pub db: Arc<Surreal<Client>>,
}
impl BrainGeneralCliForReal {
impl BrainCliServer {
pub fn new(db: Arc<Surreal<Client>>) -> Self {
Self { db }
}
}
#[tonic::async_trait]
impl BrainGeneralCli for BrainGeneralCliForReal {
impl BrainGeneralCli for BrainCliServer {
type ListAccountsStream = Pin<Box<dyn Stream<Item = Result<Account, Status>> + Send>>;
type ListAllAppContractsStream =
Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
@ -40,20 +39,20 @@ impl BrainGeneralCli for BrainGeneralCliForReal {
async fn get_balance(&self, req: Request<Pubkey>) -> Result<Response<AccountBalance>, Status> {
let req = check_sig_from_req(req)?;
Ok(Response::new(db::Account::get(&self.db, &req.pubkey).await?.into()))
Ok(Response::new(db::general::Account::get(&self.db, &req.pubkey).await?.into()))
}
async fn report_node(&self, req: Request<ReportNodeReq>) -> Result<Response<Empty>, Status> {
let req = check_sig_from_req(req)?;
let (account, node, contract_id) =
match db::ActiveVmWithNode::get_by_uuid(&self.db, &req.contract).await? {
match db::vm::ActiveVmWithNode::get_by_uuid(&self.db, &req.contract).await? {
Some(vm_contract)
if vm_contract.admin.key().to_string() == req.admin_pubkey
&& vm_contract.vm_node.id.key().to_string() == req.node_pubkey =>
{
(vm_contract.admin, vm_contract.vm_node.id, vm_contract.id.to_string())
}
_ => match db::ActiveAppWithNode::get_by_uuid(&self.db, &req.contract).await? {
_ => match db::app::ActiveAppWithNode::get_by_uuid(&self.db, &req.contract).await? {
Some(app_contract)
if app_contract.admin.key().to_string() == req.admin_pubkey
&& app_contract.app_node.id.key().to_string() == req.node_pubkey =>
@ -65,7 +64,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal {
}
},
};
db::Report::create(&self.db, account, node, req.reason, contract_id).await?;
db::general::Report::create(&self.db, account, node, req.reason, contract_id).await?;
Ok(Response::new(Empty {}))
}
@ -74,7 +73,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal {
req: Request<Empty>,
) -> Result<Response<Self::ListOperatorsStream>, Status> {
let _ = check_sig_from_req(req)?;
let operators = db::Operator::list(&self.db).await?;
let operators = db::general::Operator::list(&self.db).await?;
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for op in operators {
@ -89,7 +88,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal {
&self,
req: Request<Pubkey>,
) -> Result<Response<InspectOperatorResp>, Status> {
match db::Operator::inspect_nodes(&self.db, &req.into_inner().pubkey).await? {
match db::general::Operator::inspect_nodes(&self.db, &req.into_inner().pubkey).await? {
(Some(op), vm_nodes, app_nodes) => Ok(Response::new(InspectOperatorResp {
operator: Some(op.into()),
vm_nodes: vm_nodes.into_iter().map(|n| n.into()).collect(),
@ -133,7 +132,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal {
async fn airdrop(&self, req: Request<AirdropReq>) -> Result<Response<Empty>, Status> {
check_admin_key(&req)?;
let req = check_sig_from_req(req)?;
db::Account::airdrop(&self.db, &req.pubkey, req.tokens).await?;
db::general::Account::airdrop(&self.db, &req.pubkey, req.tokens).await?;
Ok(Response::new(Empty {}))
}

@ -1,12 +1,176 @@
pub mod app;
pub mod common;
pub mod general;
pub mod types;
pub mod vm;
pub mod prelude {
pub use super::common::{
check_admin_key, check_sig_from_parts, check_sig_from_req, PubkeyGetter,
use crate::constants::ADMIN_ACCOUNTS;
use detee_shared::common_proto::{Empty, Pubkey};
use detee_shared::general_proto::{
AirdropReq, BanUserReq, KickReq, RegOperatorReq, ReportNodeReq, SlashReq,
};
pub use super::general::BrainGeneralCliForReal;
pub use super::vm::{BrainVmCliForReal, BrainVmDaemonForReal};
use detee_shared::vm_proto::{ListVmContractsReq, *};
use tonic::{Request, Status};
pub trait PubkeyGetter {
fn get_pubkey(&self) -> Option<String>;
}
macro_rules! impl_pubkey_getter {
($t:ty, $field:ident) => {
impl PubkeyGetter for $t {
fn get_pubkey(&self) -> Option<String> {
Some(self.$field.clone())
}
}
};
($t:ty) => {
impl PubkeyGetter for $t {
fn get_pubkey(&self) -> Option<String> {
None
}
}
};
}
impl_pubkey_getter!(Pubkey, pubkey);
impl_pubkey_getter!(NewVmReq, admin_pubkey);
impl_pubkey_getter!(DeleteVmReq, admin_pubkey);
impl_pubkey_getter!(UpdateVmReq, admin_pubkey);
impl_pubkey_getter!(ExtendVmReq, admin_pubkey);
impl_pubkey_getter!(ReportNodeReq, admin_pubkey);
impl_pubkey_getter!(ListVmContractsReq, wallet);
impl_pubkey_getter!(RegisterVmNodeReq, node_pubkey);
impl_pubkey_getter!(RegOperatorReq, pubkey);
impl_pubkey_getter!(KickReq, operator_wallet);
impl_pubkey_getter!(BanUserReq, operator_wallet);
impl_pubkey_getter!(VmNodeFilters);
impl_pubkey_getter!(Empty);
impl_pubkey_getter!(AirdropReq);
impl_pubkey_getter!(SlashReq);
// impl_pubkey_getter!(NewAppReq, admin_pubkey);
// impl_pubkey_getter!(DelAppReq, admin_pubkey);
// impl_pubkey_getter!(ListAppContractsReq, admin_pubkey);
//
// impl_pubkey_getter!(RegisterAppNodeReq);
// impl_pubkey_getter!(AppNodeFilters);
pub fn check_sig_from_req<T: std::fmt::Debug + PubkeyGetter>(req: Request<T>) -> Result<T, Status> {
let time = match req.metadata().get("timestamp") {
Some(t) => t.clone(),
None => return Err(Status::unauthenticated("Timestamp not found in metadata.")),
};
let time = time
.to_str()
.map_err(|_| Status::unauthenticated("Timestamp in metadata is not a string"))?;
let now = chrono::Utc::now();
let parsed_time = chrono::DateTime::parse_from_rfc3339(time)
.map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?;
let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds();
if !(-4..=4).contains(&seconds_elapsed) {
return Err(Status::unauthenticated(format!(
"Date is not within 4 sec of the time of the server: CLI {} vs Server {}",
parsed_time, now
)));
}
let signature = match req.metadata().get("request-signature") {
Some(t) => t,
None => return Err(Status::unauthenticated("signature not found in metadata.")),
};
let signature = bs58::decode(signature)
.into_vec()
.map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?;
let signature = ed25519_dalek::Signature::from_bytes(
signature
.as_slice()
.try_into()
.map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?,
);
let pubkey_value = match req.metadata().get("pubkey") {
Some(p) => p.clone(),
None => return Err(Status::unauthenticated("pubkey not found in metadata.")),
};
let pubkey = ed25519_dalek::VerifyingKey::from_bytes(
&bs58::decode(&pubkey_value)
.into_vec()
.map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))?
.try_into()
.map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?,
)
.map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?;
let req = req.into_inner();
let message = format!("{time}{req:?}");
use ed25519_dalek::Verifier;
pubkey
.verify(message.as_bytes(), &signature)
.map_err(|_| Status::unauthenticated("the signature is not valid"))?;
if let Some(req_pubkey) = req.get_pubkey() {
if *pubkey_value.to_str().unwrap() != req_pubkey {
return Err(Status::unauthenticated(
"pubkey of signature does not match pubkey of request",
));
}
}
Ok(req)
}
pub fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Result<(), Status> {
let now = chrono::Utc::now();
let parsed_time = chrono::DateTime::parse_from_rfc3339(time)
.map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?;
let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds();
if !(-4..=4).contains(&seconds_elapsed) {
return Err(Status::unauthenticated(format!(
"Date is not within 4 sec of the time of the server: CLI {} vs Server {}",
parsed_time, now
)));
}
let signature = bs58::decode(sig)
.into_vec()
.map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?;
let signature = ed25519_dalek::Signature::from_bytes(
signature
.as_slice()
.try_into()
.map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?,
);
let pubkey = ed25519_dalek::VerifyingKey::from_bytes(
&bs58::decode(&pubkey)
.into_vec()
.map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))?
.try_into()
.map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?,
)
.map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?;
let msg = time.to_string() + msg;
use ed25519_dalek::Verifier;
pubkey
.verify(msg.as_bytes(), &signature)
.map_err(|_| Status::unauthenticated("the signature is not valid"))?;
Ok(())
}
pub fn check_admin_key<T>(req: &Request<T>) -> Result<(), Status> {
let pubkey = match req.metadata().get("pubkey") {
Some(p) => p.clone(),
None => return Err(Status::unauthenticated("pubkey not found in metadata.")),
};
let pubkey = pubkey
.to_str()
.map_err(|_| Status::unauthenticated("could not parse pubkey metadata to str"))?;
if !ADMIN_ACCOUNTS.contains(&pubkey) {
return Err(Status::unauthenticated("This operation is reserved to admin accounts"));
}
Ok(())
}

@ -1,25 +1,24 @@
use crate::constants::{ACCOUNT, ID_ALPHABET, NEW_VM_REQ, VM_NODE};
use crate::db;
use detee_shared::app_proto::AppNodeListResp;
use detee_shared::{
general_proto::{AccountBalance, ListOperatorsResp},
vm_proto::*,
};
use detee_shared::general_proto::{AccountBalance, ListOperatorsResp};
use detee_shared::vm_proto::*;
use nanoid::nanoid;
use surrealdb::RecordId;
impl From<db::Account> for AccountBalance {
fn from(account: db::Account) -> Self {
impl From<db::general::Account> for AccountBalance {
fn from(account: db::general::Account) -> Self {
AccountBalance { balance: account.balance, tmp_locked: account.tmp_locked }
}
}
impl From<NewVmReq> for db::NewVmReq {
impl From<NewVmReq> for db::vm::NewVmReq {
fn from(new_vm_req: NewVmReq) -> Self {
Self {
id: RecordId::from((db::NEW_VM_REQ, nanoid!(40, &db::ID_ALPHABET))),
admin: RecordId::from((db::ACCOUNT, new_vm_req.admin_pubkey)),
vm_node: RecordId::from((db::VM_NODE, new_vm_req.node_pubkey)),
id: RecordId::from((NEW_VM_REQ, nanoid!(40, &ID_ALPHABET))),
admin: RecordId::from((ACCOUNT, new_vm_req.admin_pubkey)),
vm_node: RecordId::from((VM_NODE, new_vm_req.node_pubkey)),
hostname: new_vm_req.hostname,
extra_ports: new_vm_req.extra_ports,
public_ipv4: new_vm_req.public_ipv4,
@ -39,8 +38,8 @@ impl From<NewVmReq> for db::NewVmReq {
}
}
impl From<db::NewVmReq> for NewVmReq {
fn from(new_vm_req: db::NewVmReq) -> Self {
impl From<db::vm::NewVmReq> for NewVmReq {
fn from(new_vm_req: db::vm::NewVmReq) -> Self {
Self {
uuid: new_vm_req.id.key().to_string(),
hostname: new_vm_req.hostname,
@ -62,21 +61,21 @@ impl From<db::NewVmReq> for NewVmReq {
}
}
impl From<db::NewVmResp> for NewVmResp {
fn from(resp: db::NewVmResp) -> Self {
impl From<db::vm::NewVmResp> for NewVmResp {
fn from(resp: db::vm::NewVmResp) -> Self {
match resp {
// TODO: This will require a small architecture change to pass MeasurementArgs from
// Daemon to CLI
db::NewVmResp::Args(uuid, args) => {
db::vm::NewVmResp::Args(uuid, args) => {
NewVmResp { uuid, error: String::new(), args: Some(args) }
}
db::NewVmResp::Error(uuid, error) => NewVmResp { uuid, error, args: None },
db::vm::NewVmResp::Error(uuid, error) => NewVmResp { uuid, error, args: None },
}
}
}
impl From<db::UpdateVmReq> for UpdateVmReq {
fn from(update_vm_req: db::UpdateVmReq) -> Self {
impl From<db::vm::UpdateVmReq> for UpdateVmReq {
fn from(update_vm_req: db::vm::UpdateVmReq) -> Self {
Self {
uuid: update_vm_req.id.key().to_string(),
// daemon does not care about VM hostname
@ -93,8 +92,8 @@ impl From<db::UpdateVmReq> for UpdateVmReq {
}
}
impl From<db::DeletedVm> for DeleteVmReq {
fn from(delete_vm_req: db::DeletedVm) -> Self {
impl From<db::vm::DeletedVm> for DeleteVmReq {
fn from(delete_vm_req: db::vm::DeletedVm) -> Self {
Self {
uuid: delete_vm_req.id.key().to_string(),
admin_pubkey: delete_vm_req.admin.key().to_string(),
@ -102,24 +101,24 @@ impl From<db::DeletedVm> for DeleteVmReq {
}
}
impl From<db::DaemonNotification> for BrainVmMessage {
fn from(notification: db::DaemonNotification) -> Self {
impl From<db::vm::DaemonNotification> for BrainVmMessage {
fn from(notification: db::vm::DaemonNotification) -> Self {
match notification {
db::DaemonNotification::Create(new_vm_req) => {
db::vm::DaemonNotification::Create(new_vm_req) => {
BrainVmMessage { msg: Some(brain_vm_message::Msg::NewVmReq(new_vm_req.into())) }
}
db::DaemonNotification::Update(update_vm_req) => BrainVmMessage {
db::vm::DaemonNotification::Update(update_vm_req) => BrainVmMessage {
msg: Some(brain_vm_message::Msg::UpdateVmReq(update_vm_req.into())),
},
db::DaemonNotification::Delete(deleted_vm) => {
db::vm::DaemonNotification::Delete(deleted_vm) => {
BrainVmMessage { msg: Some(brain_vm_message::Msg::DeleteVm(deleted_vm.into())) }
}
}
}
}
impl From<db::ActiveVmWithNode> for VmContract {
fn from(db_c: db::ActiveVmWithNode) -> Self {
impl From<db::vm::ActiveVmWithNode> for VmContract {
fn from(db_c: db::vm::ActiveVmWithNode) -> Self {
let mut exposed_ports = Vec::new();
for port in db_c.mapped_ports.iter() {
exposed_ports.push(port.0);
@ -164,8 +163,8 @@ impl From<db::Error> for tonic::Status {
}
}
impl From<db::Operator> for ListOperatorsResp {
fn from(db_o: db::Operator) -> Self {
impl From<db::general::Operator> for ListOperatorsResp {
fn from(db_o: db::general::Operator) -> Self {
ListOperatorsResp {
pubkey: db_o.account.key().to_string(),
escrow: db_o.escrow,
@ -177,8 +176,8 @@ impl From<db::Operator> for ListOperatorsResp {
}
}
impl From<db::VmNodeWithReports> for VmNodeListResp {
fn from(vm_node: db::VmNodeWithReports) -> Self {
impl From<db::vm::NodeWithReports> for VmNodeListResp {
fn from(vm_node: db::vm::NodeWithReports) -> Self {
Self {
operator: vm_node.operator.key().to_string(),
node_pubkey: vm_node.id.key().to_string(),
@ -192,8 +191,8 @@ impl From<db::VmNodeWithReports> for VmNodeListResp {
}
}
impl From<db::AppNodeWithReports> for AppNodeListResp {
fn from(app_node: db::AppNodeWithReports) -> Self {
impl From<db::app::NodeWithReports> for AppNodeListResp {
fn from(app_node: db::app::NodeWithReports) -> Self {
Self {
operator: app_node.operator.key().to_string(),
node_pubkey: app_node.id.key().to_string(),
@ -207,7 +206,7 @@ impl From<db::AppNodeWithReports> for AppNodeListResp {
}
}
impl From<VmNodeResources> for db::VmNodeResources {
impl From<VmNodeResources> for db::vm::NodeResources {
fn from(res: VmNodeResources) -> Self {
Self {
avail_mem_mb: res.avail_memory_mb,

@ -1,15 +1,13 @@
#![allow(dead_code)]
use crate::constants::{ACCOUNT, VM_NODE};
use crate::db;
use crate::grpc::prelude::*;
use detee_shared::{
common_proto::Empty,
vm_proto::{
brain_vm_cli_server::BrainVmCli, brain_vm_daemon_server::BrainVmDaemon, ListVmContractsReq,
*,
},
};
use surrealdb::{engine::remote::ws::Client, Surreal};
use crate::grpc::{check_sig_from_parts, check_sig_from_req};
use detee_shared::common_proto::Empty;
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCli;
use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemon;
use detee_shared::vm_proto::{ListVmContractsReq, *};
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;
use log::info;
use std::pin::Pin;
@ -19,26 +17,28 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};
pub struct BrainVmDaemonForReal {
pub struct BrainDaemonServer {
pub db: Arc<Surreal<Client>>,
}
impl BrainVmDaemonForReal {
impl BrainDaemonServer {
pub fn new(db: Arc<Surreal<Client>>) -> Self {
Self { db }
}
}
#[tonic::async_trait]
impl BrainVmDaemon for BrainVmDaemonForReal {
impl BrainVmDaemon for BrainDaemonServer {
type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainVmMessage, Status>> + Send>>;
type RegisterVmNodeStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
async fn register_vm_node(
&self,
req: Request<RegisterVmNodeReq>,
) -> Result<Response<Self::RegisterVmNodeStream>, Status> {
let req = check_sig_from_req(req)?;
info!("Starting registration process for {:?}", req);
db::VmNode {
db::vm::Node {
id: surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone())),
operator: surrealdb::RecordId::from((ACCOUNT, req.operator_wallet)),
country: req.country,
@ -59,7 +59,7 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
.await?;
info!("Sending existing contracts to {}", req.node_pubkey);
let contracts = db::ActiveVmWithNode::list_by_node(&self.db, &req.node_pubkey).await?;
let contracts = db::vm::ActiveVmWithNode::list_by_node(&self.db, &req.node_pubkey).await?;
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for contract in contracts {
@ -70,7 +70,6 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
Ok(Response::new(Box::pin(output_stream) as Self::RegisterVmNodeStream))
}
type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainVmMessage, Status>> + Send>>;
async fn brain_messages(
&self,
req: Request<DaemonStreamAuth>,
@ -91,7 +90,7 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
let pubkey = pubkey.clone();
let tx = tx.clone();
tokio::spawn(async move {
match db::listen_for_node::<db::DeletedVm>(&db, &pubkey, tx).await {
match db::listen_for_vm_node::<db::vm::DeletedVm>(&db, &pubkey, tx).await {
Ok(()) => log::info!("db::VmContract::listen_for_node ended for {pubkey}"),
Err(e) => {
log::warn!("db::VmContract::listen_for_node errored for {pubkey}: {e}")
@ -104,7 +103,7 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
let pubkey = pubkey.clone();
let tx = tx.clone();
tokio::spawn(async move {
let _ = db::listen_for_node::<db::NewVmReq>(&db, &pubkey, tx.clone()).await;
let _ = db::listen_for_vm_node::<db::vm::NewVmReq>(&db, &pubkey, tx.clone()).await;
});
}
{
@ -112,7 +111,7 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
let pubkey = pubkey.clone();
let tx = tx.clone();
tokio::spawn(async move {
let _ = db::listen_for_node::<db::UpdateVmReq>(&db, &pubkey, tx.clone()).await;
let _ = db::listen_for_vm_node::<db::vm::UpdateVmReq>(&db, &pubkey, tx.clone()).await;
});
}
@ -152,7 +151,7 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
// TODO: move new_vm_req to active_vm
// also handle failure properly
if !new_vm_resp.error.is_empty() {
db::NewVmReq::submit_error(
db::vm::NewVmReq::submit_error(
&self.db,
&new_vm_resp.uuid,
new_vm_resp.error,
@ -167,7 +166,7 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
)
.await?;
if let Some(args) = new_vm_resp.args {
db::ActiveVm::activate(&self.db, &new_vm_resp.uuid, args).await?;
db::vm::ActiveVm::activate(&self.db, &new_vm_resp.uuid, args).await?;
}
}
}
@ -176,7 +175,7 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
// self.data.submit_updatevm_resp(update_vm_resp).await;
}
Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => {
let node_resources: db::VmNodeResources = node_resources.into();
let node_resources: db::vm::NodeResources = node_resources.into();
node_resources.merge(&self.db, &pubkey).await?;
}
_ => {}
@ -190,36 +189,36 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
}
}
pub struct BrainVmCliForReal {
pub struct BrainCliServer {
pub db: Arc<Surreal<Client>>,
}
impl BrainVmCliForReal {
impl BrainCliServer {
pub fn new(db: Arc<Surreal<Client>>) -> Self {
Self { db }
}
}
#[tonic::async_trait]
impl BrainVmCli for BrainVmCliForReal {
impl BrainVmCli for BrainCliServer {
type ListVmContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
type ListVmNodesStream = Pin<Box<dyn Stream<Item = Result<VmNodeListResp, Status>> + Send>>;
async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> {
let req = check_sig_from_req(req)?;
info!("New VM requested via CLI: {req:?}");
if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
if db::general::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
return Err(Status::permission_denied("This operator banned you. What did you do?"));
}
let new_vm_req: db::NewVmReq = req.into();
let new_vm_req: db::vm::NewVmReq = req.into();
let id = new_vm_req.id.key().to_string();
let db = self.db.clone();
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
let _ = oneshot_tx.send(db::NewVmResp::listen(&db, &id).await);
let _ = oneshot_tx.send(db::vm::NewVmResp::listen(&db, &id).await);
});
new_vm_req.submit(&self.db).await?;
@ -285,7 +284,7 @@ impl BrainVmCli for BrainVmCliForReal {
let mut contracts = Vec::new();
if !req.uuid.is_empty() {
if let Some(specific_contract) =
db::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await?
db::vm::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await?
{
if specific_contract.admin.key().to_string() == req.wallet {
contracts.push(specific_contract);
@ -294,10 +293,10 @@ impl BrainVmCli for BrainVmCliForReal {
}
} else if req.as_operator {
contracts
.append(&mut db::ActiveVmWithNode::list_by_operator(&self.db, &req.wallet).await?);
.append(&mut db::vm::ActiveVmWithNode::list_by_operator(&self.db, &req.wallet).await?);
} else {
contracts
.append(&mut db::ActiveVmWithNode::list_by_admin(&self.db, &req.wallet).await?);
.append(&mut db::vm::ActiveVmWithNode::list_by_admin(&self.db, &req.wallet).await?);
}
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
@ -315,7 +314,7 @@ impl BrainVmCli for BrainVmCliForReal {
) -> Result<Response<Self::ListVmNodesStream>, tonic::Status> {
let req = check_sig_from_req(req)?;
info!("CLI requested ListVmNodesStream: {req:?}");
let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?;
let nodes = db::vm::NodeWithReports::find_by_filters(&self.db, req).await?;
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for node in nodes {
@ -333,7 +332,7 @@ impl BrainVmCli for BrainVmCliForReal {
let req = check_sig_from_req(req)?;
info!("Unknown CLI requested ListVmNodesStream: {req:?}");
// TODO: optimize this query so that it gets only one node
let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?;
let nodes = db::vm::NodeWithReports::find_by_filters(&self.db, req).await?;
if let Some(node) = nodes.into_iter().next() {
return Ok(Response::new(node.into()));
}

@ -1,17 +1,16 @@
use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer;
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer;
use detee_shared::{
general_proto::brain_general_cli_server::BrainGeneralCliServer,
vm_proto::brain_vm_daemon_server::BrainVmDaemonServer,
};
use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer;
use dotenv::dotenv;
use hyper_util::rt::TokioIo;
use std::net::SocketAddr;
use std::sync::Arc;
use surreal_brain::grpc::{BrainGeneralCliForReal, BrainVmCliForReal, BrainVmDaemonForReal};
use surreal_brain::grpc;
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;
use tokio::io::DuplexStream;
use tokio::{net::TcpListener, sync::OnceCell};
use tokio::net::TcpListener;
use tokio::sync::OnceCell;
use tonic::transport::{Channel, Endpoint, Server, Uri};
use tower::service_fn;
@ -58,9 +57,11 @@ pub async fn run_service_in_background() -> SocketAddr {
let db_arc = Arc::new(db);
Server::builder()
.add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal::new(db_arc.clone())))
.add_service(BrainVmCliServer::new(BrainVmCliForReal::new(db_arc.clone())))
.add_service(BrainVmDaemonServer::new(BrainVmDaemonForReal::new(db_arc.clone())))
.add_service(BrainGeneralCliServer::new(grpc::general::BrainCliServer::new(
db_arc.clone(),
)))
.add_service(BrainVmCliServer::new(grpc::vm::BrainCliServer::new(db_arc.clone())))
.add_service(BrainVmDaemonServer::new(grpc::vm::BrainDaemonServer::new(db_arc.clone())))
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
.await
.unwrap();
@ -88,9 +89,9 @@ pub async fn run_service_for_stream_server() -> DuplexStream {
let db_arc = Arc::new(db);
tonic::transport::Server::builder()
.add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal::new(db_arc.clone())))
.add_service(BrainVmCliServer::new(BrainVmCliForReal::new(db_arc.clone())))
.add_service(BrainVmDaemonServer::new(BrainVmDaemonForReal::new(db_arc.clone())))
.add_service(BrainGeneralCliServer::new(grpc::general::BrainCliServer::new(db_arc.clone())))
.add_service(BrainVmCliServer::new(grpc::vm::BrainCliServer::new(db_arc.clone())))
.add_service(BrainVmDaemonServer::new(grpc::vm::BrainDaemonServer::new(db_arc.clone())))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
});

@ -1,7 +1,6 @@
use anyhow::Result;
use detee_shared::vm_proto as snp_proto;
use ed25519_dalek::Signer;
use ed25519_dalek::SigningKey;
use ed25519_dalek::{Signer, SigningKey};
use tonic::metadata::AsciiMetadataValue;
use tonic::Request;

@ -32,14 +32,14 @@ pub async fn create_new_vm(
// wait for update db
tokio::time::sleep(tokio::time::Duration::from_millis(700)).await;
let vm_req_db: Option<db::NewVmReq> =
let vm_req_db: Option<db::vm::NewVmReq> =
db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await.unwrap();
if let Some(new_vm_req) = vm_req_db {
panic!("New VM request found in DB: {:?}", new_vm_req);
}
let active_vm_op: Option<db::ActiveVm> =
let active_vm_op: Option<db::vm::ActiveVm> =
db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await.unwrap();
let active_vm = active_vm_op.unwrap();

@ -1,19 +1,17 @@
use common::{
prepare_test_env::{prepare_test_db, run_service_for_stream, run_service_in_background},
test_utils::Key,
vm_cli_utils::create_new_vm,
vm_daemon_utils::mock_vm_daemon,
use common::prepare_test_env::{
prepare_test_db, run_service_for_stream, run_service_in_background,
};
use detee_shared::common_proto::Empty;
use common::test_utils::Key;
use common::vm_cli_utils::create_new_vm;
use common::vm_daemon_utils::mock_vm_daemon;
use detee_shared::common_proto::{Empty, Pubkey};
use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient;
use detee_shared::general_proto::ReportNodeReq;
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
use detee_shared::vm_proto::ListVmContractsReq;
use detee_shared::{
common_proto::Pubkey, general_proto::brain_general_cli_client::BrainGeneralCliClient,
};
use futures::StreamExt;
use surreal_brain::constants::VM_NODE;
use surreal_brain::db::VmNodeWithReports;
use surreal_brain::db;
mod common;
@ -90,7 +88,7 @@ async fn test_report_node() {
.unwrap()
.into_inner();
let vm_nodes: Vec<VmNodeWithReports> = db
let vm_nodes: Vec<db::vm::NodeWithReports> = db
.query(format!(
"SELECT *, <-report.* as reports FROM {VM_NODE} WHERE id = {VM_NODE}:{daemon_key};"
))

@ -1,8 +1,8 @@
use common::{
prepare_test_env::{prepare_test_db, run_service_for_stream, run_service_in_background},
test_utils::Key,
vm_daemon_utils::{mock_vm_daemon, register_vm_node},
use common::prepare_test_env::{
prepare_test_db, run_service_for_stream, run_service_in_background,
};
use common::test_utils::Key;
use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node};
use detee_shared::vm_proto;
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;