From 079b4a02aa536092b2aca5314170ee5fa547b0a4 Mon Sep 17 00:00:00 2001 From: Noor Date: Mon, 5 May 2025 19:27:10 +0530 Subject: [PATCH] Major code refactor vm, app and general modules sperated from db and grpc common module for share methods and types --- src/bin/brain.rs | 5 +- src/db/app.rs | 118 +++++ src/db/common/mod.rs | 114 +++++ src/db/general.rs | 211 +++++++++ src/db/mod.rs | 21 + src/{db.rs => db/vm.rs} | 428 +----------------- src/grpc.rs | 907 --------------------------------------- src/grpc/app.rs | 1 + src/grpc/common/mod.rs | 173 ++++++++ src/grpc/common/types.rs | 222 ++++++++++ src/grpc/general.rs | 201 +++++++++ src/grpc/mod.rs | 12 + src/grpc/vm.rs | 342 +++++++++++++++ 13 files changed, 1422 insertions(+), 1333 deletions(-) create mode 100644 src/db/app.rs create mode 100644 src/db/common/mod.rs create mode 100644 src/db/general.rs create mode 100644 src/db/mod.rs rename src/{db.rs => db/vm.rs} (59%) delete mode 100644 src/grpc.rs create mode 100644 src/grpc/app.rs create mode 100644 src/grpc/common/mod.rs create mode 100644 src/grpc/common/types.rs create mode 100644 src/grpc/general.rs create mode 100644 src/grpc/mod.rs create mode 100644 src/grpc/vm.rs diff --git a/src/bin/brain.rs b/src/bin/brain.rs index 75b04cf..30e99d4 100644 --- a/src/bin/brain.rs +++ b/src/bin/brain.rs @@ -6,8 +6,9 @@ use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; use dotenv::dotenv; use surreal_brain::constants::{BRAIN_GRPC_ADDR, CERT_KEY_PATH, CERT_PATH}; use surreal_brain::db; -use surreal_brain::grpc::BrainVmCliForReal; -use surreal_brain::grpc::{BrainGeneralCliForReal, BrainVmDaemonForReal}; +use surreal_brain::grpc::prelude::{ + BrainGeneralCliForReal, BrainVmCliForReal, BrainVmDaemonForReal, +}; use tonic::transport::{Identity, Server, ServerTlsConfig}; #[tokio::main] diff --git a/src/db/app.rs b/src/db/app.rs new file mode 100644 index 0000000..2ac6490 --- /dev/null +++ b/src/db/app.rs @@ -0,0 +1,118 @@ +use crate::constants::{ACCOUNT, ACTIVE_APP}; +use crate::db::general::Report; + +use super::Error; +use crate::old_brain; +use serde::{Deserialize, Serialize}; +use surrealdb::{engine::remote::ws::Client, sql::Datetime, RecordId, Surreal}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct AppNode { + pub id: RecordId, + pub operator: RecordId, + pub country: String, + pub region: String, + pub city: String, + pub ip: String, + pub avail_mem_mb: u32, + pub avail_vcpus: u32, + pub avail_storage_gbs: u32, + pub avail_ports: u32, + pub max_ports_per_app: u32, + pub price: u64, + pub offline_minutes: u64, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct AppNodeWithReports { + pub id: RecordId, + pub operator: RecordId, + pub country: String, + pub region: String, + pub city: String, + pub ip: String, + pub avail_mem_mb: u32, + pub avail_vcpus: u32, + pub avail_storage_gbs: u32, + pub avail_ports: u32, + pub max_ports_per_app: u32, + pub price: u64, + pub offline_minutes: u64, + pub reports: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ActiveApp { + id: RecordId, + #[serde(rename = "in")] + admin: RecordId, + #[serde(rename = "out")] + app_node: RecordId, + app_name: String, + mapped_ports: Vec<(u64, u64)>, + host_ipv4: String, + vcpus: u64, + memory_mb: u64, + disk_size_gb: u64, + created_at: Datetime, + price_per_unit: u64, + locked_nano: u64, + collected_at: Datetime, + mr_enclave: String, + package_url: String, + hratls_pubkey: String, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ActiveAppWithNode { + pub id: RecordId, + #[serde(rename = "in")] + pub admin: RecordId, + #[serde(rename = "out")] + pub app_node: AppNode, + pub app_name: String, + pub mapped_ports: Vec<(u64, u64)>, + pub host_ipv4: String, + pub vcpus: u64, + pub memory_mb: u64, + pub disk_size_gb: u64, + pub created_at: Datetime, + pub price_per_unit: u64, + pub locked_nano: u64, + pub collected_at: Datetime, + pub mr_enclave: String, + pub package_url: String, + pub hratls_pubkey: String, +} + +impl ActiveAppWithNode { + pub async fn get_by_uuid(db: &Surreal, uuid: &str) -> Result, Error> { + let contract: Option = + db.query(format!("select * from {ACTIVE_APP}:{uuid} fetch out;")).await?.take(0)?; + Ok(contract) + } +} + +impl From<&old_brain::BrainData> for Vec { + fn from(old_data: &old_brain::BrainData) -> Self { + let mut nodes = Vec::new(); + for old_node in old_data.app_nodes.iter() { + nodes.push(AppNode { + id: RecordId::from(("app_node", old_node.node_pubkey.clone())), + operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())), + country: old_node.country.clone(), + region: old_node.region.clone(), + city: old_node.city.clone(), + ip: old_node.ip.clone(), + avail_mem_mb: old_node.avail_mem_mb, + avail_vcpus: old_node.avail_vcpus, + avail_storage_gbs: old_node.avail_storage_mb, + avail_ports: old_node.avail_no_of_port, + max_ports_per_app: old_node.max_ports_per_app, + price: old_node.price, + offline_minutes: old_node.offline_minutes, + }); + } + nodes + } +} diff --git a/src/db/common/mod.rs b/src/db/common/mod.rs new file mode 100644 index 0000000..a093880 --- /dev/null +++ b/src/db/common/mod.rs @@ -0,0 +1,114 @@ +pub use crate::constants::{ + ACCOUNT, ACTIVE_APP, ACTIVE_VM, DB_SCHEMA_FILE, DELETED_VM, ID_ALPHABET, NEW_VM_REQ, + UPDATE_VM_REQ, VM_CONTRACT, VM_NODE, +}; + +use crate::db::prelude::*; +use crate::old_brain; +use serde::{Deserialize, Serialize}; +use surrealdb::Notification; +use surrealdb::{ + engine::remote::ws::{Client, Ws}, + opt::auth::Root, + Surreal, +}; +use tokio::sync::mpsc::Sender; +use tokio_stream::StreamExt; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Internal DB error: {0}")] + DataBase(#[from] surrealdb::Error), + #[error("Daemon channel got closed: {0}")] + DaemonConnection(#[from] tokio::sync::mpsc::error::SendError), + #[error(transparent)] + StdIo(#[from] std::io::Error), + #[error(transparent)] + TimeOut(#[from] tokio::time::error::Elapsed), +} + +pub async fn db_connection( + db_address: &str, + username: &str, + password: &str, + ns: &str, + db: &str, +) -> Result, Error> { + let db_connection: Surreal = Surreal::init(); + db_connection.connect::(db_address).await?; + // Sign in to the server + db_connection.signin(Root { username, password }).await?; + db_connection.use_ns(ns).use_db(db).await?; + Ok(db_connection) +} + +pub async fn migration0( + db: &Surreal, + old_data: &old_brain::BrainData, +) -> Result<(), Error> { + let accounts: Vec = old_data.into(); + let vm_nodes: Vec = old_data.into(); + let app_nodes: Vec = old_data.into(); + let vm_contracts: Vec = old_data.into(); + + let schema = std::fs::read_to_string(DB_SCHEMA_FILE)?; + db.query(schema).await?; + + println!("Inserting accounts..."); + let _: Vec = db.insert(()).content(accounts).await?; + println!("Inserting vm nodes..."); + let _: Vec = db.insert(()).content(vm_nodes).await?; + println!("Inserting app nodes..."); + let _: Vec = db.insert(()).content(app_nodes).await?; + println!("Inserting vm contracts..."); + let _: Vec = db.insert("vm_contract").relation(vm_contracts).await?; + + Ok(()) +} + +pub async fn upsert_record( + db: &Surreal, + table: &str, + id: &str, + my_record: SomeRecord, +) -> Result<(), Error> { + #[derive(Deserialize)] + struct Wrapper {} + let _: Option = db.create((table, id)).content(my_record).await?; + Ok(()) +} + +pub async fn listen_for_node< + T: Into + std::marker::Unpin + for<'de> Deserialize<'de>, +>( + db: &Surreal, + node: &str, + tx: Sender, +) -> Result<(), Error> { + let table_name = match std::any::type_name::() { + "surreal_brain::db::NewVmReq" => NEW_VM_REQ.to_string(), + "surreal_brain::db::UpdateVmReq" => UPDATE_VM_REQ.to_string(), + "surreal_brain::db::DeletedVm" => DELETED_VM.to_string(), + wat => { + log::error!("listen_for_node: T has type {wat}"); + String::from("wat") + } + }; + let mut resp = + db.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?; + let mut live_stream = resp.stream::>(0)?; + while let Some(result) = live_stream.next().await { + match result { + Ok(notification) => { + if notification.action == surrealdb::Action::Create { + tx.send(notification.data.into()).await? + } + } + Err(e) => { + log::warn!("listen_for_deletion DB stream failed for {node}: {e}"); + return Err(Error::from(e)); + } + } + } + Ok(()) +} diff --git a/src/db/general.rs b/src/db/general.rs new file mode 100644 index 0000000..b06e463 --- /dev/null +++ b/src/db/general.rs @@ -0,0 +1,211 @@ +use crate::constants::ACCOUNT; +use crate::db::app::AppNodeWithReports; +use crate::db::vm::VmNodeWithReports; + +use super::Error; +use crate::old_brain; +use serde::{Deserialize, Serialize}; +use surrealdb::{engine::remote::ws::Client, sql::Datetime, RecordId, Surreal}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct Account { + pub id: RecordId, + pub balance: u64, + pub tmp_locked: u64, + pub escrow: u64, + pub email: String, +} + +impl Account { + pub async fn get(db: &Surreal, address: &str) -> Result { + let id = (ACCOUNT, address); + let account: Option = db.select(id).await?; + let account = match account { + Some(account) => account, + None => { + Self { id: id.into(), balance: 0, tmp_locked: 0, escrow: 0, email: String::new() } + } + }; + Ok(account) + } + + pub async fn airdrop(db: &Surreal, account: &str, tokens: u64) -> Result<(), Error> { + let tokens = tokens.saturating_mul(1_000_000_000); + let _ = db + .query(format!("upsert account:{account} SET balance = (balance || 0) + {tokens};")) + .await?; + Ok(()) + } +} + +impl Account { + pub async fn is_banned_by_node( + db: &Surreal, + user: &str, + node: &str, + ) -> Result { + let ban: Option = db + .query(format!( + "(select operator->ban[0] as ban + from vm_node:{node} + where operator->ban->account contains account:{user} + ).ban;" + )) + .await? + .take(0)?; + Ok(ban.is_some()) + } +} + +impl From<&old_brain::BrainData> for Vec { + fn from(old_data: &old_brain::BrainData) -> Self { + let mut accounts = Vec::new(); + for old_account in old_data.accounts.iter() { + let mut a = Account { + id: RecordId::from(("account", old_account.key())), + balance: old_account.value().balance, + tmp_locked: old_account.value().tmp_locked, + escrow: 0, + email: String::new(), + }; + if let Some(operator) = old_data.operators.get(old_account.key()) { + a.escrow = operator.escrow; + a.email = operator.email.clone(); + } + accounts.push(a); + } + accounts + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Ban { + id: RecordId, + #[serde(rename = "in")] + from_account: RecordId, + #[serde(rename = "out")] + to_account: RecordId, + created_at: Datetime, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Kick { + id: RecordId, + #[serde(rename = "in")] + from_account: RecordId, + #[serde(rename = "out")] + to_account: RecordId, + created_at: Datetime, + reason: String, + contract: RecordId, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Report { + #[serde(rename = "in")] + from_account: RecordId, + #[serde(rename = "out")] + to_node: RecordId, + created_at: Datetime, + pub reason: String, + pub contract_id: String, +} + +impl Report { + // TODO: test this functionality and remove this comment + pub async fn create( + db: &Surreal, + from_account: RecordId, + to_node: RecordId, + reason: String, + contract_id: String, + ) -> Result<(), Error> { + let _: Vec = db + .insert("report") + .relation(Report { + from_account, + to_node, + created_at: Datetime::default(), + reason, + contract_id, + }) + .await?; + Ok(()) + } +} + +/// This is the operator obtained from the DB, +/// however the relation is defined using OperatorRelation +#[derive(Debug, Serialize, Deserialize)] +pub struct Operator { + pub account: RecordId, + pub app_nodes: u64, + pub vm_nodes: u64, + pub email: String, + pub escrow: u64, + pub reports: u64, +} + +impl Operator { + pub async fn list(db: &Surreal) -> Result, Error> { + let mut result = db + .query( + "array::distinct(array::flatten( [ + (select operator from vm_node group by operator).operator, + (select operator from app_node group by operator).operator + ]));" + .to_string(), + ) + .await?; + let operator_accounts: Vec = result.take(0)?; + let mut operators: Vec = Vec::new(); + for account in operator_accounts.iter() { + if let Some(operator) = Self::inspect(db, &account.key().to_string()).await? { + operators.push(operator); + } + } + Ok(operators) + } + + pub async fn inspect(db: &Surreal, account: &str) -> Result, Error> { + let mut result = db + .query(format!( + "$vm_nodes = (select id from vm_node where operator = account:{account}).id; + $app_nodes = (select id from app_node where operator = account:{account}).id; + select *, + id as account, + email, + escrow, + $vm_nodes.len() as vm_nodes, + $app_nodes.len() as app_nodes, + (select id from report where $vm_nodes contains out).len() + + (select id from report where $app_nodes contains out).len() + as reports + from account where id = account:{account};" + )) + .await?; + let operator: Option = result.take(2)?; + Ok(operator) + } + + pub async fn inspect_nodes( + db: &Surreal, + account: &str, + ) -> Result<(Option, Vec, Vec), Error> { + let operator = Self::inspect(db, account).await?; + let mut result = db + .query(format!( + "select *, operator, <-report.* as reports from vm_node + where operator = account:{account};" + )) + .query(format!( + "select *, operator, <-report.* as reports from app_node + where operator = account:{account};" + )) + .await?; + let vm_nodes: Vec = result.take(0)?; + let app_nodes: Vec = result.take(1)?; + + Ok((operator, vm_nodes, app_nodes)) + } +} diff --git a/src/db/mod.rs b/src/db/mod.rs new file mode 100644 index 0000000..179da9e --- /dev/null +++ b/src/db/mod.rs @@ -0,0 +1,21 @@ +pub mod app; +pub mod common; +pub mod general; +pub mod vm; + +pub use prelude::*; +pub mod prelude { + pub use super::app::{ActiveAppWithNode, AppNode, AppNodeWithReports}; + pub use super::common::listen_for_node; + pub use super::common::{db_connection, migration0, upsert_record, Error}; + pub use super::general::{Account, Operator, Report}; + pub use super::vm::{ + ActiveVm, ActiveVmWithNode, DaemonNotification, DeletedVm, NewVmReq, NewVmResp, + UpdateVmReq, VmNode, VmNodeResources, VmNodeWithReports, + }; +} + +pub use crate::constants::{ + ACCOUNT, ACTIVE_APP, ACTIVE_VM, DB_SCHEMA_FILE, DELETED_VM, ID_ALPHABET, NEW_VM_REQ, + UPDATE_VM_REQ, VM_CONTRACT, VM_NODE, +}; diff --git a/src/db.rs b/src/db/vm.rs similarity index 59% rename from src/db.rs rename to src/db/vm.rs index a20d86e..54b5df6 100644 --- a/src/db.rs +++ b/src/db/vm.rs @@ -1,134 +1,13 @@ use std::{str::FromStr, time::Duration}; -pub use crate::constants::{ - ACCOUNT, ACTIVE_APP, ACTIVE_VM, DB_SCHEMA_FILE, DELETED_VM, ID_ALPHABET, NEW_VM_REQ, - UPDATE_VM_REQ, VM_CONTRACT, VM_NODE, -}; - +use super::Error; +use crate::constants::{ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, VM_NODE}; +use crate::db::general::Report; use crate::old_brain; use serde::{Deserialize, Serialize}; -use surrealdb::{ - engine::remote::ws::{Client, Ws}, - opt::auth::Root, - sql::Datetime, - Notification, RecordId, Surreal, -}; -use tokio::sync::mpsc::Sender; +use surrealdb::{engine::remote::ws::Client, sql::Datetime, Notification, RecordId, Surreal}; use tokio_stream::StreamExt as _; -#[derive(thiserror::Error, Debug)] -pub enum Error { - #[error("Internal DB error: {0}")] - DataBase(#[from] surrealdb::Error), - #[error("Daemon channel got closed: {0}")] - DaemonConnection(#[from] tokio::sync::mpsc::error::SendError), - #[error(transparent)] - StdIo(#[from] std::io::Error), - #[error(transparent)] - TimeOut(#[from] tokio::time::error::Elapsed), -} - -pub async fn db_connection( - db_address: &str, - username: &str, - password: &str, - ns: &str, - db: &str, -) -> Result, Error> { - let db_connection: Surreal = Surreal::init(); - db_connection.connect::(db_address).await?; - // Sign in to the server - db_connection.signin(Root { username, password }).await?; - db_connection.use_ns(ns).use_db(db).await?; - Ok(db_connection) -} - -pub async fn upsert_record( - db: &Surreal, - table: &str, - id: &str, - my_record: SomeRecord, -) -> Result<(), Error> { - #[derive(Deserialize)] - struct Wrapper {} - let _: Option = db.create((table, id)).content(my_record).await?; - Ok(()) -} - -pub async fn migration0( - db: &Surreal, - old_data: &old_brain::BrainData, -) -> Result<(), Error> { - let accounts: Vec = old_data.into(); - let vm_nodes: Vec = old_data.into(); - let app_nodes: Vec = old_data.into(); - let vm_contracts: Vec = old_data.into(); - - let schema = std::fs::read_to_string(DB_SCHEMA_FILE)?; - db.query(schema).await?; - - println!("Inserting accounts..."); - let _: Vec = db.insert(()).content(accounts).await?; - println!("Inserting vm nodes..."); - let _: Vec = db.insert(()).content(vm_nodes).await?; - println!("Inserting app nodes..."); - let _: Vec = db.insert(()).content(app_nodes).await?; - println!("Inserting vm contracts..."); - let _: Vec = db.insert("vm_contract").relation(vm_contracts).await?; - - Ok(()) -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct Account { - pub id: RecordId, - pub balance: u64, - pub tmp_locked: u64, - pub escrow: u64, - pub email: String, -} - -impl Account { - pub async fn get(db: &Surreal, address: &str) -> Result { - let id = (ACCOUNT, address); - let account: Option = db.select(id).await?; - let account = match account { - Some(account) => account, - None => { - Self { id: id.into(), balance: 0, tmp_locked: 0, escrow: 0, email: String::new() } - } - }; - Ok(account) - } - - pub async fn airdrop(db: &Surreal, account: &str, tokens: u64) -> Result<(), Error> { - let tokens = tokens.saturating_mul(1_000_000_000); - let _ = db - .query(format!("upsert account:{account} SET balance = (balance || 0) + {tokens};")) - .await?; - Ok(()) - } -} - -impl Account { - pub async fn is_banned_by_node( - db: &Surreal, - user: &str, - node: &str, - ) -> Result { - let ban: Option = db - .query(format!( - "(select operator->ban[0] as ban - from vm_node:{node} - where operator->ban->account contains account:{user} - ).ban;" - )) - .await? - .take(0)?; - Ok(ban.is_some()) - } -} - #[derive(Debug, Serialize, Deserialize)] pub struct VmNode { pub id: RecordId, @@ -466,41 +345,6 @@ pub struct UpdateVmReq { pub locked_nano: u64, } -pub async fn listen_for_node< - T: Into + std::marker::Unpin + for<'de> Deserialize<'de>, ->( - db: &Surreal, - node: &str, - tx: Sender, -) -> Result<(), Error> { - let table_name = match std::any::type_name::() { - "surreal_brain::db::NewVmReq" => NEW_VM_REQ.to_string(), - "surreal_brain::db::UpdateVmReq" => UPDATE_VM_REQ.to_string(), - "surreal_brain::db::DeletedVm" => DELETED_VM.to_string(), - wat => { - log::error!("listen_for_node: T has type {wat}"); - String::from("wat") - } - }; - let mut resp = - db.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?; - let mut live_stream = resp.stream::>(0)?; - while let Some(result) = live_stream.next().await { - match result { - Ok(notification) => { - if notification.action == surrealdb::Action::Create { - tx.send(notification.data.into()).await? - } - } - Err(e) => { - log::warn!("listen_for_deletion DB stream failed for {node}: {e}"); - return Err(Error::from(e)); - } - } - } - Ok(()) -} - #[derive(Debug, Serialize, Deserialize)] pub struct DeletedVm { pub id: RecordId, @@ -688,225 +532,6 @@ impl ActiveVmWithNode { } } -#[derive(Debug, Serialize, Deserialize)] -pub struct AppNode { - pub id: RecordId, - pub operator: RecordId, - pub country: String, - pub region: String, - pub city: String, - pub ip: String, - pub avail_mem_mb: u32, - pub avail_vcpus: u32, - pub avail_storage_gbs: u32, - pub avail_ports: u32, - pub max_ports_per_app: u32, - pub price: u64, - pub offline_minutes: u64, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct AppNodeWithReports { - pub id: RecordId, - pub operator: RecordId, - pub country: String, - pub region: String, - pub city: String, - pub ip: String, - pub avail_mem_mb: u32, - pub avail_vcpus: u32, - pub avail_storage_gbs: u32, - pub avail_ports: u32, - pub max_ports_per_app: u32, - pub price: u64, - pub offline_minutes: u64, - pub reports: Vec, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct ActiveApp { - id: RecordId, - #[serde(rename = "in")] - admin: RecordId, - #[serde(rename = "out")] - app_node: RecordId, - app_name: String, - mapped_ports: Vec<(u64, u64)>, - host_ipv4: String, - vcpus: u64, - memory_mb: u64, - disk_size_gb: u64, - created_at: Datetime, - price_per_unit: u64, - locked_nano: u64, - collected_at: Datetime, - mr_enclave: String, - package_url: String, - hratls_pubkey: String, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct ActiveAppWithNode { - pub id: RecordId, - #[serde(rename = "in")] - pub admin: RecordId, - #[serde(rename = "out")] - pub app_node: AppNode, - pub app_name: String, - pub mapped_ports: Vec<(u64, u64)>, - pub host_ipv4: String, - pub vcpus: u64, - pub memory_mb: u64, - pub disk_size_gb: u64, - pub created_at: Datetime, - pub price_per_unit: u64, - pub locked_nano: u64, - pub collected_at: Datetime, - pub mr_enclave: String, - pub package_url: String, - pub hratls_pubkey: String, -} - -impl ActiveAppWithNode { - pub async fn get_by_uuid(db: &Surreal, uuid: &str) -> Result, Error> { - let contract: Option = - db.query(format!("select * from {ACTIVE_APP}:{uuid} fetch out;")).await?.take(0)?; - Ok(contract) - } -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct Ban { - id: RecordId, - #[serde(rename = "in")] - from_account: RecordId, - #[serde(rename = "out")] - to_account: RecordId, - created_at: Datetime, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct Kick { - id: RecordId, - #[serde(rename = "in")] - from_account: RecordId, - #[serde(rename = "out")] - to_account: RecordId, - created_at: Datetime, - reason: String, - contract: RecordId, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct Report { - #[serde(rename = "in")] - from_account: RecordId, - #[serde(rename = "out")] - to_node: RecordId, - created_at: Datetime, - pub reason: String, - pub contract_id: String, -} - -impl Report { - // TODO: test this functionality and remove this comment - pub async fn create( - db: &Surreal, - from_account: RecordId, - to_node: RecordId, - reason: String, - contract_id: String, - ) -> Result<(), Error> { - let _: Vec = db - .insert("report") - .relation(Report { - from_account, - to_node, - created_at: Datetime::default(), - reason, - contract_id, - }) - .await?; - Ok(()) - } -} - -/// This is the operator obtained from the DB, -/// however the relation is defined using OperatorRelation -#[derive(Debug, Serialize, Deserialize)] -pub struct Operator { - pub account: RecordId, - pub app_nodes: u64, - pub vm_nodes: u64, - pub email: String, - pub escrow: u64, - pub reports: u64, -} - -impl Operator { - pub async fn list(db: &Surreal) -> Result, Error> { - let mut result = db - .query( - "array::distinct(array::flatten( [ - (select operator from vm_node group by operator).operator, - (select operator from app_node group by operator).operator - ]));" - .to_string(), - ) - .await?; - let operator_accounts: Vec = result.take(0)?; - let mut operators: Vec = Vec::new(); - for account in operator_accounts.iter() { - if let Some(operator) = Self::inspect(db, &account.key().to_string()).await? { - operators.push(operator); - } - } - Ok(operators) - } - - pub async fn inspect(db: &Surreal, account: &str) -> Result, Error> { - let mut result = db - .query(format!( - "$vm_nodes = (select id from vm_node where operator = account:{account}).id; - $app_nodes = (select id from app_node where operator = account:{account}).id; - select *, - id as account, - email, - escrow, - $vm_nodes.len() as vm_nodes, - $app_nodes.len() as app_nodes, - (select id from report where $vm_nodes contains out).len() + - (select id from report where $app_nodes contains out).len() - as reports - from account where id = account:{account};" - )) - .await?; - let operator: Option = result.take(2)?; - Ok(operator) - } - - pub async fn inspect_nodes( - db: &Surreal, - account: &str, - ) -> Result<(Option, Vec, Vec), Error> { - let operator = Self::inspect(db, account).await?; - let mut result = db - .query(format!( - "select *, operator, <-report.* as reports from vm_node - where operator = account:{account};" - )) - .query(format!( - "select *, operator, <-report.* as reports from app_node - where operator = account:{account};" - )) - .await?; - let vm_nodes: Vec = result.take(0)?; - let app_nodes: Vec = result.take(1)?; - - Ok((operator, vm_nodes, app_nodes)) - } -} - // TODO: delete all of these From implementation after migration 0 gets executed impl From<&old_brain::BrainData> for Vec { @@ -965,48 +590,3 @@ impl From<&old_brain::BrainData> for Vec { contracts } } - -impl From<&old_brain::BrainData> for Vec { - fn from(old_data: &old_brain::BrainData) -> Self { - let mut nodes = Vec::new(); - for old_node in old_data.app_nodes.iter() { - nodes.push(AppNode { - id: RecordId::from(("app_node", old_node.node_pubkey.clone())), - operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())), - country: old_node.country.clone(), - region: old_node.region.clone(), - city: old_node.city.clone(), - ip: old_node.ip.clone(), - avail_mem_mb: old_node.avail_mem_mb, - avail_vcpus: old_node.avail_vcpus, - avail_storage_gbs: old_node.avail_storage_mb, - avail_ports: old_node.avail_no_of_port, - max_ports_per_app: old_node.max_ports_per_app, - price: old_node.price, - offline_minutes: old_node.offline_minutes, - }); - } - nodes - } -} - -impl From<&old_brain::BrainData> for Vec { - fn from(old_data: &old_brain::BrainData) -> Self { - let mut accounts = Vec::new(); - for old_account in old_data.accounts.iter() { - let mut a = Account { - id: RecordId::from(("account", old_account.key())), - balance: old_account.value().balance, - tmp_locked: old_account.value().tmp_locked, - escrow: 0, - email: String::new(), - }; - if let Some(operator) = old_data.operators.get(old_account.key()) { - a.escrow = operator.escrow; - a.email = operator.email.clone(); - } - accounts.push(a); - } - accounts - } -} diff --git a/src/grpc.rs b/src/grpc.rs deleted file mode 100644 index 2114a9b..0000000 --- a/src/grpc.rs +++ /dev/null @@ -1,907 +0,0 @@ -#![allow(dead_code)] -use crate::constants::{ACCOUNT, ADMIN_ACCOUNTS, VM_NODE}; -use crate::db; -use detee_shared::app_proto::{AppContract, AppNodeListResp}; -use detee_shared::{ - common_proto::{Empty, Pubkey}, - general_proto::{ - brain_general_cli_server::BrainGeneralCli, Account, AccountBalance, AirdropReq, BanUserReq, - InspectOperatorResp, KickReq, KickResp, ListOperatorsResp, RegOperatorReq, ReportNodeReq, - SlashReq, - }, - vm_proto::{ - brain_vm_cli_server::BrainVmCli, brain_vm_daemon_server::BrainVmDaemon, ListVmContractsReq, - *, - }, -}; -use nanoid::nanoid; -use surrealdb::{engine::remote::ws::Client, Surreal}; - -use log::info; -use std::pin::Pin; -use std::sync::Arc; -use surrealdb::RecordId; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use tokio_stream::{Stream, StreamExt}; -use tonic::{Request, Response, Status, Streaming}; - -pub struct BrainGeneralCliForReal { - pub db: Arc>, -} - -impl BrainGeneralCliForReal { - pub fn new(db: Arc>) -> Self { - Self { db } - } -} - -impl From for AccountBalance { - fn from(account: db::Account) -> Self { - AccountBalance { balance: account.balance, tmp_locked: account.tmp_locked } - } -} - -impl From for db::NewVmReq { - fn from(new_vm_req: NewVmReq) -> Self { - Self { - id: RecordId::from((db::NEW_VM_REQ, nanoid!(40, &db::ID_ALPHABET))), - admin: RecordId::from((db::ACCOUNT, new_vm_req.admin_pubkey)), - vm_node: RecordId::from((db::VM_NODE, new_vm_req.node_pubkey)), - hostname: new_vm_req.hostname, - extra_ports: new_vm_req.extra_ports, - public_ipv4: new_vm_req.public_ipv4, - public_ipv6: new_vm_req.public_ipv6, - disk_size_gb: new_vm_req.disk_size_gb, - vcpus: new_vm_req.vcpus, - memory_mb: new_vm_req.memory_mb, - kernel_url: new_vm_req.kernel_url, - kernel_sha: new_vm_req.kernel_sha, - dtrfs_url: new_vm_req.dtrfs_url, - dtrfs_sha: new_vm_req.dtrfs_sha, - price_per_unit: new_vm_req.price_per_unit, - locked_nano: new_vm_req.locked_nano, - created_at: surrealdb::sql::Datetime::default(), - error: String::new(), - } - } -} - -impl From for NewVmReq { - fn from(new_vm_req: db::NewVmReq) -> Self { - Self { - uuid: new_vm_req.id.key().to_string(), - hostname: new_vm_req.hostname, - admin_pubkey: new_vm_req.admin.key().to_string(), - node_pubkey: new_vm_req.vm_node.key().to_string(), - extra_ports: new_vm_req.extra_ports, - public_ipv4: new_vm_req.public_ipv4, - public_ipv6: new_vm_req.public_ipv6, - disk_size_gb: new_vm_req.disk_size_gb, - vcpus: new_vm_req.vcpus, - memory_mb: new_vm_req.memory_mb, - kernel_url: new_vm_req.kernel_url, - kernel_sha: new_vm_req.kernel_sha, - dtrfs_url: new_vm_req.dtrfs_url, - dtrfs_sha: new_vm_req.dtrfs_sha, - price_per_unit: new_vm_req.price_per_unit, - locked_nano: new_vm_req.locked_nano, - } - } -} - -impl From for NewVmResp { - fn from(resp: db::NewVmResp) -> Self { - match resp { - // TODO: This will require a small architecture change to pass MeasurementArgs from - // Daemon to CLI - db::NewVmResp::Args(uuid, args) => { - NewVmResp { uuid, error: String::new(), args: Some(args) } - } - db::NewVmResp::Error(uuid, error) => NewVmResp { uuid, error, args: None }, - } - } -} - -impl From for UpdateVmReq { - fn from(update_vm_req: db::UpdateVmReq) -> Self { - Self { - uuid: update_vm_req.id.key().to_string(), - // daemon does not care about VM hostname - hostname: String::new(), - admin_pubkey: update_vm_req.admin.key().to_string(), - disk_size_gb: update_vm_req.disk_size_gb, - vcpus: update_vm_req.vcpus, - memory_mb: update_vm_req.memory_mb, - kernel_url: update_vm_req.kernel_url, - kernel_sha: update_vm_req.kernel_sha, - dtrfs_url: update_vm_req.dtrfs_url, - dtrfs_sha: update_vm_req.dtrfs_sha, - } - } -} - -impl From for DeleteVmReq { - fn from(delete_vm_req: db::DeletedVm) -> Self { - Self { - uuid: delete_vm_req.id.key().to_string(), - admin_pubkey: delete_vm_req.admin.key().to_string(), - } - } -} - -impl From for BrainVmMessage { - fn from(notification: db::DaemonNotification) -> Self { - match notification { - db::DaemonNotification::Create(new_vm_req) => { - BrainVmMessage { msg: Some(brain_vm_message::Msg::NewVmReq(new_vm_req.into())) } - } - db::DaemonNotification::Update(update_vm_req) => BrainVmMessage { - msg: Some(brain_vm_message::Msg::UpdateVmReq(update_vm_req.into())), - }, - db::DaemonNotification::Delete(deleted_vm) => { - BrainVmMessage { msg: Some(brain_vm_message::Msg::DeleteVm(deleted_vm.into())) } - } - } - } -} - -impl From for VmContract { - fn from(db_c: db::ActiveVmWithNode) -> Self { - let mut exposed_ports = Vec::new(); - for port in db_c.mapped_ports.iter() { - exposed_ports.push(port.0); - } - VmContract { - uuid: db_c.id.key().to_string(), - hostname: db_c.hostname.clone(), - admin_pubkey: db_c.admin.key().to_string(), - node_pubkey: db_c.vm_node.id.key().to_string(), - node_ip: db_c.vm_node.ip.clone(), - location: format!( - "{}, {}, {}", - db_c.vm_node.city, db_c.vm_node.region, db_c.vm_node.country - ), - memory_mb: db_c.memory_mb, - vcpus: db_c.vcpus, - disk_size_gb: db_c.disk_size_gb, - mapped_ports: db_c - .mapped_ports - .iter() - .map(|(h, g)| MappedPort { host_port: *h, guest_port: *g }) - .collect(), - vm_public_ipv6: db_c.public_ipv6.clone(), - vm_public_ipv4: db_c.public_ipv4.clone(), - locked_nano: db_c.locked_nano, - dtrfs_sha: db_c.dtrfs_sha.clone(), - kernel_sha: db_c.kernel_sha.clone(), - nano_per_minute: db_c.price_per_minute(), - created_at: db_c.created_at.to_rfc3339(), - // TODO: remove updated_at from the proto - // This will get moved to VM history (users will be able to - // query old contracts, which also shows updates of existing contracts). - updated_at: db_c.created_at.to_rfc3339(), - collected_at: db_c.collected_at.to_rfc3339(), - } - } -} - -impl From for tonic::Status { - fn from(e: db::Error) -> Self { - Self::internal(format!("Internal error: {e}")) - } -} - -impl From for ListOperatorsResp { - fn from(db_o: db::Operator) -> Self { - ListOperatorsResp { - pubkey: db_o.account.key().to_string(), - escrow: db_o.escrow, - email: db_o.email, - app_nodes: db_o.app_nodes, - vm_nodes: db_o.vm_nodes, - reports: db_o.reports, - } - } -} - -impl From for VmNodeListResp { - fn from(vm_node: db::VmNodeWithReports) -> Self { - Self { - operator: vm_node.operator.key().to_string(), - node_pubkey: vm_node.id.key().to_string(), - country: vm_node.country, - region: vm_node.region, - city: vm_node.city, - ip: vm_node.ip, - reports: vm_node.reports.iter().map(|n| n.reason.clone()).collect(), - price: vm_node.price, - } - } -} - -impl From for AppNodeListResp { - fn from(app_node: db::AppNodeWithReports) -> Self { - Self { - operator: app_node.operator.key().to_string(), - node_pubkey: app_node.id.key().to_string(), - country: app_node.country, - region: app_node.region, - city: app_node.city, - ip: app_node.ip, - reports: app_node.reports.iter().map(|n| n.reason.clone()).collect(), - price: app_node.price, - } - } -} - -impl From for db::VmNodeResources { - fn from(res: VmNodeResources) -> Self { - Self { - avail_mem_mb: res.avail_memory_mb, - avail_vcpus: res.avail_vcpus, - avail_storage_gbs: res.avail_storage_gb, - avail_ipv4: res.avail_ipv4, - avail_ipv6: res.avail_ipv6, - avail_ports: res.avail_ports, - max_ports_per_vm: res.max_ports_per_vm, - } - } -} - -pub struct BrainVmDaemonForReal { - pub db: Arc>, -} - -impl BrainVmDaemonForReal { - pub fn new(db: Arc>) -> Self { - Self { db } - } -} - -#[tonic::async_trait] -impl BrainVmDaemon for BrainVmDaemonForReal { - type RegisterVmNodeStream = Pin> + Send>>; - async fn register_vm_node( - &self, - req: Request, - ) -> Result, Status> { - let req = check_sig_from_req(req)?; - info!("Starting registration process for {:?}", req); - db::VmNode { - id: surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone())), - operator: surrealdb::RecordId::from((ACCOUNT, req.operator_wallet)), - country: req.country, - region: req.region, - city: req.city, - ip: req.main_ip, - price: req.price, - avail_mem_mb: 0, - avail_vcpus: 0, - avail_storage_gbs: 0, - avail_ipv4: 0, - avail_ipv6: 0, - avail_ports: 0, - max_ports_per_vm: 0, - offline_minutes: 0, - } - .register(&self.db) - .await?; - - info!("Sending existing contracts to {}", req.node_pubkey); - let contracts = db::ActiveVmWithNode::list_by_node(&self.db, &req.node_pubkey).await?; - let (tx, rx) = mpsc::channel(6); - tokio::spawn(async move { - for contract in contracts { - let _ = tx.send(Ok(contract.into())).await; - } - }); - let output_stream = ReceiverStream::new(rx); - Ok(Response::new(Box::pin(output_stream) as Self::RegisterVmNodeStream)) - } - - type BrainMessagesStream = Pin> + Send>>; - async fn brain_messages( - &self, - req: Request, - ) -> Result, Status> { - let auth = req.into_inner(); - let pubkey = auth.pubkey.clone(); - check_sig_from_parts( - &pubkey, - &auth.timestamp, - &format!("{:?}", auth.contracts), - &auth.signature, - )?; - info!("Daemon {} connected to receive brain messages", pubkey); - - let (tx, rx) = mpsc::channel(6); - { - let db = self.db.clone(); - let pubkey = pubkey.clone(); - let tx = tx.clone(); - tokio::spawn(async move { - match db::listen_for_node::(&db, &pubkey, tx).await { - Ok(()) => log::info!("db::VmContract::listen_for_node ended for {pubkey}"), - Err(e) => { - log::warn!("db::VmContract::listen_for_node errored for {pubkey}: {e}") - } - }; - }); - } - { - let db = self.db.clone(); - let pubkey = pubkey.clone(); - let tx = tx.clone(); - tokio::spawn(async move { - let _ = db::listen_for_node::(&db, &pubkey, tx.clone()).await; - }); - } - { - let db = self.db.clone(); - let pubkey = pubkey.clone(); - let tx = tx.clone(); - tokio::spawn(async move { - let _ = db::listen_for_node::(&db, &pubkey, tx.clone()).await; - }); - } - - let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg.into())); - Ok(Response::new(Box::pin(output_stream) as Self::BrainMessagesStream)) - } - - async fn daemon_messages( - &self, - req: Request>, - ) -> Result, Status> { - let mut req_stream = req.into_inner(); - let pubkey: String; - if let Some(Ok(msg)) = req_stream.next().await { - log::debug!("demon_messages received the following auth message: {:?}", msg.msg); - if let Some(vm_daemon_message::Msg::Auth(auth)) = msg.msg { - pubkey = auth.pubkey.clone(); - check_sig_from_parts( - &pubkey, - &auth.timestamp, - &format!("{:?}", auth.contracts), - &auth.signature, - )?; - } else { - return Err(Status::unauthenticated( - "Could not authenticate the daemon: could not extract auth signature", - )); - } - } else { - return Err(Status::unauthenticated("Could not authenticate the daemon")); - } - - while let Some(daemon_message) = req_stream.next().await { - match daemon_message { - Ok(msg) => match msg.msg { - Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => { - // TODO: move new_vm_req to active_vm - // also handle failure properly - if !new_vm_resp.error.is_empty() { - db::NewVmReq::submit_error( - &self.db, - &new_vm_resp.uuid, - new_vm_resp.error, - ) - .await?; - } else { - db::upsert_record( - &self.db, - "measurement_args", - &new_vm_resp.uuid, - new_vm_resp.args.clone(), - ) - .await?; - if let Some(args) = new_vm_resp.args { - db::ActiveVm::activate(&self.db, &new_vm_resp.uuid, args).await?; - } - } - } - Some(vm_daemon_message::Msg::UpdateVmResp(_update_vm_resp)) => { - todo!(); - // self.data.submit_updatevm_resp(update_vm_resp).await; - } - Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => { - let node_resources: db::VmNodeResources = node_resources.into(); - node_resources.merge(&self.db, &pubkey).await?; - } - _ => {} - }, - Err(e) => { - log::warn!("Daemon disconnected: {e:?}"); - } - } - } - Ok(Response::new(Empty {})) - } -} - -#[tonic::async_trait] -impl BrainGeneralCli for BrainGeneralCliForReal { - type ListAccountsStream = Pin> + Send>>; - type ListAllAppContractsStream = - Pin> + Send>>; - type ListAllVmContractsStream = Pin> + Send>>; - type ListOperatorsStream = - Pin> + Send>>; - - async fn get_balance(&self, req: Request) -> Result, Status> { - let req = check_sig_from_req(req)?; - Ok(Response::new(db::Account::get(&self.db, &req.pubkey).await?.into())) - } - - async fn report_node(&self, req: Request) -> Result, Status> { - let req = check_sig_from_req(req)?; - let (account, node, contract_id) = - match db::ActiveVmWithNode::get_by_uuid(&self.db, &req.contract).await? { - Some(vm_contract) - if vm_contract.admin.key().to_string() == req.admin_pubkey - && vm_contract.vm_node.id.key().to_string() == req.node_pubkey => - { - (vm_contract.admin, vm_contract.vm_node.id, vm_contract.id.to_string()) - } - _ => match db::ActiveAppWithNode::get_by_uuid(&self.db, &req.contract).await? { - Some(app_contract) - if app_contract.admin.key().to_string() == req.admin_pubkey - && app_contract.app_node.id.key().to_string() == req.node_pubkey => - { - (app_contract.admin, app_contract.app_node.id, app_contract.id.to_string()) - } - _ => { - return Err(Status::unauthenticated("No contract found by this ID.")); - } - }, - }; - db::Report::create(&self.db, account, node, req.reason, contract_id).await?; - Ok(Response::new(Empty {})) - } - - async fn list_operators( - &self, - req: Request, - ) -> Result, Status> { - let _ = check_sig_from_req(req)?; - let operators = db::Operator::list(&self.db).await?; - let (tx, rx) = mpsc::channel(6); - tokio::spawn(async move { - for op in operators { - let _ = tx.send(Ok(op.into())).await; - } - }); - let output_stream = ReceiverStream::new(rx); - Ok(Response::new(Box::pin(output_stream) as Self::ListOperatorsStream)) - } - - async fn inspect_operator( - &self, - req: Request, - ) -> Result, Status> { - match db::Operator::inspect_nodes(&self.db, &req.into_inner().pubkey).await? { - (Some(op), vm_nodes, app_nodes) => Ok(Response::new(InspectOperatorResp { - operator: Some(op.into()), - vm_nodes: vm_nodes.into_iter().map(|n| n.into()).collect(), - app_nodes: app_nodes.into_iter().map(|n| n.into()).collect(), - })), - (None, _, _) => Err(Status::not_found("The wallet you specified is not an operator")), - } - } - - async fn register_operator( - &self, - _req: Request, - ) -> Result, Status> { - todo!(); - // let req = check_sig_from_req(req)?; - // info!("Regitering new operator: {req:?}"); - // match self.data.register_operator(req) { - // Ok(()) => Ok(Response::new(Empty {})), - // Err(e) => Err(Status::failed_precondition(e.to_string())), - // } - } - - async fn kick_contract(&self, _req: Request) -> Result, Status> { - todo!(); - // let req = check_sig_from_req(req)?; - // match self.data.kick_contract(&req.operator_wallet, &req.contract_uuid, &req.reason).await { - // Ok(nano_lp) => Ok(Response::new(KickResp { nano_lp })), - // Err(e) => Err(Status::permission_denied(e.to_string())), - // } - } - - async fn ban_user(&self, _req: Request) -> Result, Status> { - todo!(); - // let req = check_sig_from_req(req)?; - // self.data.ban_user(&req.operator_wallet, &req.user_wallet); - // Ok(Response::new(Empty {})) - } - - // admin commands - - async fn airdrop(&self, req: Request) -> Result, Status> { - check_admin_key(&req)?; - let req = check_sig_from_req(req)?; - db::Account::airdrop(&self.db, &req.pubkey, req.tokens).await?; - Ok(Response::new(Empty {})) - } - - async fn slash(&self, _req: Request) -> Result, Status> { - todo!(); - // check_admin_key(&req)?; - // let req = check_sig_from_req(req)?; - // self.data.slash_account(&req.pubkey, req.tokens); - // Ok(Response::new(Empty {})) - } - - async fn list_accounts( - &self, - _req: Request, - ) -> Result, Status> { - todo!(); - // check_admin_key(&req)?; - // let _ = check_sig_from_req(req)?; - // let accounts = self.data.list_accounts(); - // let (tx, rx) = mpsc::channel(6); - // tokio::spawn(async move { - // for account in accounts { - // let _ = tx.send(Ok(account.into())).await; - // } - // }); - // let output_stream = ReceiverStream::new(rx); - // Ok(Response::new(Box::pin(output_stream) as Self::ListAccountsStream)) - } - - async fn list_all_vm_contracts( - &self, - _req: Request, - ) -> Result, Status> { - todo!(); - // check_admin_key(&req)?; - // let _ = check_sig_from_req(req)?; - // let contracts = self.data.list_all_contracts(); - // let (tx, rx) = mpsc::channel(6); - // tokio::spawn(async move { - // for contract in contracts { - // let _ = tx.send(Ok(contract.into())).await; - // } - // }); - // let output_stream = ReceiverStream::new(rx); - // Ok(Response::new(Box::pin(output_stream) as Self::ListAllVmContractsStream)) - } - - async fn list_all_app_contracts( - &self, - _req: tonic::Request, - ) -> Result, Status> { - todo!(); - // check_admin_key(&req)?; - // let _ = check_sig_from_req(req)?; - // let contracts = self.data.list_all_app_contracts(); - // let (tx, rx) = mpsc::channel(6); - // tokio::spawn(async move { - // for contract in contracts { - // let _ = tx.send(Ok(contract.into())).await; - // } - // }); - // let output_stream = ReceiverStream::new(rx); - // Ok(Response::new(Box::pin(output_stream))) - } -} - -pub struct BrainVmCliForReal { - pub db: Arc>, -} - -impl BrainVmCliForReal { - pub fn new(db: Arc>) -> Self { - Self { db } - } -} - -#[tonic::async_trait] -impl BrainVmCli for BrainVmCliForReal { - type ListVmContractsStream = Pin> + Send>>; - type ListVmNodesStream = Pin> + Send>>; - - async fn new_vm(&self, req: Request) -> Result, Status> { - let req = check_sig_from_req(req)?; - info!("New VM requested via CLI: {req:?}"); - if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { - return Err(Status::permission_denied("This operator banned you. What did you do?")); - } - - let new_vm_req: db::NewVmReq = req.into(); - let id = new_vm_req.id.key().to_string(); - - let db = self.db.clone(); - - let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); - tokio::spawn(async move { - let _ = oneshot_tx.send(db::NewVmResp::listen(&db, &id).await); - }); - new_vm_req.submit(&self.db).await?; - - match oneshot_rx.await { - Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded("Request failed due to timeout. Please try again later or contact the DeTEE devs team.")), - Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())), - Err(e) => { - log::error!("Something weird happened. Reached error {e:?}"); - Err(Status::unknown( - "Request failed due to unknown error. Please try again or contact the DeTEE devs team.", - )) - } - } - } - - async fn update_vm(&self, req: Request) -> Result, Status> { - let req = check_sig_from_req(req)?; - info!("Update VM requested via CLI: {req:?}"); - todo!(); - // let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); - // self.data.submit_updatevm_req(req, oneshot_tx).await; - // match oneshot_rx.await { - // Ok(response) => { - // info!("Sending UpdateVMResp: {response:?}"); - // Ok(Response::new(response)) - // } - // Err(e) => Err(Status::unknown(format!( - // "Update VM request failed due to error: {e}" - // ))), - // } - } - - async fn extend_vm(&self, req: Request) -> Result, Status> { - let _req = check_sig_from_req(req)?; - todo!(); - // match self - // .data - // .extend_vm_contract_time(&req.uuid, &req.admin_pubkey, req.locked_nano) - // { - // Ok(()) => Ok(Response::new(Empty {})), - // Err(e) => Err(Status::unknown(format!("Could not extend contract: {e}"))), - // } - } - - async fn delete_vm(&self, req: Request) -> Result, Status> { - let _req = check_sig_from_req(req)?; - todo!(); - // match self.data.delete_vm(req).await { - // Ok(()) => Ok(Response::new(Empty {})), - // Err(e) => Err(Status::not_found(e.to_string())), - // } - } - - async fn list_vm_contracts( - &self, - req: Request, - ) -> Result, Status> { - let req = check_sig_from_req(req)?; - info!( - "CLI {} requested ListVMVmContractsStream. As operator: {}", - req.wallet, req.as_operator - ); - let mut contracts = Vec::new(); - if !req.uuid.is_empty() { - if let Some(specific_contract) = - db::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await? - { - if specific_contract.admin.key().to_string() == req.wallet { - contracts.push(specific_contract); - } - // TODO: allow operator to inspect contracts - } - } else if req.as_operator { - contracts - .append(&mut db::ActiveVmWithNode::list_by_operator(&self.db, &req.wallet).await?); - } else { - contracts - .append(&mut db::ActiveVmWithNode::list_by_admin(&self.db, &req.wallet).await?); - } - let (tx, rx) = mpsc::channel(6); - tokio::spawn(async move { - for contract in contracts { - let _ = tx.send(Ok(contract.into())).await; - } - }); - let output_stream = ReceiverStream::new(rx); - Ok(Response::new(Box::pin(output_stream) as Self::ListVmContractsStream)) - } - - async fn list_vm_nodes( - &self, - req: Request, - ) -> Result, tonic::Status> { - let req = check_sig_from_req(req)?; - info!("CLI requested ListVmNodesStream: {req:?}"); - let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?; - let (tx, rx) = mpsc::channel(6); - tokio::spawn(async move { - for node in nodes { - let _ = tx.send(Ok(node.into())).await; - } - }); - let output_stream = ReceiverStream::new(rx); - Ok(Response::new(Box::pin(output_stream) as Self::ListVmNodesStream)) - } - - async fn get_one_vm_node( - &self, - req: Request, - ) -> Result, Status> { - let req = check_sig_from_req(req)?; - info!("Unknown CLI requested ListVmNodesStream: {req:?}"); - // TODO: optimize this query so that it gets only one node - let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?; - if let Some(node) = nodes.into_iter().next() { - return Ok(Response::new(node.into())); - } - Err(Status::not_found("Could not find any node based on your search criteria")) - } -} - -trait PubkeyGetter { - fn get_pubkey(&self) -> Option; -} - -macro_rules! impl_pubkey_getter { - ($t:ty, $field:ident) => { - impl PubkeyGetter for $t { - fn get_pubkey(&self) -> Option { - Some(self.$field.clone()) - } - } - }; - ($t:ty) => { - impl PubkeyGetter for $t { - fn get_pubkey(&self) -> Option { - None - } - } - }; -} - -impl_pubkey_getter!(Pubkey, pubkey); -impl_pubkey_getter!(NewVmReq, admin_pubkey); -impl_pubkey_getter!(DeleteVmReq, admin_pubkey); -impl_pubkey_getter!(UpdateVmReq, admin_pubkey); -impl_pubkey_getter!(ExtendVmReq, admin_pubkey); -impl_pubkey_getter!(ReportNodeReq, admin_pubkey); -impl_pubkey_getter!(ListVmContractsReq, wallet); -impl_pubkey_getter!(RegisterVmNodeReq, node_pubkey); -impl_pubkey_getter!(RegOperatorReq, pubkey); -impl_pubkey_getter!(KickReq, operator_wallet); -impl_pubkey_getter!(BanUserReq, operator_wallet); - -impl_pubkey_getter!(VmNodeFilters); -impl_pubkey_getter!(Empty); -impl_pubkey_getter!(AirdropReq); -impl_pubkey_getter!(SlashReq); - -// impl_pubkey_getter!(NewAppReq, admin_pubkey); -// impl_pubkey_getter!(DelAppReq, admin_pubkey); -// impl_pubkey_getter!(ListAppContractsReq, admin_pubkey); -// -// impl_pubkey_getter!(RegisterAppNodeReq); -// impl_pubkey_getter!(AppNodeFilters); - -fn check_sig_from_req(req: Request) -> Result { - let time = match req.metadata().get("timestamp") { - Some(t) => t.clone(), - None => return Err(Status::unauthenticated("Timestamp not found in metadata.")), - }; - let time = time - .to_str() - .map_err(|_| Status::unauthenticated("Timestamp in metadata is not a string"))?; - - let now = chrono::Utc::now(); - let parsed_time = chrono::DateTime::parse_from_rfc3339(time) - .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; - let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); - if !(-4..=4).contains(&seconds_elapsed) { - return Err(Status::unauthenticated(format!( - "Date is not within 4 sec of the time of the server: CLI {} vs Server {}", - parsed_time, now - ))); - } - - let signature = match req.metadata().get("request-signature") { - Some(t) => t, - None => return Err(Status::unauthenticated("signature not found in metadata.")), - }; - let signature = bs58::decode(signature) - .into_vec() - .map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?; - let signature = ed25519_dalek::Signature::from_bytes( - signature - .as_slice() - .try_into() - .map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?, - ); - - let pubkey_value = match req.metadata().get("pubkey") { - Some(p) => p.clone(), - None => return Err(Status::unauthenticated("pubkey not found in metadata.")), - }; - let pubkey = ed25519_dalek::VerifyingKey::from_bytes( - &bs58::decode(&pubkey_value) - .into_vec() - .map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))? - .try_into() - .map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?, - ) - .map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?; - - let req = req.into_inner(); - let message = format!("{time}{req:?}"); - use ed25519_dalek::Verifier; - pubkey - .verify(message.as_bytes(), &signature) - .map_err(|_| Status::unauthenticated("the signature is not valid"))?; - if let Some(req_pubkey) = req.get_pubkey() { - if *pubkey_value.to_str().unwrap() != req_pubkey { - return Err(Status::unauthenticated( - "pubkey of signature does not match pubkey of request", - )); - } - } - Ok(req) -} - -fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Result<(), Status> { - let now = chrono::Utc::now(); - let parsed_time = chrono::DateTime::parse_from_rfc3339(time) - .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; - let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); - if !(-4..=4).contains(&seconds_elapsed) { - return Err(Status::unauthenticated(format!( - "Date is not within 4 sec of the time of the server: CLI {} vs Server {}", - parsed_time, now - ))); - } - - let signature = bs58::decode(sig) - .into_vec() - .map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?; - let signature = ed25519_dalek::Signature::from_bytes( - signature - .as_slice() - .try_into() - .map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?, - ); - - let pubkey = ed25519_dalek::VerifyingKey::from_bytes( - &bs58::decode(&pubkey) - .into_vec() - .map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))? - .try_into() - .map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?, - ) - .map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?; - - let msg = time.to_string() + msg; - use ed25519_dalek::Verifier; - pubkey - .verify(msg.as_bytes(), &signature) - .map_err(|_| Status::unauthenticated("the signature is not valid"))?; - - Ok(()) -} - -fn check_admin_key(req: &Request) -> Result<(), Status> { - let pubkey = match req.metadata().get("pubkey") { - Some(p) => p.clone(), - None => return Err(Status::unauthenticated("pubkey not found in metadata.")), - }; - let pubkey = pubkey - .to_str() - .map_err(|_| Status::unauthenticated("could not parse pubkey metadata to str"))?; - - if !ADMIN_ACCOUNTS.contains(&pubkey) { - return Err(Status::unauthenticated("This operation is reserved to admin accounts")); - } - - Ok(()) -} diff --git a/src/grpc/app.rs b/src/grpc/app.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/grpc/app.rs @@ -0,0 +1 @@ + diff --git a/src/grpc/common/mod.rs b/src/grpc/common/mod.rs new file mode 100644 index 0000000..4b3bcc0 --- /dev/null +++ b/src/grpc/common/mod.rs @@ -0,0 +1,173 @@ +pub mod types; + +use crate::constants::ADMIN_ACCOUNTS; +use detee_shared::{ + common_proto::{Empty, Pubkey}, + general_proto::{AirdropReq, BanUserReq, KickReq, RegOperatorReq, ReportNodeReq, SlashReq}, + vm_proto::{ListVmContractsReq, *}, +}; +use tonic::{Request, Status}; + +pub trait PubkeyGetter { + fn get_pubkey(&self) -> Option; +} + +macro_rules! impl_pubkey_getter { + ($t:ty, $field:ident) => { + impl PubkeyGetter for $t { + fn get_pubkey(&self) -> Option { + Some(self.$field.clone()) + } + } + }; + ($t:ty) => { + impl PubkeyGetter for $t { + fn get_pubkey(&self) -> Option { + None + } + } + }; +} + +impl_pubkey_getter!(Pubkey, pubkey); +impl_pubkey_getter!(NewVmReq, admin_pubkey); +impl_pubkey_getter!(DeleteVmReq, admin_pubkey); +impl_pubkey_getter!(UpdateVmReq, admin_pubkey); +impl_pubkey_getter!(ExtendVmReq, admin_pubkey); +impl_pubkey_getter!(ReportNodeReq, admin_pubkey); +impl_pubkey_getter!(ListVmContractsReq, wallet); +impl_pubkey_getter!(RegisterVmNodeReq, node_pubkey); +impl_pubkey_getter!(RegOperatorReq, pubkey); +impl_pubkey_getter!(KickReq, operator_wallet); +impl_pubkey_getter!(BanUserReq, operator_wallet); + +impl_pubkey_getter!(VmNodeFilters); +impl_pubkey_getter!(Empty); +impl_pubkey_getter!(AirdropReq); +impl_pubkey_getter!(SlashReq); + +// impl_pubkey_getter!(NewAppReq, admin_pubkey); +// impl_pubkey_getter!(DelAppReq, admin_pubkey); +// impl_pubkey_getter!(ListAppContractsReq, admin_pubkey); +// +// impl_pubkey_getter!(RegisterAppNodeReq); +// impl_pubkey_getter!(AppNodeFilters); + +pub fn check_sig_from_req(req: Request) -> Result { + let time = match req.metadata().get("timestamp") { + Some(t) => t.clone(), + None => return Err(Status::unauthenticated("Timestamp not found in metadata.")), + }; + let time = time + .to_str() + .map_err(|_| Status::unauthenticated("Timestamp in metadata is not a string"))?; + + let now = chrono::Utc::now(); + let parsed_time = chrono::DateTime::parse_from_rfc3339(time) + .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; + let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); + if !(-4..=4).contains(&seconds_elapsed) { + return Err(Status::unauthenticated(format!( + "Date is not within 4 sec of the time of the server: CLI {} vs Server {}", + parsed_time, now + ))); + } + + let signature = match req.metadata().get("request-signature") { + Some(t) => t, + None => return Err(Status::unauthenticated("signature not found in metadata.")), + }; + let signature = bs58::decode(signature) + .into_vec() + .map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?; + let signature = ed25519_dalek::Signature::from_bytes( + signature + .as_slice() + .try_into() + .map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?, + ); + + let pubkey_value = match req.metadata().get("pubkey") { + Some(p) => p.clone(), + None => return Err(Status::unauthenticated("pubkey not found in metadata.")), + }; + let pubkey = ed25519_dalek::VerifyingKey::from_bytes( + &bs58::decode(&pubkey_value) + .into_vec() + .map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))? + .try_into() + .map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?, + ) + .map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?; + + let req = req.into_inner(); + let message = format!("{time}{req:?}"); + use ed25519_dalek::Verifier; + pubkey + .verify(message.as_bytes(), &signature) + .map_err(|_| Status::unauthenticated("the signature is not valid"))?; + if let Some(req_pubkey) = req.get_pubkey() { + if *pubkey_value.to_str().unwrap() != req_pubkey { + return Err(Status::unauthenticated( + "pubkey of signature does not match pubkey of request", + )); + } + } + Ok(req) +} + +pub fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Result<(), Status> { + let now = chrono::Utc::now(); + let parsed_time = chrono::DateTime::parse_from_rfc3339(time) + .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; + let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); + if !(-4..=4).contains(&seconds_elapsed) { + return Err(Status::unauthenticated(format!( + "Date is not within 4 sec of the time of the server: CLI {} vs Server {}", + parsed_time, now + ))); + } + + let signature = bs58::decode(sig) + .into_vec() + .map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?; + let signature = ed25519_dalek::Signature::from_bytes( + signature + .as_slice() + .try_into() + .map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?, + ); + + let pubkey = ed25519_dalek::VerifyingKey::from_bytes( + &bs58::decode(&pubkey) + .into_vec() + .map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))? + .try_into() + .map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?, + ) + .map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?; + + let msg = time.to_string() + msg; + use ed25519_dalek::Verifier; + pubkey + .verify(msg.as_bytes(), &signature) + .map_err(|_| Status::unauthenticated("the signature is not valid"))?; + + Ok(()) +} + +pub fn check_admin_key(req: &Request) -> Result<(), Status> { + let pubkey = match req.metadata().get("pubkey") { + Some(p) => p.clone(), + None => return Err(Status::unauthenticated("pubkey not found in metadata.")), + }; + let pubkey = pubkey + .to_str() + .map_err(|_| Status::unauthenticated("could not parse pubkey metadata to str"))?; + + if !ADMIN_ACCOUNTS.contains(&pubkey) { + return Err(Status::unauthenticated("This operation is reserved to admin accounts")); + } + + Ok(()) +} diff --git a/src/grpc/common/types.rs b/src/grpc/common/types.rs new file mode 100644 index 0000000..cb34d97 --- /dev/null +++ b/src/grpc/common/types.rs @@ -0,0 +1,222 @@ +use crate::db; +use detee_shared::app_proto::AppNodeListResp; +use detee_shared::{ + general_proto::{AccountBalance, ListOperatorsResp}, + vm_proto::*, +}; +use nanoid::nanoid; + +use surrealdb::RecordId; + +impl From for AccountBalance { + fn from(account: db::Account) -> Self { + AccountBalance { balance: account.balance, tmp_locked: account.tmp_locked } + } +} + +impl From for db::NewVmReq { + fn from(new_vm_req: NewVmReq) -> Self { + Self { + id: RecordId::from((db::NEW_VM_REQ, nanoid!(40, &db::ID_ALPHABET))), + admin: RecordId::from((db::ACCOUNT, new_vm_req.admin_pubkey)), + vm_node: RecordId::from((db::VM_NODE, new_vm_req.node_pubkey)), + hostname: new_vm_req.hostname, + extra_ports: new_vm_req.extra_ports, + public_ipv4: new_vm_req.public_ipv4, + public_ipv6: new_vm_req.public_ipv6, + disk_size_gb: new_vm_req.disk_size_gb, + vcpus: new_vm_req.vcpus, + memory_mb: new_vm_req.memory_mb, + kernel_url: new_vm_req.kernel_url, + kernel_sha: new_vm_req.kernel_sha, + dtrfs_url: new_vm_req.dtrfs_url, + dtrfs_sha: new_vm_req.dtrfs_sha, + price_per_unit: new_vm_req.price_per_unit, + locked_nano: new_vm_req.locked_nano, + created_at: surrealdb::sql::Datetime::default(), + error: String::new(), + } + } +} + +impl From for NewVmReq { + fn from(new_vm_req: db::NewVmReq) -> Self { + Self { + uuid: new_vm_req.id.key().to_string(), + hostname: new_vm_req.hostname, + admin_pubkey: new_vm_req.admin.key().to_string(), + node_pubkey: new_vm_req.vm_node.key().to_string(), + extra_ports: new_vm_req.extra_ports, + public_ipv4: new_vm_req.public_ipv4, + public_ipv6: new_vm_req.public_ipv6, + disk_size_gb: new_vm_req.disk_size_gb, + vcpus: new_vm_req.vcpus, + memory_mb: new_vm_req.memory_mb, + kernel_url: new_vm_req.kernel_url, + kernel_sha: new_vm_req.kernel_sha, + dtrfs_url: new_vm_req.dtrfs_url, + dtrfs_sha: new_vm_req.dtrfs_sha, + price_per_unit: new_vm_req.price_per_unit, + locked_nano: new_vm_req.locked_nano, + } + } +} + +impl From for NewVmResp { + fn from(resp: db::NewVmResp) -> Self { + match resp { + // TODO: This will require a small architecture change to pass MeasurementArgs from + // Daemon to CLI + db::NewVmResp::Args(uuid, args) => { + NewVmResp { uuid, error: String::new(), args: Some(args) } + } + db::NewVmResp::Error(uuid, error) => NewVmResp { uuid, error, args: None }, + } + } +} + +impl From for UpdateVmReq { + fn from(update_vm_req: db::UpdateVmReq) -> Self { + Self { + uuid: update_vm_req.id.key().to_string(), + // daemon does not care about VM hostname + hostname: String::new(), + admin_pubkey: update_vm_req.admin.key().to_string(), + disk_size_gb: update_vm_req.disk_size_gb, + vcpus: update_vm_req.vcpus, + memory_mb: update_vm_req.memory_mb, + kernel_url: update_vm_req.kernel_url, + kernel_sha: update_vm_req.kernel_sha, + dtrfs_url: update_vm_req.dtrfs_url, + dtrfs_sha: update_vm_req.dtrfs_sha, + } + } +} + +impl From for DeleteVmReq { + fn from(delete_vm_req: db::DeletedVm) -> Self { + Self { + uuid: delete_vm_req.id.key().to_string(), + admin_pubkey: delete_vm_req.admin.key().to_string(), + } + } +} + +impl From for BrainVmMessage { + fn from(notification: db::DaemonNotification) -> Self { + match notification { + db::DaemonNotification::Create(new_vm_req) => { + BrainVmMessage { msg: Some(brain_vm_message::Msg::NewVmReq(new_vm_req.into())) } + } + db::DaemonNotification::Update(update_vm_req) => BrainVmMessage { + msg: Some(brain_vm_message::Msg::UpdateVmReq(update_vm_req.into())), + }, + db::DaemonNotification::Delete(deleted_vm) => { + BrainVmMessage { msg: Some(brain_vm_message::Msg::DeleteVm(deleted_vm.into())) } + } + } + } +} + +impl From for VmContract { + fn from(db_c: db::ActiveVmWithNode) -> Self { + let mut exposed_ports = Vec::new(); + for port in db_c.mapped_ports.iter() { + exposed_ports.push(port.0); + } + VmContract { + uuid: db_c.id.key().to_string(), + hostname: db_c.hostname.clone(), + admin_pubkey: db_c.admin.key().to_string(), + node_pubkey: db_c.vm_node.id.key().to_string(), + node_ip: db_c.vm_node.ip.clone(), + location: format!( + "{}, {}, {}", + db_c.vm_node.city, db_c.vm_node.region, db_c.vm_node.country + ), + memory_mb: db_c.memory_mb, + vcpus: db_c.vcpus, + disk_size_gb: db_c.disk_size_gb, + mapped_ports: db_c + .mapped_ports + .iter() + .map(|(h, g)| MappedPort { host_port: *h, guest_port: *g }) + .collect(), + vm_public_ipv6: db_c.public_ipv6.clone(), + vm_public_ipv4: db_c.public_ipv4.clone(), + locked_nano: db_c.locked_nano, + dtrfs_sha: db_c.dtrfs_sha.clone(), + kernel_sha: db_c.kernel_sha.clone(), + nano_per_minute: db_c.price_per_minute(), + created_at: db_c.created_at.to_rfc3339(), + // TODO: remove updated_at from the proto + // This will get moved to VM history (users will be able to + // query old contracts, which also shows updates of existing contracts). + updated_at: db_c.created_at.to_rfc3339(), + collected_at: db_c.collected_at.to_rfc3339(), + } + } +} + +impl From for tonic::Status { + fn from(e: db::Error) -> Self { + Self::internal(format!("Internal error: {e}")) + } +} + +impl From for ListOperatorsResp { + fn from(db_o: db::Operator) -> Self { + ListOperatorsResp { + pubkey: db_o.account.key().to_string(), + escrow: db_o.escrow, + email: db_o.email, + app_nodes: db_o.app_nodes, + vm_nodes: db_o.vm_nodes, + reports: db_o.reports, + } + } +} + +impl From for VmNodeListResp { + fn from(vm_node: db::VmNodeWithReports) -> Self { + Self { + operator: vm_node.operator.key().to_string(), + node_pubkey: vm_node.id.key().to_string(), + country: vm_node.country, + region: vm_node.region, + city: vm_node.city, + ip: vm_node.ip, + reports: vm_node.reports.iter().map(|n| n.reason.clone()).collect(), + price: vm_node.price, + } + } +} + +impl From for AppNodeListResp { + fn from(app_node: db::AppNodeWithReports) -> Self { + Self { + operator: app_node.operator.key().to_string(), + node_pubkey: app_node.id.key().to_string(), + country: app_node.country, + region: app_node.region, + city: app_node.city, + ip: app_node.ip, + reports: app_node.reports.iter().map(|n| n.reason.clone()).collect(), + price: app_node.price, + } + } +} + +impl From for db::VmNodeResources { + fn from(res: VmNodeResources) -> Self { + Self { + avail_mem_mb: res.avail_memory_mb, + avail_vcpus: res.avail_vcpus, + avail_storage_gbs: res.avail_storage_gb, + avail_ipv4: res.avail_ipv4, + avail_ipv6: res.avail_ipv6, + avail_ports: res.avail_ports, + max_ports_per_vm: res.max_ports_per_vm, + } + } +} diff --git a/src/grpc/general.rs b/src/grpc/general.rs new file mode 100644 index 0000000..aa30353 --- /dev/null +++ b/src/grpc/general.rs @@ -0,0 +1,201 @@ +use crate::db; +use crate::grpc::prelude::*; +use detee_shared::app_proto::AppContract; +use detee_shared::{ + common_proto::{Empty, Pubkey}, + general_proto::{ + brain_general_cli_server::BrainGeneralCli, Account, AccountBalance, AirdropReq, BanUserReq, + InspectOperatorResp, KickReq, KickResp, ListOperatorsResp, RegOperatorReq, ReportNodeReq, + SlashReq, + }, + vm_proto::VmContract, +}; +use surrealdb::{engine::remote::ws::Client, Surreal}; + +use std::pin::Pin; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tokio_stream::Stream; +use tonic::{Request, Response, Status}; + +pub struct BrainGeneralCliForReal { + pub db: Arc>, +} + +impl BrainGeneralCliForReal { + pub fn new(db: Arc>) -> Self { + Self { db } + } +} + +#[tonic::async_trait] +impl BrainGeneralCli for BrainGeneralCliForReal { + type ListAccountsStream = Pin> + Send>>; + type ListAllAppContractsStream = + Pin> + Send>>; + type ListAllVmContractsStream = Pin> + Send>>; + type ListOperatorsStream = + Pin> + Send>>; + + async fn get_balance(&self, req: Request) -> Result, Status> { + let req = check_sig_from_req(req)?; + Ok(Response::new(db::Account::get(&self.db, &req.pubkey).await?.into())) + } + + async fn report_node(&self, req: Request) -> Result, Status> { + let req = check_sig_from_req(req)?; + let (account, node, contract_id) = + match db::ActiveVmWithNode::get_by_uuid(&self.db, &req.contract).await? { + Some(vm_contract) + if vm_contract.admin.key().to_string() == req.admin_pubkey + && vm_contract.vm_node.id.key().to_string() == req.node_pubkey => + { + (vm_contract.admin, vm_contract.vm_node.id, vm_contract.id.to_string()) + } + _ => match db::ActiveAppWithNode::get_by_uuid(&self.db, &req.contract).await? { + Some(app_contract) + if app_contract.admin.key().to_string() == req.admin_pubkey + && app_contract.app_node.id.key().to_string() == req.node_pubkey => + { + (app_contract.admin, app_contract.app_node.id, app_contract.id.to_string()) + } + _ => { + return Err(Status::unauthenticated("No contract found by this ID.")); + } + }, + }; + db::Report::create(&self.db, account, node, req.reason, contract_id).await?; + Ok(Response::new(Empty {})) + } + + async fn list_operators( + &self, + req: Request, + ) -> Result, Status> { + let _ = check_sig_from_req(req)?; + let operators = db::Operator::list(&self.db).await?; + let (tx, rx) = mpsc::channel(6); + tokio::spawn(async move { + for op in operators { + let _ = tx.send(Ok(op.into())).await; + } + }); + let output_stream = ReceiverStream::new(rx); + Ok(Response::new(Box::pin(output_stream) as Self::ListOperatorsStream)) + } + + async fn inspect_operator( + &self, + req: Request, + ) -> Result, Status> { + match db::Operator::inspect_nodes(&self.db, &req.into_inner().pubkey).await? { + (Some(op), vm_nodes, app_nodes) => Ok(Response::new(InspectOperatorResp { + operator: Some(op.into()), + vm_nodes: vm_nodes.into_iter().map(|n| n.into()).collect(), + app_nodes: app_nodes.into_iter().map(|n| n.into()).collect(), + })), + (None, _, _) => Err(Status::not_found("The wallet you specified is not an operator")), + } + } + + async fn register_operator( + &self, + _req: Request, + ) -> Result, Status> { + todo!(); + // let req = check_sig_from_req(req)?; + // info!("Regitering new operator: {req:?}"); + // match self.data.register_operator(req) { + // Ok(()) => Ok(Response::new(Empty {})), + // Err(e) => Err(Status::failed_precondition(e.to_string())), + // } + } + + async fn kick_contract(&self, _req: Request) -> Result, Status> { + todo!(); + // let req = check_sig_from_req(req)?; + // match self.data.kick_contract(&req.operator_wallet, &req.contract_uuid, &req.reason).await { + // Ok(nano_lp) => Ok(Response::new(KickResp { nano_lp })), + // Err(e) => Err(Status::permission_denied(e.to_string())), + // } + } + + async fn ban_user(&self, _req: Request) -> Result, Status> { + todo!(); + // let req = check_sig_from_req(req)?; + // self.data.ban_user(&req.operator_wallet, &req.user_wallet); + // Ok(Response::new(Empty {})) + } + + // admin commands + + async fn airdrop(&self, req: Request) -> Result, Status> { + check_admin_key(&req)?; + let req = check_sig_from_req(req)?; + db::Account::airdrop(&self.db, &req.pubkey, req.tokens).await?; + Ok(Response::new(Empty {})) + } + + async fn slash(&self, _req: Request) -> Result, Status> { + todo!(); + // check_admin_key(&req)?; + // let req = check_sig_from_req(req)?; + // self.data.slash_account(&req.pubkey, req.tokens); + // Ok(Response::new(Empty {})) + } + + async fn list_accounts( + &self, + _req: Request, + ) -> Result, Status> { + todo!(); + // check_admin_key(&req)?; + // let _ = check_sig_from_req(req)?; + // let accounts = self.data.list_accounts(); + // let (tx, rx) = mpsc::channel(6); + // tokio::spawn(async move { + // for account in accounts { + // let _ = tx.send(Ok(account.into())).await; + // } + // }); + // let output_stream = ReceiverStream::new(rx); + // Ok(Response::new(Box::pin(output_stream) as Self::ListAccountsStream)) + } + + async fn list_all_vm_contracts( + &self, + _req: Request, + ) -> Result, Status> { + todo!(); + // check_admin_key(&req)?; + // let _ = check_sig_from_req(req)?; + // let contracts = self.data.list_all_contracts(); + // let (tx, rx) = mpsc::channel(6); + // tokio::spawn(async move { + // for contract in contracts { + // let _ = tx.send(Ok(contract.into())).await; + // } + // }); + // let output_stream = ReceiverStream::new(rx); + // Ok(Response::new(Box::pin(output_stream) as Self::ListAllVmContractsStream)) + } + + async fn list_all_app_contracts( + &self, + _req: tonic::Request, + ) -> Result, Status> { + todo!(); + // check_admin_key(&req)?; + // let _ = check_sig_from_req(req)?; + // let contracts = self.data.list_all_app_contracts(); + // let (tx, rx) = mpsc::channel(6); + // tokio::spawn(async move { + // for contract in contracts { + // let _ = tx.send(Ok(contract.into())).await; + // } + // }); + // let output_stream = ReceiverStream::new(rx); + // Ok(Response::new(Box::pin(output_stream))) + } +} diff --git a/src/grpc/mod.rs b/src/grpc/mod.rs new file mode 100644 index 0000000..c825978 --- /dev/null +++ b/src/grpc/mod.rs @@ -0,0 +1,12 @@ +pub mod app; +pub mod common; +pub mod general; +pub mod vm; + +pub mod prelude { + pub use super::common::{ + check_admin_key, check_sig_from_parts, check_sig_from_req, PubkeyGetter, + }; + pub use super::general::BrainGeneralCliForReal; + pub use super::vm::{BrainVmCliForReal, BrainVmDaemonForReal}; +} diff --git a/src/grpc/vm.rs b/src/grpc/vm.rs new file mode 100644 index 0000000..e1bba01 --- /dev/null +++ b/src/grpc/vm.rs @@ -0,0 +1,342 @@ +#![allow(dead_code)] +use crate::constants::{ACCOUNT, VM_NODE}; +use crate::db; +use crate::grpc::prelude::*; +use detee_shared::{ + common_proto::Empty, + vm_proto::{ + brain_vm_cli_server::BrainVmCli, brain_vm_daemon_server::BrainVmDaemon, ListVmContractsReq, + *, + }, +}; +use surrealdb::{engine::remote::ws::Client, Surreal}; + +use log::info; +use std::pin::Pin; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tokio_stream::{Stream, StreamExt}; +use tonic::{Request, Response, Status, Streaming}; + +pub struct BrainVmDaemonForReal { + pub db: Arc>, +} + +impl BrainVmDaemonForReal { + pub fn new(db: Arc>) -> Self { + Self { db } + } +} + +#[tonic::async_trait] +impl BrainVmDaemon for BrainVmDaemonForReal { + type RegisterVmNodeStream = Pin> + Send>>; + async fn register_vm_node( + &self, + req: Request, + ) -> Result, Status> { + let req = check_sig_from_req(req)?; + info!("Starting registration process for {:?}", req); + db::VmNode { + id: surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone())), + operator: surrealdb::RecordId::from((ACCOUNT, req.operator_wallet)), + country: req.country, + region: req.region, + city: req.city, + ip: req.main_ip, + price: req.price, + avail_mem_mb: 0, + avail_vcpus: 0, + avail_storage_gbs: 0, + avail_ipv4: 0, + avail_ipv6: 0, + avail_ports: 0, + max_ports_per_vm: 0, + offline_minutes: 0, + } + .register(&self.db) + .await?; + + info!("Sending existing contracts to {}", req.node_pubkey); + let contracts = db::ActiveVmWithNode::list_by_node(&self.db, &req.node_pubkey).await?; + let (tx, rx) = mpsc::channel(6); + tokio::spawn(async move { + for contract in contracts { + let _ = tx.send(Ok(contract.into())).await; + } + }); + let output_stream = ReceiverStream::new(rx); + Ok(Response::new(Box::pin(output_stream) as Self::RegisterVmNodeStream)) + } + + type BrainMessagesStream = Pin> + Send>>; + async fn brain_messages( + &self, + req: Request, + ) -> Result, Status> { + let auth = req.into_inner(); + let pubkey = auth.pubkey.clone(); + check_sig_from_parts( + &pubkey, + &auth.timestamp, + &format!("{:?}", auth.contracts), + &auth.signature, + )?; + info!("Daemon {} connected to receive brain messages", pubkey); + + let (tx, rx) = mpsc::channel(6); + { + let db = self.db.clone(); + let pubkey = pubkey.clone(); + let tx = tx.clone(); + tokio::spawn(async move { + match db::listen_for_node::(&db, &pubkey, tx).await { + Ok(()) => log::info!("db::VmContract::listen_for_node ended for {pubkey}"), + Err(e) => { + log::warn!("db::VmContract::listen_for_node errored for {pubkey}: {e}") + } + }; + }); + } + { + let db = self.db.clone(); + let pubkey = pubkey.clone(); + let tx = tx.clone(); + tokio::spawn(async move { + let _ = db::listen_for_node::(&db, &pubkey, tx.clone()).await; + }); + } + { + let db = self.db.clone(); + let pubkey = pubkey.clone(); + let tx = tx.clone(); + tokio::spawn(async move { + let _ = db::listen_for_node::(&db, &pubkey, tx.clone()).await; + }); + } + + let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg.into())); + Ok(Response::new(Box::pin(output_stream) as Self::BrainMessagesStream)) + } + + async fn daemon_messages( + &self, + req: Request>, + ) -> Result, Status> { + let mut req_stream = req.into_inner(); + let pubkey: String; + if let Some(Ok(msg)) = req_stream.next().await { + log::debug!("demon_messages received the following auth message: {:?}", msg.msg); + if let Some(vm_daemon_message::Msg::Auth(auth)) = msg.msg { + pubkey = auth.pubkey.clone(); + check_sig_from_parts( + &pubkey, + &auth.timestamp, + &format!("{:?}", auth.contracts), + &auth.signature, + )?; + } else { + return Err(Status::unauthenticated( + "Could not authenticate the daemon: could not extract auth signature", + )); + } + } else { + return Err(Status::unauthenticated("Could not authenticate the daemon")); + } + + while let Some(daemon_message) = req_stream.next().await { + match daemon_message { + Ok(msg) => match msg.msg { + Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => { + // TODO: move new_vm_req to active_vm + // also handle failure properly + if !new_vm_resp.error.is_empty() { + db::NewVmReq::submit_error( + &self.db, + &new_vm_resp.uuid, + new_vm_resp.error, + ) + .await?; + } else { + db::upsert_record( + &self.db, + "measurement_args", + &new_vm_resp.uuid, + new_vm_resp.args.clone(), + ) + .await?; + if let Some(args) = new_vm_resp.args { + db::ActiveVm::activate(&self.db, &new_vm_resp.uuid, args).await?; + } + } + } + Some(vm_daemon_message::Msg::UpdateVmResp(_update_vm_resp)) => { + todo!(); + // self.data.submit_updatevm_resp(update_vm_resp).await; + } + Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => { + let node_resources: db::VmNodeResources = node_resources.into(); + node_resources.merge(&self.db, &pubkey).await?; + } + _ => {} + }, + Err(e) => { + log::warn!("Daemon disconnected: {e:?}"); + } + } + } + Ok(Response::new(Empty {})) + } +} + +pub struct BrainVmCliForReal { + pub db: Arc>, +} + +impl BrainVmCliForReal { + pub fn new(db: Arc>) -> Self { + Self { db } + } +} + +#[tonic::async_trait] +impl BrainVmCli for BrainVmCliForReal { + type ListVmContractsStream = Pin> + Send>>; + type ListVmNodesStream = Pin> + Send>>; + + async fn new_vm(&self, req: Request) -> Result, Status> { + let req = check_sig_from_req(req)?; + info!("New VM requested via CLI: {req:?}"); + if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { + return Err(Status::permission_denied("This operator banned you. What did you do?")); + } + + let new_vm_req: db::NewVmReq = req.into(); + let id = new_vm_req.id.key().to_string(); + + let db = self.db.clone(); + + let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); + tokio::spawn(async move { + let _ = oneshot_tx.send(db::NewVmResp::listen(&db, &id).await); + }); + new_vm_req.submit(&self.db).await?; + + match oneshot_rx.await { + Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded("Request failed due to timeout. Please try again later or contact the DeTEE devs team.")), + Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())), + Err(e) => { + log::error!("Something weird happened. Reached error {e:?}"); + Err(Status::unknown( + "Request failed due to unknown error. Please try again or contact the DeTEE devs team.", + )) + } + } + } + + async fn update_vm(&self, req: Request) -> Result, Status> { + let req = check_sig_from_req(req)?; + info!("Update VM requested via CLI: {req:?}"); + todo!(); + // let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); + // self.data.submit_updatevm_req(req, oneshot_tx).await; + // match oneshot_rx.await { + // Ok(response) => { + // info!("Sending UpdateVMResp: {response:?}"); + // Ok(Response::new(response)) + // } + // Err(e) => Err(Status::unknown(format!( + // "Update VM request failed due to error: {e}" + // ))), + // } + } + + async fn extend_vm(&self, req: Request) -> Result, Status> { + let _req = check_sig_from_req(req)?; + todo!(); + // match self + // .data + // .extend_vm_contract_time(&req.uuid, &req.admin_pubkey, req.locked_nano) + // { + // Ok(()) => Ok(Response::new(Empty {})), + // Err(e) => Err(Status::unknown(format!("Could not extend contract: {e}"))), + // } + } + + async fn delete_vm(&self, req: Request) -> Result, Status> { + let _req = check_sig_from_req(req)?; + todo!(); + // match self.data.delete_vm(req).await { + // Ok(()) => Ok(Response::new(Empty {})), + // Err(e) => Err(Status::not_found(e.to_string())), + // } + } + + async fn list_vm_contracts( + &self, + req: Request, + ) -> Result, Status> { + let req = check_sig_from_req(req)?; + info!( + "CLI {} requested ListVMVmContractsStream. As operator: {}", + req.wallet, req.as_operator + ); + let mut contracts = Vec::new(); + if !req.uuid.is_empty() { + if let Some(specific_contract) = + db::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await? + { + if specific_contract.admin.key().to_string() == req.wallet { + contracts.push(specific_contract); + } + // TODO: allow operator to inspect contracts + } + } else if req.as_operator { + contracts + .append(&mut db::ActiveVmWithNode::list_by_operator(&self.db, &req.wallet).await?); + } else { + contracts + .append(&mut db::ActiveVmWithNode::list_by_admin(&self.db, &req.wallet).await?); + } + let (tx, rx) = mpsc::channel(6); + tokio::spawn(async move { + for contract in contracts { + let _ = tx.send(Ok(contract.into())).await; + } + }); + let output_stream = ReceiverStream::new(rx); + Ok(Response::new(Box::pin(output_stream) as Self::ListVmContractsStream)) + } + + async fn list_vm_nodes( + &self, + req: Request, + ) -> Result, tonic::Status> { + let req = check_sig_from_req(req)?; + info!("CLI requested ListVmNodesStream: {req:?}"); + let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?; + let (tx, rx) = mpsc::channel(6); + tokio::spawn(async move { + for node in nodes { + let _ = tx.send(Ok(node.into())).await; + } + }); + let output_stream = ReceiverStream::new(rx); + Ok(Response::new(Box::pin(output_stream) as Self::ListVmNodesStream)) + } + + async fn get_one_vm_node( + &self, + req: Request, + ) -> Result, Status> { + let req = check_sig_from_req(req)?; + info!("Unknown CLI requested ListVmNodesStream: {req:?}"); + // TODO: optimize this query so that it gets only one node + let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?; + if let Some(node) = nodes.into_iter().next() { + return Ok(Response::new(node.into())); + } + Err(Status::not_found("Could not find any node based on your search criteria")) + } +}