longer struct names

This commit is contained in:
ghe0 2025-05-05 19:23:13 +03:00
parent d233ace7a2
commit 10f369d00d
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
10 changed files with 105 additions and 98 deletions

@ -27,10 +27,10 @@ async fn main() {
let addr = BRAIN_GRPC_ADDR.parse().unwrap(); let addr = BRAIN_GRPC_ADDR.parse().unwrap();
let snp_daemon_server = let snp_daemon_server =
BrainVmDaemonServer::new(grpc::vm::BrainDaemonServer::new(db_arc.clone())); BrainVmDaemonServer::new(grpc::vm::BrainVmDaemonServer::new(db_arc.clone()));
let snp_cli_server = BrainVmCliServer::new(grpc::vm::BrainCliServer::new(db_arc.clone())); let snp_cli_server = BrainVmCliServer::new(grpc::vm::BrainVmCliServer::new(db_arc.clone()));
let general_service_server = let general_service_server =
BrainGeneralCliServer::new(grpc::general::BrainCliServer::new(db_arc.clone())); BrainGeneralCliServer::new(grpc::general::BrainGeneralCliServer::new(db_arc.clone()));
let cert = std::fs::read_to_string(CERT_PATH).unwrap(); let cert = std::fs::read_to_string(CERT_PATH).unwrap();
let key = std::fs::read_to_string(CERT_KEY_PATH).unwrap(); let key = std::fs::read_to_string(CERT_KEY_PATH).unwrap();

@ -9,7 +9,7 @@ use surrealdb::sql::Datetime;
use surrealdb::{RecordId, Surreal}; use surrealdb::{RecordId, Surreal};
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct Node { pub struct AppNode {
pub id: RecordId, pub id: RecordId,
pub operator: RecordId, pub operator: RecordId,
pub country: String, pub country: String,
@ -26,7 +26,7 @@ pub struct Node {
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct NodeWithReports { pub struct AppNodeWithReports {
pub id: RecordId, pub id: RecordId,
pub operator: RecordId, pub operator: RecordId,
pub country: String, pub country: String,
@ -71,7 +71,7 @@ pub struct ActiveAppWithNode {
#[serde(rename = "in")] #[serde(rename = "in")]
pub admin: RecordId, pub admin: RecordId,
#[serde(rename = "out")] #[serde(rename = "out")]
pub app_node: Node, pub app_node: AppNode,
pub app_name: String, pub app_name: String,
pub mapped_ports: Vec<(u64, u64)>, pub mapped_ports: Vec<(u64, u64)>,
pub host_ipv4: String, pub host_ipv4: String,
@ -95,11 +95,11 @@ impl ActiveAppWithNode {
} }
} }
impl From<&old_brain::BrainData> for Vec<Node> { impl From<&old_brain::BrainData> for Vec<AppNode> {
fn from(old_data: &old_brain::BrainData) -> Self { fn from(old_data: &old_brain::BrainData) -> Self {
let mut nodes = Vec::new(); let mut nodes = Vec::new();
for old_node in old_data.app_nodes.iter() { for old_node in old_data.app_nodes.iter() {
nodes.push(Node { nodes.push(AppNode {
id: RecordId::from(("app_node", old_node.node_pubkey.clone())), id: RecordId::from(("app_node", old_node.node_pubkey.clone())),
operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())), operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())),
country: old_node.country.clone(), country: old_node.country.clone(),

@ -1,6 +1,5 @@
use crate::constants::ACCOUNT; use crate::constants::ACCOUNT;
use crate::db::app; use crate::db::prelude::*;
use crate::db::vm;
use super::Error; use super::Error;
use crate::old_brain; use crate::old_brain;
@ -193,7 +192,7 @@ impl Operator {
pub async fn inspect_nodes( pub async fn inspect_nodes(
db: &Surreal<Client>, db: &Surreal<Client>,
account: &str, account: &str,
) -> Result<(Option<Self>, Vec<vm::NodeWithReports>, Vec<app::NodeWithReports>), Error> { ) -> Result<(Option<Self>, Vec<VmNodeWithReports>, Vec<AppNodeWithReports>), Error> {
let operator = Self::inspect(db, account).await?; let operator = Self::inspect(db, account).await?;
let mut result = db let mut result = db
.query(format!( .query(format!(
@ -205,8 +204,8 @@ impl Operator {
where operator = account:{account};" where operator = account:{account};"
)) ))
.await?; .await?;
let vm_nodes: Vec<vm::NodeWithReports> = result.take(0)?; let vm_nodes: Vec<VmNodeWithReports> = result.take(0)?;
let app_nodes: Vec<app::NodeWithReports> = result.take(1)?; let app_nodes: Vec<AppNodeWithReports> = result.take(1)?;
Ok((operator, vm_nodes, app_nodes)) Ok((operator, vm_nodes, app_nodes))
} }

@ -4,6 +4,7 @@ pub mod vm;
use crate::constants::{DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ}; use crate::constants::{DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ};
use crate::old_brain; use crate::old_brain;
use prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use surrealdb::engine::remote::ws::{Client, Ws}; use surrealdb::engine::remote::ws::{Client, Ws};
use surrealdb::opt::auth::Root; use surrealdb::opt::auth::Root;
@ -16,13 +17,20 @@ pub enum Error {
#[error("Internal DB error: {0}")] #[error("Internal DB error: {0}")]
DataBase(#[from] surrealdb::Error), DataBase(#[from] surrealdb::Error),
#[error("Daemon channel got closed: {0}")] #[error("Daemon channel got closed: {0}")]
VmDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<vm::DaemonNotification>), VmDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<VmDaemonNotification>),
#[error(transparent)] #[error(transparent)]
StdIo(#[from] std::io::Error), StdIo(#[from] std::io::Error),
#[error(transparent)] #[error(transparent)]
TimeOut(#[from] tokio::time::error::Elapsed), TimeOut(#[from] tokio::time::error::Elapsed),
} }
pub mod prelude {
pub use super::app::*;
pub use super::general::*;
pub use super::vm::*;
pub use super::*;
}
pub async fn db_connection( pub async fn db_connection(
db_address: &str, db_address: &str,
username: &str, username: &str,
@ -42,22 +50,22 @@ pub async fn migration0(
db: &Surreal<Client>, db: &Surreal<Client>,
old_data: &old_brain::BrainData, old_data: &old_brain::BrainData,
) -> Result<(), Error> { ) -> Result<(), Error> {
let accounts: Vec<crate::db::general::Account> = old_data.into(); let accounts: Vec<Account> = old_data.into();
let vm_nodes: Vec<crate::db::vm::Node> = old_data.into(); let vm_nodes: Vec<VmNode> = old_data.into();
let app_nodes: Vec<crate::db::app::Node> = old_data.into(); let app_nodes: Vec<AppNode> = old_data.into();
let vm_contracts: Vec<crate::db::vm::ActiveVm> = old_data.into(); let vm_contracts: Vec<ActiveVm> = old_data.into();
let schema = std::fs::read_to_string(crate::constants::DB_SCHEMA_FILE)?; let schema = std::fs::read_to_string(crate::constants::DB_SCHEMA_FILE)?;
db.query(schema).await?; db.query(schema).await?;
println!("Inserting accounts..."); println!("Inserting accounts...");
let _: Vec<crate::db::general::Account> = db.insert(()).content(accounts).await?; let _: Vec<Account> = db.insert(()).content(accounts).await?;
println!("Inserting vm nodes..."); println!("Inserting vm nodes...");
let _: Vec<crate::db::vm::Node> = db.insert(()).content(vm_nodes).await?; let _: Vec<VmNode> = db.insert(()).content(vm_nodes).await?;
println!("Inserting app nodes..."); println!("Inserting app nodes...");
let _: Vec<crate::db::app::Node> = db.insert(()).content(app_nodes).await?; let _: Vec<AppNode> = db.insert(()).content(app_nodes).await?;
println!("Inserting vm contracts..."); println!("Inserting vm contracts...");
let _: Vec<crate::db::vm::ActiveVm> = db.insert("vm_contract").relation(vm_contracts).await?; let _: Vec<ActiveVm> = db.insert("vm_contract").relation(vm_contracts).await?;
Ok(()) Ok(())
} }
@ -75,11 +83,11 @@ pub async fn upsert_record<SomeRecord: Serialize + 'static>(
} }
pub async fn listen_for_vm_node< pub async fn listen_for_vm_node<
T: Into<vm::DaemonNotification> + std::marker::Unpin + for<'de> Deserialize<'de>, T: Into<vm::VmDaemonNotification> + std::marker::Unpin + for<'de> Deserialize<'de>,
>( >(
db: &Surreal<Client>, db: &Surreal<Client>,
node: &str, node: &str,
tx: Sender<vm::DaemonNotification>, tx: Sender<vm::VmDaemonNotification>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let table_name = match std::any::type_name::<T>() { let table_name = match std::any::type_name::<T>() {
"surreal_brain::db::NewVmReq" => NEW_VM_REQ.to_string(), "surreal_brain::db::NewVmReq" => NEW_VM_REQ.to_string(),

@ -12,7 +12,7 @@ use surrealdb::{Notification, RecordId, Surreal};
use tokio_stream::StreamExt as _; use tokio_stream::StreamExt as _;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct Node { pub struct VmNode {
pub id: RecordId, pub id: RecordId,
pub operator: RecordId, pub operator: RecordId,
pub country: String, pub country: String,
@ -31,7 +31,7 @@ pub struct Node {
} }
#[derive(Serialize)] #[derive(Serialize)]
pub struct NodeResources { pub struct VmNodeResources {
pub avail_mem_mb: u32, pub avail_mem_mb: u32,
pub avail_vcpus: u32, pub avail_vcpus: u32,
pub avail_storage_gbs: u32, pub avail_storage_gbs: u32,
@ -41,22 +41,22 @@ pub struct NodeResources {
pub max_ports_per_vm: u32, pub max_ports_per_vm: u32,
} }
impl NodeResources { impl VmNodeResources {
pub async fn merge(self, db: &Surreal<Client>, node_id: &str) -> Result<(), Error> { pub async fn merge(self, db: &Surreal<Client>, node_id: &str) -> Result<(), Error> {
let _: Option<Node> = db.update((VM_NODE, node_id)).merge(self).await?; let _: Option<VmNode> = db.update((VM_NODE, node_id)).merge(self).await?;
Ok(()) Ok(())
} }
} }
impl Node { impl VmNode {
pub async fn register(self, db: &Surreal<Client>) -> Result<(), Error> { pub async fn register(self, db: &Surreal<Client>) -> Result<(), Error> {
let _: Option<Node> = db.upsert(self.id.clone()).content(self).await?; let _: Option<VmNode> = db.upsert(self.id.clone()).content(self).await?;
Ok(()) Ok(())
} }
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct NodeWithReports { pub struct VmNodeWithReports {
pub id: RecordId, pub id: RecordId,
pub operator: RecordId, pub operator: RecordId,
pub country: String, pub country: String,
@ -75,7 +75,7 @@ pub struct NodeWithReports {
pub reports: Vec<Report>, pub reports: Vec<Report>,
} }
impl NodeWithReports { impl VmNodeWithReports {
// TODO: find a more elegant way to do this than importing gRPC in the DB module // TODO: find a more elegant way to do this than importing gRPC in the DB module
// https://en.wikipedia.org/wiki/Dependency_inversion_principle // https://en.wikipedia.org/wiki/Dependency_inversion_principle
pub async fn find_by_filters( pub async fn find_by_filters(
@ -118,25 +118,25 @@ impl NodeWithReports {
} }
} }
pub enum DaemonNotification { pub enum VmDaemonNotification {
Create(NewVmReq), Create(NewVmReq),
Update(UpdateVmReq), Update(UpdateVmReq),
Delete(DeletedVm), Delete(DeletedVm),
} }
impl From<NewVmReq> for DaemonNotification { impl From<NewVmReq> for VmDaemonNotification {
fn from(value: NewVmReq) -> Self { fn from(value: NewVmReq) -> Self {
Self::Create(value) Self::Create(value)
} }
} }
impl From<UpdateVmReq> for DaemonNotification { impl From<UpdateVmReq> for VmDaemonNotification {
fn from(value: UpdateVmReq) -> Self { fn from(value: UpdateVmReq) -> Self {
Self::Update(value) Self::Update(value)
} }
} }
impl From<DeletedVm> for DaemonNotification { impl From<DeletedVm> for VmDaemonNotification {
fn from(value: DeletedVm) -> Self { fn from(value: DeletedVm) -> Self {
Self::Delete(value) Self::Delete(value)
} }
@ -455,7 +455,7 @@ pub struct ActiveVmWithNode {
#[serde(rename = "in")] #[serde(rename = "in")]
pub admin: RecordId, pub admin: RecordId,
#[serde(rename = "out")] #[serde(rename = "out")]
pub vm_node: Node, pub vm_node: VmNode,
pub hostname: String, pub hostname: String,
pub mapped_ports: Vec<(u32, u32)>, pub mapped_ports: Vec<(u32, u32)>,
pub public_ipv4: String, pub public_ipv4: String,
@ -537,11 +537,11 @@ impl ActiveVmWithNode {
// TODO: delete all of these From implementation after migration 0 gets executed // TODO: delete all of these From implementation after migration 0 gets executed
impl From<&old_brain::BrainData> for Vec<Node> { impl From<&old_brain::BrainData> for Vec<VmNode> {
fn from(old_data: &old_brain::BrainData) -> Self { fn from(old_data: &old_brain::BrainData) -> Self {
let mut nodes = Vec::new(); let mut nodes = Vec::new();
for old_node in old_data.vm_nodes.iter() { for old_node in old_data.vm_nodes.iter() {
nodes.push(Node { nodes.push(VmNode {
id: RecordId::from((VM_NODE, old_node.public_key.clone())), id: RecordId::from((VM_NODE, old_node.public_key.clone())),
operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())), operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())),
country: old_node.country.clone(), country: old_node.country.clone(),

@ -18,18 +18,18 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::Stream; use tokio_stream::Stream;
use tonic::{Request, Response, Status}; use tonic::{Request, Response, Status};
pub struct BrainCliServer { pub struct BrainGeneralCliServer {
pub db: Arc<Surreal<Client>>, pub db: Arc<Surreal<Client>>,
} }
impl BrainCliServer { impl BrainGeneralCliServer {
pub fn new(db: Arc<Surreal<Client>>) -> Self { pub fn new(db: Arc<Surreal<Client>>) -> Self {
Self { db } Self { db }
} }
} }
#[tonic::async_trait] #[tonic::async_trait]
impl BrainGeneralCli for BrainCliServer { impl BrainGeneralCli for BrainGeneralCliServer {
type ListAccountsStream = Pin<Box<dyn Stream<Item = Result<Account, Status>> + Send>>; type ListAccountsStream = Pin<Box<dyn Stream<Item = Result<Account, Status>> + Send>>;
type ListAllAppContractsStream = type ListAllAppContractsStream =
Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>; Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;

@ -1,5 +1,5 @@
use crate::constants::{ACCOUNT, ID_ALPHABET, NEW_VM_REQ, VM_NODE}; use crate::constants::{ACCOUNT, ID_ALPHABET, NEW_VM_REQ, VM_NODE};
use crate::db; use crate::db::prelude as db;
use detee_shared::app_proto::AppNodeListResp; use detee_shared::app_proto::AppNodeListResp;
use detee_shared::general_proto::{AccountBalance, ListOperatorsResp}; use detee_shared::general_proto::{AccountBalance, ListOperatorsResp};
use detee_shared::vm_proto::*; use detee_shared::vm_proto::*;
@ -7,13 +7,13 @@ use nanoid::nanoid;
use surrealdb::RecordId; use surrealdb::RecordId;
impl From<db::general::Account> for AccountBalance { impl From<db::Account> for AccountBalance {
fn from(account: db::general::Account) -> Self { fn from(account: db::Account) -> Self {
AccountBalance { balance: account.balance, tmp_locked: account.tmp_locked } AccountBalance { balance: account.balance, tmp_locked: account.tmp_locked }
} }
} }
impl From<NewVmReq> for db::vm::NewVmReq { impl From<NewVmReq> for db::NewVmReq {
fn from(new_vm_req: NewVmReq) -> Self { fn from(new_vm_req: NewVmReq) -> Self {
Self { Self {
id: RecordId::from((NEW_VM_REQ, nanoid!(40, &ID_ALPHABET))), id: RecordId::from((NEW_VM_REQ, nanoid!(40, &ID_ALPHABET))),
@ -38,8 +38,8 @@ impl From<NewVmReq> for db::vm::NewVmReq {
} }
} }
impl From<db::vm::NewVmReq> for NewVmReq { impl From<db::NewVmReq> for NewVmReq {
fn from(new_vm_req: db::vm::NewVmReq) -> Self { fn from(new_vm_req: db::NewVmReq) -> Self {
Self { Self {
uuid: new_vm_req.id.key().to_string(), uuid: new_vm_req.id.key().to_string(),
hostname: new_vm_req.hostname, hostname: new_vm_req.hostname,
@ -61,21 +61,21 @@ impl From<db::vm::NewVmReq> for NewVmReq {
} }
} }
impl From<db::vm::NewVmResp> for NewVmResp { impl From<db::NewVmResp> for NewVmResp {
fn from(resp: db::vm::NewVmResp) -> Self { fn from(resp: db::NewVmResp) -> Self {
match resp { match resp {
// TODO: This will require a small architecture change to pass MeasurementArgs from // TODO: This will require a small architecture change to pass MeasurementArgs from
// Daemon to CLI // Daemon to CLI
db::vm::NewVmResp::Args(uuid, args) => { db::NewVmResp::Args(uuid, args) => {
NewVmResp { uuid, error: String::new(), args: Some(args) } NewVmResp { uuid, error: String::new(), args: Some(args) }
} }
db::vm::NewVmResp::Error(uuid, error) => NewVmResp { uuid, error, args: None }, db::NewVmResp::Error(uuid, error) => NewVmResp { uuid, error, args: None },
} }
} }
} }
impl From<db::vm::UpdateVmReq> for UpdateVmReq { impl From<db::UpdateVmReq> for UpdateVmReq {
fn from(update_vm_req: db::vm::UpdateVmReq) -> Self { fn from(update_vm_req: db::UpdateVmReq) -> Self {
Self { Self {
uuid: update_vm_req.id.key().to_string(), uuid: update_vm_req.id.key().to_string(),
// daemon does not care about VM hostname // daemon does not care about VM hostname
@ -92,8 +92,8 @@ impl From<db::vm::UpdateVmReq> for UpdateVmReq {
} }
} }
impl From<db::vm::DeletedVm> for DeleteVmReq { impl From<db::DeletedVm> for DeleteVmReq {
fn from(delete_vm_req: db::vm::DeletedVm) -> Self { fn from(delete_vm_req: db::DeletedVm) -> Self {
Self { Self {
uuid: delete_vm_req.id.key().to_string(), uuid: delete_vm_req.id.key().to_string(),
admin_pubkey: delete_vm_req.admin.key().to_string(), admin_pubkey: delete_vm_req.admin.key().to_string(),
@ -101,24 +101,24 @@ impl From<db::vm::DeletedVm> for DeleteVmReq {
} }
} }
impl From<db::vm::DaemonNotification> for BrainVmMessage { impl From<db::VmDaemonNotification> for BrainVmMessage {
fn from(notification: db::vm::DaemonNotification) -> Self { fn from(notification: db::VmDaemonNotification) -> Self {
match notification { match notification {
db::vm::DaemonNotification::Create(new_vm_req) => { db::VmDaemonNotification::Create(new_vm_req) => {
BrainVmMessage { msg: Some(brain_vm_message::Msg::NewVmReq(new_vm_req.into())) } BrainVmMessage { msg: Some(brain_vm_message::Msg::NewVmReq(new_vm_req.into())) }
} }
db::vm::DaemonNotification::Update(update_vm_req) => BrainVmMessage { db::VmDaemonNotification::Update(update_vm_req) => BrainVmMessage {
msg: Some(brain_vm_message::Msg::UpdateVmReq(update_vm_req.into())), msg: Some(brain_vm_message::Msg::UpdateVmReq(update_vm_req.into())),
}, },
db::vm::DaemonNotification::Delete(deleted_vm) => { db::VmDaemonNotification::Delete(deleted_vm) => {
BrainVmMessage { msg: Some(brain_vm_message::Msg::DeleteVm(deleted_vm.into())) } BrainVmMessage { msg: Some(brain_vm_message::Msg::DeleteVm(deleted_vm.into())) }
} }
} }
} }
} }
impl From<db::vm::ActiveVmWithNode> for VmContract { impl From<db::ActiveVmWithNode> for VmContract {
fn from(db_c: db::vm::ActiveVmWithNode) -> Self { fn from(db_c: db::ActiveVmWithNode) -> Self {
let mut exposed_ports = Vec::new(); let mut exposed_ports = Vec::new();
for port in db_c.mapped_ports.iter() { for port in db_c.mapped_ports.iter() {
exposed_ports.push(port.0); exposed_ports.push(port.0);
@ -176,8 +176,8 @@ impl From<db::general::Operator> for ListOperatorsResp {
} }
} }
impl From<db::vm::NodeWithReports> for VmNodeListResp { impl From<db::VmNodeWithReports> for VmNodeListResp {
fn from(vm_node: db::vm::NodeWithReports) -> Self { fn from(vm_node: db::VmNodeWithReports) -> Self {
Self { Self {
operator: vm_node.operator.key().to_string(), operator: vm_node.operator.key().to_string(),
node_pubkey: vm_node.id.key().to_string(), node_pubkey: vm_node.id.key().to_string(),
@ -191,8 +191,8 @@ impl From<db::vm::NodeWithReports> for VmNodeListResp {
} }
} }
impl From<db::app::NodeWithReports> for AppNodeListResp { impl From<db::AppNodeWithReports> for AppNodeListResp {
fn from(app_node: db::app::NodeWithReports) -> Self { fn from(app_node: db::AppNodeWithReports) -> Self {
Self { Self {
operator: app_node.operator.key().to_string(), operator: app_node.operator.key().to_string(),
node_pubkey: app_node.id.key().to_string(), node_pubkey: app_node.id.key().to_string(),
@ -206,7 +206,7 @@ impl From<db::app::NodeWithReports> for AppNodeListResp {
} }
} }
impl From<VmNodeResources> for db::vm::NodeResources { impl From<VmNodeResources> for db::VmNodeResources {
fn from(res: VmNodeResources) -> Self { fn from(res: VmNodeResources) -> Self {
Self { Self {
avail_mem_mb: res.avail_memory_mb, avail_mem_mb: res.avail_memory_mb,

@ -1,6 +1,6 @@
#![allow(dead_code)] #![allow(dead_code)]
use crate::constants::{ACCOUNT, VM_NODE}; use crate::constants::{ACCOUNT, VM_NODE};
use crate::db; use crate::db::prelude as db;
use crate::grpc::{check_sig_from_parts, check_sig_from_req}; use crate::grpc::{check_sig_from_parts, check_sig_from_req};
use detee_shared::common_proto::Empty; use detee_shared::common_proto::Empty;
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCli; use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCli;
@ -17,18 +17,18 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{Stream, StreamExt}; use tokio_stream::{Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming}; use tonic::{Request, Response, Status, Streaming};
pub struct BrainDaemonServer { pub struct BrainVmDaemonServer {
pub db: Arc<Surreal<Client>>, pub db: Arc<Surreal<Client>>,
} }
impl BrainDaemonServer { impl BrainVmDaemonServer {
pub fn new(db: Arc<Surreal<Client>>) -> Self { pub fn new(db: Arc<Surreal<Client>>) -> Self {
Self { db } Self { db }
} }
} }
#[tonic::async_trait] #[tonic::async_trait]
impl BrainVmDaemon for BrainDaemonServer { impl BrainVmDaemon for BrainVmDaemonServer {
type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainVmMessage, Status>> + Send>>; type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainVmMessage, Status>> + Send>>;
type RegisterVmNodeStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>; type RegisterVmNodeStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
@ -38,7 +38,7 @@ impl BrainVmDaemon for BrainDaemonServer {
) -> Result<Response<Self::RegisterVmNodeStream>, Status> { ) -> Result<Response<Self::RegisterVmNodeStream>, Status> {
let req = check_sig_from_req(req)?; let req = check_sig_from_req(req)?;
info!("Starting registration process for {:?}", req); info!("Starting registration process for {:?}", req);
db::vm::Node { db::VmNode {
id: surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone())), id: surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone())),
operator: surrealdb::RecordId::from((ACCOUNT, req.operator_wallet)), operator: surrealdb::RecordId::from((ACCOUNT, req.operator_wallet)),
country: req.country, country: req.country,
@ -59,7 +59,7 @@ impl BrainVmDaemon for BrainDaemonServer {
.await?; .await?;
info!("Sending existing contracts to {}", req.node_pubkey); info!("Sending existing contracts to {}", req.node_pubkey);
let contracts = db::vm::ActiveVmWithNode::list_by_node(&self.db, &req.node_pubkey).await?; let contracts = db::ActiveVmWithNode::list_by_node(&self.db, &req.node_pubkey).await?;
let (tx, rx) = mpsc::channel(6); let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move { tokio::spawn(async move {
for contract in contracts { for contract in contracts {
@ -90,7 +90,7 @@ impl BrainVmDaemon for BrainDaemonServer {
let pubkey = pubkey.clone(); let pubkey = pubkey.clone();
let tx = tx.clone(); let tx = tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
match db::listen_for_vm_node::<db::vm::DeletedVm>(&db, &pubkey, tx).await { match db::listen_for_vm_node::<db::DeletedVm>(&db, &pubkey, tx).await {
Ok(()) => log::info!("db::VmContract::listen_for_node ended for {pubkey}"), Ok(()) => log::info!("db::VmContract::listen_for_node ended for {pubkey}"),
Err(e) => { Err(e) => {
log::warn!("db::VmContract::listen_for_node errored for {pubkey}: {e}") log::warn!("db::VmContract::listen_for_node errored for {pubkey}: {e}")
@ -103,7 +103,7 @@ impl BrainVmDaemon for BrainDaemonServer {
let pubkey = pubkey.clone(); let pubkey = pubkey.clone();
let tx = tx.clone(); let tx = tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
let _ = db::listen_for_vm_node::<db::vm::NewVmReq>(&db, &pubkey, tx.clone()).await; let _ = db::listen_for_vm_node::<db::NewVmReq>(&db, &pubkey, tx.clone()).await;
}); });
} }
{ {
@ -111,7 +111,7 @@ impl BrainVmDaemon for BrainDaemonServer {
let pubkey = pubkey.clone(); let pubkey = pubkey.clone();
let tx = tx.clone(); let tx = tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
let _ = db::listen_for_vm_node::<db::vm::UpdateVmReq>(&db, &pubkey, tx.clone()).await; let _ = db::listen_for_vm_node::<db::UpdateVmReq>(&db, &pubkey, tx.clone()).await;
}); });
} }
@ -151,7 +151,7 @@ impl BrainVmDaemon for BrainDaemonServer {
// TODO: move new_vm_req to active_vm // TODO: move new_vm_req to active_vm
// also handle failure properly // also handle failure properly
if !new_vm_resp.error.is_empty() { if !new_vm_resp.error.is_empty() {
db::vm::NewVmReq::submit_error( db::NewVmReq::submit_error(
&self.db, &self.db,
&new_vm_resp.uuid, &new_vm_resp.uuid,
new_vm_resp.error, new_vm_resp.error,
@ -166,7 +166,7 @@ impl BrainVmDaemon for BrainDaemonServer {
) )
.await?; .await?;
if let Some(args) = new_vm_resp.args { if let Some(args) = new_vm_resp.args {
db::vm::ActiveVm::activate(&self.db, &new_vm_resp.uuid, args).await?; db::ActiveVm::activate(&self.db, &new_vm_resp.uuid, args).await?;
} }
} }
} }
@ -175,7 +175,7 @@ impl BrainVmDaemon for BrainDaemonServer {
// self.data.submit_updatevm_resp(update_vm_resp).await; // self.data.submit_updatevm_resp(update_vm_resp).await;
} }
Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => { Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => {
let node_resources: db::vm::NodeResources = node_resources.into(); let node_resources: db::VmNodeResources = node_resources.into();
node_resources.merge(&self.db, &pubkey).await?; node_resources.merge(&self.db, &pubkey).await?;
} }
_ => {} _ => {}
@ -189,18 +189,18 @@ impl BrainVmDaemon for BrainDaemonServer {
} }
} }
pub struct BrainCliServer { pub struct BrainVmCliServer {
pub db: Arc<Surreal<Client>>, pub db: Arc<Surreal<Client>>,
} }
impl BrainCliServer { impl BrainVmCliServer {
pub fn new(db: Arc<Surreal<Client>>) -> Self { pub fn new(db: Arc<Surreal<Client>>) -> Self {
Self { db } Self { db }
} }
} }
#[tonic::async_trait] #[tonic::async_trait]
impl BrainVmCli for BrainCliServer { impl BrainVmCli for BrainVmCliServer {
type ListVmContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>; type ListVmContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
type ListVmNodesStream = Pin<Box<dyn Stream<Item = Result<VmNodeListResp, Status>> + Send>>; type ListVmNodesStream = Pin<Box<dyn Stream<Item = Result<VmNodeListResp, Status>> + Send>>;
@ -211,14 +211,14 @@ impl BrainVmCli for BrainCliServer {
return Err(Status::permission_denied("This operator banned you. What did you do?")); return Err(Status::permission_denied("This operator banned you. What did you do?"));
} }
let new_vm_req: db::vm::NewVmReq = req.into(); let new_vm_req: db::NewVmReq = req.into();
let id = new_vm_req.id.key().to_string(); let id = new_vm_req.id.key().to_string();
let db = self.db.clone(); let db = self.db.clone();
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move { tokio::spawn(async move {
let _ = oneshot_tx.send(db::vm::NewVmResp::listen(&db, &id).await); let _ = oneshot_tx.send(db::NewVmResp::listen(&db, &id).await);
}); });
new_vm_req.submit(&self.db).await?; new_vm_req.submit(&self.db).await?;
@ -284,7 +284,7 @@ impl BrainVmCli for BrainCliServer {
let mut contracts = Vec::new(); let mut contracts = Vec::new();
if !req.uuid.is_empty() { if !req.uuid.is_empty() {
if let Some(specific_contract) = if let Some(specific_contract) =
db::vm::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await? db::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await?
{ {
if specific_contract.admin.key().to_string() == req.wallet { if specific_contract.admin.key().to_string() == req.wallet {
contracts.push(specific_contract); contracts.push(specific_contract);
@ -293,10 +293,10 @@ impl BrainVmCli for BrainCliServer {
} }
} else if req.as_operator { } else if req.as_operator {
contracts contracts
.append(&mut db::vm::ActiveVmWithNode::list_by_operator(&self.db, &req.wallet).await?); .append(&mut db::ActiveVmWithNode::list_by_operator(&self.db, &req.wallet).await?);
} else { } else {
contracts contracts
.append(&mut db::vm::ActiveVmWithNode::list_by_admin(&self.db, &req.wallet).await?); .append(&mut db::ActiveVmWithNode::list_by_admin(&self.db, &req.wallet).await?);
} }
let (tx, rx) = mpsc::channel(6); let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move { tokio::spawn(async move {
@ -314,7 +314,7 @@ impl BrainVmCli for BrainCliServer {
) -> Result<Response<Self::ListVmNodesStream>, tonic::Status> { ) -> Result<Response<Self::ListVmNodesStream>, tonic::Status> {
let req = check_sig_from_req(req)?; let req = check_sig_from_req(req)?;
info!("CLI requested ListVmNodesStream: {req:?}"); info!("CLI requested ListVmNodesStream: {req:?}");
let nodes = db::vm::NodeWithReports::find_by_filters(&self.db, req).await?; let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?;
let (tx, rx) = mpsc::channel(6); let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move { tokio::spawn(async move {
for node in nodes { for node in nodes {
@ -332,7 +332,7 @@ impl BrainVmCli for BrainCliServer {
let req = check_sig_from_req(req)?; let req = check_sig_from_req(req)?;
info!("Unknown CLI requested ListVmNodesStream: {req:?}"); info!("Unknown CLI requested ListVmNodesStream: {req:?}");
// TODO: optimize this query so that it gets only one node // TODO: optimize this query so that it gets only one node
let nodes = db::vm::NodeWithReports::find_by_filters(&self.db, req).await?; let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?;
if let Some(node) = nodes.into_iter().next() { if let Some(node) = nodes.into_iter().next() {
return Ok(Response::new(node.into())); return Ok(Response::new(node.into()));
} }

@ -57,11 +57,11 @@ pub async fn run_service_in_background() -> SocketAddr {
let db_arc = Arc::new(db); let db_arc = Arc::new(db);
Server::builder() Server::builder()
.add_service(BrainGeneralCliServer::new(grpc::general::BrainCliServer::new( .add_service(BrainGeneralCliServer::new(grpc::general::BrainGeneralCliServer::new(
db_arc.clone(), db_arc.clone(),
))) )))
.add_service(BrainVmCliServer::new(grpc::vm::BrainCliServer::new(db_arc.clone()))) .add_service(BrainVmCliServer::new(grpc::vm::BrainVmCliServer::new(db_arc.clone())))
.add_service(BrainVmDaemonServer::new(grpc::vm::BrainDaemonServer::new(db_arc.clone()))) .add_service(BrainVmDaemonServer::new(grpc::vm::BrainVmDaemonServer::new(db_arc.clone())))
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
.await .await
.unwrap(); .unwrap();
@ -89,9 +89,9 @@ pub async fn run_service_for_stream_server() -> DuplexStream {
let db_arc = Arc::new(db); let db_arc = Arc::new(db);
tonic::transport::Server::builder() tonic::transport::Server::builder()
.add_service(BrainGeneralCliServer::new(grpc::general::BrainCliServer::new(db_arc.clone()))) .add_service(BrainGeneralCliServer::new(grpc::general::BrainGeneralCliServer::new(db_arc.clone())))
.add_service(BrainVmCliServer::new(grpc::vm::BrainCliServer::new(db_arc.clone()))) .add_service(BrainVmCliServer::new(grpc::vm::BrainVmCliServer::new(db_arc.clone())))
.add_service(BrainVmDaemonServer::new(grpc::vm::BrainDaemonServer::new(db_arc.clone()))) .add_service(BrainVmDaemonServer::new(grpc::vm::BrainVmDaemonServer::new(db_arc.clone())))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await .await
}); });

@ -11,7 +11,7 @@ use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
use detee_shared::vm_proto::ListVmContractsReq; use detee_shared::vm_proto::ListVmContractsReq;
use futures::StreamExt; use futures::StreamExt;
use surreal_brain::constants::VM_NODE; use surreal_brain::constants::VM_NODE;
use surreal_brain::db; use surreal_brain::db::vm::VmNodeWithReports;
mod common; mod common;
@ -88,7 +88,7 @@ async fn test_report_node() {
.unwrap() .unwrap()
.into_inner(); .into_inner();
let vm_nodes: Vec<db::vm::NodeWithReports> = db let vm_nodes: Vec<VmNodeWithReports> = db
.query(format!( .query(format!(
"SELECT *, <-report.* as reports FROM {VM_NODE} WHERE id = {VM_NODE}:{daemon_key};" "SELECT *, <-report.* as reports FROM {VM_NODE} WHERE id = {VM_NODE}:{daemon_key};"
)) ))