diff --git a/rustfmt.toml b/rustfmt.toml index a484977..d05ff90 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,3 +1,3 @@ reorder_impl_items = true use_small_heuristics = "Max" -imports_granularity = "Crate" +imports_granularity = "Module" diff --git a/src/bin/brain.rs b/src/bin/brain.rs index 30e99d4..6cf2e26 100644 --- a/src/bin/brain.rs +++ b/src/bin/brain.rs @@ -6,9 +6,8 @@ use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; use dotenv::dotenv; use surreal_brain::constants::{BRAIN_GRPC_ADDR, CERT_KEY_PATH, CERT_PATH}; use surreal_brain::db; -use surreal_brain::grpc::prelude::{ - BrainGeneralCliForReal, BrainVmCliForReal, BrainVmDaemonForReal, -}; +use surreal_brain::grpc::general::GeneralCliServer; +use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer}; use tonic::transport::{Identity, Server, ServerTlsConfig}; #[tokio::main] @@ -27,10 +26,9 @@ async fn main() { let addr = BRAIN_GRPC_ADDR.parse().unwrap(); - let snp_daemon_server = BrainVmDaemonServer::new(BrainVmDaemonForReal::new(db_arc.clone())); - let snp_cli_server = BrainVmCliServer::new(BrainVmCliForReal::new(db_arc.clone())); - let general_service_server = - BrainGeneralCliServer::new(BrainGeneralCliForReal::new(db_arc.clone())); + let snp_daemon_server = BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone())); + let snp_cli_server = BrainVmCliServer::new(VmCliServer::new(db_arc.clone())); + let general_service_server = BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone())); let cert = std::fs::read_to_string(CERT_PATH).unwrap(); let key = std::fs::read_to_string(CERT_KEY_PATH).unwrap(); diff --git a/src/db/app.rs b/src/db/app.rs index 2ac6490..c49bb0b 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -4,7 +4,9 @@ use crate::db::general::Report; use super::Error; use crate::old_brain; use serde::{Deserialize, Serialize}; -use surrealdb::{engine::remote::ws::Client, sql::Datetime, RecordId, Surreal}; +use surrealdb::engine::remote::ws::Client; +use surrealdb::sql::Datetime; +use surrealdb::{RecordId, Surreal}; #[derive(Debug, Serialize, Deserialize)] pub struct AppNode { diff --git a/src/db/common/mod.rs b/src/db/common/mod.rs deleted file mode 100644 index a093880..0000000 --- a/src/db/common/mod.rs +++ /dev/null @@ -1,114 +0,0 @@ -pub use crate::constants::{ - ACCOUNT, ACTIVE_APP, ACTIVE_VM, DB_SCHEMA_FILE, DELETED_VM, ID_ALPHABET, NEW_VM_REQ, - UPDATE_VM_REQ, VM_CONTRACT, VM_NODE, -}; - -use crate::db::prelude::*; -use crate::old_brain; -use serde::{Deserialize, Serialize}; -use surrealdb::Notification; -use surrealdb::{ - engine::remote::ws::{Client, Ws}, - opt::auth::Root, - Surreal, -}; -use tokio::sync::mpsc::Sender; -use tokio_stream::StreamExt; - -#[derive(thiserror::Error, Debug)] -pub enum Error { - #[error("Internal DB error: {0}")] - DataBase(#[from] surrealdb::Error), - #[error("Daemon channel got closed: {0}")] - DaemonConnection(#[from] tokio::sync::mpsc::error::SendError), - #[error(transparent)] - StdIo(#[from] std::io::Error), - #[error(transparent)] - TimeOut(#[from] tokio::time::error::Elapsed), -} - -pub async fn db_connection( - db_address: &str, - username: &str, - password: &str, - ns: &str, - db: &str, -) -> Result, Error> { - let db_connection: Surreal = Surreal::init(); - db_connection.connect::(db_address).await?; - // Sign in to the server - db_connection.signin(Root { username, password }).await?; - db_connection.use_ns(ns).use_db(db).await?; - Ok(db_connection) -} - -pub async fn migration0( - db: &Surreal, - old_data: &old_brain::BrainData, -) -> Result<(), Error> { - let accounts: Vec = old_data.into(); - let vm_nodes: Vec = old_data.into(); - let app_nodes: Vec = old_data.into(); - let vm_contracts: Vec = old_data.into(); - - let schema = std::fs::read_to_string(DB_SCHEMA_FILE)?; - db.query(schema).await?; - - println!("Inserting accounts..."); - let _: Vec = db.insert(()).content(accounts).await?; - println!("Inserting vm nodes..."); - let _: Vec = db.insert(()).content(vm_nodes).await?; - println!("Inserting app nodes..."); - let _: Vec = db.insert(()).content(app_nodes).await?; - println!("Inserting vm contracts..."); - let _: Vec = db.insert("vm_contract").relation(vm_contracts).await?; - - Ok(()) -} - -pub async fn upsert_record( - db: &Surreal, - table: &str, - id: &str, - my_record: SomeRecord, -) -> Result<(), Error> { - #[derive(Deserialize)] - struct Wrapper {} - let _: Option = db.create((table, id)).content(my_record).await?; - Ok(()) -} - -pub async fn listen_for_node< - T: Into + std::marker::Unpin + for<'de> Deserialize<'de>, ->( - db: &Surreal, - node: &str, - tx: Sender, -) -> Result<(), Error> { - let table_name = match std::any::type_name::() { - "surreal_brain::db::NewVmReq" => NEW_VM_REQ.to_string(), - "surreal_brain::db::UpdateVmReq" => UPDATE_VM_REQ.to_string(), - "surreal_brain::db::DeletedVm" => DELETED_VM.to_string(), - wat => { - log::error!("listen_for_node: T has type {wat}"); - String::from("wat") - } - }; - let mut resp = - db.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?; - let mut live_stream = resp.stream::>(0)?; - while let Some(result) = live_stream.next().await { - match result { - Ok(notification) => { - if notification.action == surrealdb::Action::Create { - tx.send(notification.data.into()).await? - } - } - Err(e) => { - log::warn!("listen_for_deletion DB stream failed for {node}: {e}"); - return Err(Error::from(e)); - } - } - } - Ok(()) -} diff --git a/src/db/general.rs b/src/db/general.rs index b06e463..940de21 100644 --- a/src/db/general.rs +++ b/src/db/general.rs @@ -1,11 +1,12 @@ use crate::constants::ACCOUNT; -use crate::db::app::AppNodeWithReports; -use crate::db::vm::VmNodeWithReports; +use crate::db::prelude::*; use super::Error; use crate::old_brain; use serde::{Deserialize, Serialize}; -use surrealdb::{engine::remote::ws::Client, sql::Datetime, RecordId, Surreal}; +use surrealdb::engine::remote::ws::Client; +use surrealdb::sql::Datetime; +use surrealdb::{RecordId, Surreal}; #[derive(Debug, Serialize, Deserialize)] pub struct Account { diff --git a/src/db/mod.rs b/src/db/mod.rs index 179da9e..670f6d7 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,21 +1,118 @@ pub mod app; -pub mod common; pub mod general; pub mod vm; -pub use prelude::*; -pub mod prelude { - pub use super::app::{ActiveAppWithNode, AppNode, AppNodeWithReports}; - pub use super::common::listen_for_node; - pub use super::common::{db_connection, migration0, upsert_record, Error}; - pub use super::general::{Account, Operator, Report}; - pub use super::vm::{ - ActiveVm, ActiveVmWithNode, DaemonNotification, DeletedVm, NewVmReq, NewVmResp, - UpdateVmReq, VmNode, VmNodeResources, VmNodeWithReports, - }; +use crate::constants::{DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ}; +use crate::old_brain; +use prelude::*; +use serde::{Deserialize, Serialize}; +use surrealdb::engine::remote::ws::{Client, Ws}; +use surrealdb::opt::auth::Root; +use surrealdb::{Notification, Surreal}; +use tokio::sync::mpsc::Sender; +use tokio_stream::StreamExt; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Internal DB error: {0}")] + DataBase(#[from] surrealdb::Error), + #[error("Daemon channel got closed: {0}")] + VmDaemonConnection(#[from] tokio::sync::mpsc::error::SendError), + #[error(transparent)] + StdIo(#[from] std::io::Error), + #[error(transparent)] + TimeOut(#[from] tokio::time::error::Elapsed), } -pub use crate::constants::{ - ACCOUNT, ACTIVE_APP, ACTIVE_VM, DB_SCHEMA_FILE, DELETED_VM, ID_ALPHABET, NEW_VM_REQ, - UPDATE_VM_REQ, VM_CONTRACT, VM_NODE, -}; +pub mod prelude { + pub use super::app::*; + pub use super::general::*; + pub use super::vm::*; + pub use super::*; +} + +pub async fn db_connection( + db_address: &str, + username: &str, + password: &str, + ns: &str, + db: &str, +) -> Result, Error> { + let db_connection: Surreal = Surreal::init(); + db_connection.connect::(db_address).await?; + // Sign in to the server + db_connection.signin(Root { username, password }).await?; + db_connection.use_ns(ns).use_db(db).await?; + Ok(db_connection) +} + +pub async fn migration0( + db: &Surreal, + old_data: &old_brain::BrainData, +) -> Result<(), Error> { + let accounts: Vec = old_data.into(); + let vm_nodes: Vec = old_data.into(); + let app_nodes: Vec = old_data.into(); + let vm_contracts: Vec = old_data.into(); + + let schema = std::fs::read_to_string(crate::constants::DB_SCHEMA_FILE)?; + db.query(schema).await?; + + println!("Inserting accounts..."); + let _: Vec = db.insert(()).content(accounts).await?; + println!("Inserting vm nodes..."); + let _: Vec = db.insert(()).content(vm_nodes).await?; + println!("Inserting app nodes..."); + let _: Vec = db.insert(()).content(app_nodes).await?; + println!("Inserting vm contracts..."); + let _: Vec = db.insert("vm_contract").relation(vm_contracts).await?; + + Ok(()) +} + +pub async fn upsert_record( + db: &Surreal, + table: &str, + id: &str, + my_record: SomeRecord, +) -> Result<(), Error> { + #[derive(Deserialize)] + struct Wrapper {} + let _: Option = db.create((table, id)).content(my_record).await?; + Ok(()) +} + +pub async fn listen_for_vm_node< + T: Into + std::marker::Unpin + for<'de> Deserialize<'de>, +>( + db: &Surreal, + node: &str, + tx: Sender, +) -> Result<(), Error> { + let table_name = match std::any::type_name::() { + "surreal_brain::db::NewVmReq" => NEW_VM_REQ.to_string(), + "surreal_brain::db::UpdateVmReq" => UPDATE_VM_REQ.to_string(), + "surreal_brain::db::DeletedVm" => DELETED_VM.to_string(), + wat => { + log::error!("listen_for_node: T has type {wat}"); + String::from("wat") + } + }; + let mut resp = + db.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?; + let mut live_stream = resp.stream::>(0)?; + while let Some(result) = live_stream.next().await { + match result { + Ok(notification) => { + if notification.action == surrealdb::Action::Create { + tx.send(notification.data.into()).await? + } + } + Err(e) => { + log::warn!("listen_for_deletion DB stream failed for {node}: {e}"); + return Err(Error::from(e)); + } + } + } + Ok(()) +} diff --git a/src/db/vm.rs b/src/db/vm.rs index 54b5df6..5524d38 100644 --- a/src/db/vm.rs +++ b/src/db/vm.rs @@ -1,11 +1,14 @@ -use std::{str::FromStr, time::Duration}; +use std::str::FromStr; +use std::time::Duration; use super::Error; use crate::constants::{ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, VM_NODE}; use crate::db::general::Report; use crate::old_brain; use serde::{Deserialize, Serialize}; -use surrealdb::{engine::remote::ws::Client, sql::Datetime, Notification, RecordId, Surreal}; +use surrealdb::engine::remote::ws::Client; +use surrealdb::sql::Datetime; +use surrealdb::{Notification, RecordId, Surreal}; use tokio_stream::StreamExt as _; #[derive(Debug, Serialize, Deserialize)] @@ -115,25 +118,25 @@ impl VmNodeWithReports { } } -pub enum DaemonNotification { +pub enum VmDaemonNotification { Create(NewVmReq), Update(UpdateVmReq), Delete(DeletedVm), } -impl From for DaemonNotification { +impl From for VmDaemonNotification { fn from(value: NewVmReq) -> Self { Self::Create(value) } } -impl From for DaemonNotification { +impl From for VmDaemonNotification { fn from(value: UpdateVmReq) -> Self { Self::Update(value) } } -impl From for DaemonNotification { +impl From for VmDaemonNotification { fn from(value: DeletedVm) -> Self { Self::Delete(value) } diff --git a/src/grpc/common/mod.rs b/src/grpc/common/mod.rs deleted file mode 100644 index 4b3bcc0..0000000 --- a/src/grpc/common/mod.rs +++ /dev/null @@ -1,173 +0,0 @@ -pub mod types; - -use crate::constants::ADMIN_ACCOUNTS; -use detee_shared::{ - common_proto::{Empty, Pubkey}, - general_proto::{AirdropReq, BanUserReq, KickReq, RegOperatorReq, ReportNodeReq, SlashReq}, - vm_proto::{ListVmContractsReq, *}, -}; -use tonic::{Request, Status}; - -pub trait PubkeyGetter { - fn get_pubkey(&self) -> Option; -} - -macro_rules! impl_pubkey_getter { - ($t:ty, $field:ident) => { - impl PubkeyGetter for $t { - fn get_pubkey(&self) -> Option { - Some(self.$field.clone()) - } - } - }; - ($t:ty) => { - impl PubkeyGetter for $t { - fn get_pubkey(&self) -> Option { - None - } - } - }; -} - -impl_pubkey_getter!(Pubkey, pubkey); -impl_pubkey_getter!(NewVmReq, admin_pubkey); -impl_pubkey_getter!(DeleteVmReq, admin_pubkey); -impl_pubkey_getter!(UpdateVmReq, admin_pubkey); -impl_pubkey_getter!(ExtendVmReq, admin_pubkey); -impl_pubkey_getter!(ReportNodeReq, admin_pubkey); -impl_pubkey_getter!(ListVmContractsReq, wallet); -impl_pubkey_getter!(RegisterVmNodeReq, node_pubkey); -impl_pubkey_getter!(RegOperatorReq, pubkey); -impl_pubkey_getter!(KickReq, operator_wallet); -impl_pubkey_getter!(BanUserReq, operator_wallet); - -impl_pubkey_getter!(VmNodeFilters); -impl_pubkey_getter!(Empty); -impl_pubkey_getter!(AirdropReq); -impl_pubkey_getter!(SlashReq); - -// impl_pubkey_getter!(NewAppReq, admin_pubkey); -// impl_pubkey_getter!(DelAppReq, admin_pubkey); -// impl_pubkey_getter!(ListAppContractsReq, admin_pubkey); -// -// impl_pubkey_getter!(RegisterAppNodeReq); -// impl_pubkey_getter!(AppNodeFilters); - -pub fn check_sig_from_req(req: Request) -> Result { - let time = match req.metadata().get("timestamp") { - Some(t) => t.clone(), - None => return Err(Status::unauthenticated("Timestamp not found in metadata.")), - }; - let time = time - .to_str() - .map_err(|_| Status::unauthenticated("Timestamp in metadata is not a string"))?; - - let now = chrono::Utc::now(); - let parsed_time = chrono::DateTime::parse_from_rfc3339(time) - .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; - let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); - if !(-4..=4).contains(&seconds_elapsed) { - return Err(Status::unauthenticated(format!( - "Date is not within 4 sec of the time of the server: CLI {} vs Server {}", - parsed_time, now - ))); - } - - let signature = match req.metadata().get("request-signature") { - Some(t) => t, - None => return Err(Status::unauthenticated("signature not found in metadata.")), - }; - let signature = bs58::decode(signature) - .into_vec() - .map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?; - let signature = ed25519_dalek::Signature::from_bytes( - signature - .as_slice() - .try_into() - .map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?, - ); - - let pubkey_value = match req.metadata().get("pubkey") { - Some(p) => p.clone(), - None => return Err(Status::unauthenticated("pubkey not found in metadata.")), - }; - let pubkey = ed25519_dalek::VerifyingKey::from_bytes( - &bs58::decode(&pubkey_value) - .into_vec() - .map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))? - .try_into() - .map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?, - ) - .map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?; - - let req = req.into_inner(); - let message = format!("{time}{req:?}"); - use ed25519_dalek::Verifier; - pubkey - .verify(message.as_bytes(), &signature) - .map_err(|_| Status::unauthenticated("the signature is not valid"))?; - if let Some(req_pubkey) = req.get_pubkey() { - if *pubkey_value.to_str().unwrap() != req_pubkey { - return Err(Status::unauthenticated( - "pubkey of signature does not match pubkey of request", - )); - } - } - Ok(req) -} - -pub fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Result<(), Status> { - let now = chrono::Utc::now(); - let parsed_time = chrono::DateTime::parse_from_rfc3339(time) - .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; - let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); - if !(-4..=4).contains(&seconds_elapsed) { - return Err(Status::unauthenticated(format!( - "Date is not within 4 sec of the time of the server: CLI {} vs Server {}", - parsed_time, now - ))); - } - - let signature = bs58::decode(sig) - .into_vec() - .map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?; - let signature = ed25519_dalek::Signature::from_bytes( - signature - .as_slice() - .try_into() - .map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?, - ); - - let pubkey = ed25519_dalek::VerifyingKey::from_bytes( - &bs58::decode(&pubkey) - .into_vec() - .map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))? - .try_into() - .map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?, - ) - .map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?; - - let msg = time.to_string() + msg; - use ed25519_dalek::Verifier; - pubkey - .verify(msg.as_bytes(), &signature) - .map_err(|_| Status::unauthenticated("the signature is not valid"))?; - - Ok(()) -} - -pub fn check_admin_key(req: &Request) -> Result<(), Status> { - let pubkey = match req.metadata().get("pubkey") { - Some(p) => p.clone(), - None => return Err(Status::unauthenticated("pubkey not found in metadata.")), - }; - let pubkey = pubkey - .to_str() - .map_err(|_| Status::unauthenticated("could not parse pubkey metadata to str"))?; - - if !ADMIN_ACCOUNTS.contains(&pubkey) { - return Err(Status::unauthenticated("This operation is reserved to admin accounts")); - } - - Ok(()) -} diff --git a/src/grpc/general.rs b/src/grpc/general.rs index aa30353..9dc6244 100644 --- a/src/grpc/general.rs +++ b/src/grpc/general.rs @@ -1,16 +1,15 @@ use crate::db; -use crate::grpc::prelude::*; +use crate::grpc::{check_admin_key, check_sig_from_req}; use detee_shared::app_proto::AppContract; -use detee_shared::{ - common_proto::{Empty, Pubkey}, - general_proto::{ - brain_general_cli_server::BrainGeneralCli, Account, AccountBalance, AirdropReq, BanUserReq, - InspectOperatorResp, KickReq, KickResp, ListOperatorsResp, RegOperatorReq, ReportNodeReq, - SlashReq, - }, - vm_proto::VmContract, +use detee_shared::common_proto::{Empty, Pubkey}; +use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCli; +use detee_shared::general_proto::{ + Account, AccountBalance, AirdropReq, BanUserReq, InspectOperatorResp, KickReq, KickResp, + ListOperatorsResp, RegOperatorReq, ReportNodeReq, SlashReq, }; -use surrealdb::{engine::remote::ws::Client, Surreal}; +use detee_shared::vm_proto::VmContract; +use surrealdb::engine::remote::ws::Client; +use surrealdb::Surreal; use std::pin::Pin; use std::sync::Arc; @@ -19,18 +18,18 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_stream::Stream; use tonic::{Request, Response, Status}; -pub struct BrainGeneralCliForReal { +pub struct GeneralCliServer { pub db: Arc>, } -impl BrainGeneralCliForReal { +impl GeneralCliServer { pub fn new(db: Arc>) -> Self { Self { db } } } #[tonic::async_trait] -impl BrainGeneralCli for BrainGeneralCliForReal { +impl BrainGeneralCli for GeneralCliServer { type ListAccountsStream = Pin> + Send>>; type ListAllAppContractsStream = Pin> + Send>>; @@ -40,20 +39,20 @@ impl BrainGeneralCli for BrainGeneralCliForReal { async fn get_balance(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; - Ok(Response::new(db::Account::get(&self.db, &req.pubkey).await?.into())) + Ok(Response::new(db::general::Account::get(&self.db, &req.pubkey).await?.into())) } async fn report_node(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; let (account, node, contract_id) = - match db::ActiveVmWithNode::get_by_uuid(&self.db, &req.contract).await? { + match db::vm::ActiveVmWithNode::get_by_uuid(&self.db, &req.contract).await? { Some(vm_contract) if vm_contract.admin.key().to_string() == req.admin_pubkey && vm_contract.vm_node.id.key().to_string() == req.node_pubkey => { (vm_contract.admin, vm_contract.vm_node.id, vm_contract.id.to_string()) } - _ => match db::ActiveAppWithNode::get_by_uuid(&self.db, &req.contract).await? { + _ => match db::app::ActiveAppWithNode::get_by_uuid(&self.db, &req.contract).await? { Some(app_contract) if app_contract.admin.key().to_string() == req.admin_pubkey && app_contract.app_node.id.key().to_string() == req.node_pubkey => @@ -65,7 +64,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal { } }, }; - db::Report::create(&self.db, account, node, req.reason, contract_id).await?; + db::general::Report::create(&self.db, account, node, req.reason, contract_id).await?; Ok(Response::new(Empty {})) } @@ -74,7 +73,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal { req: Request, ) -> Result, Status> { let _ = check_sig_from_req(req)?; - let operators = db::Operator::list(&self.db).await?; + let operators = db::general::Operator::list(&self.db).await?; let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for op in operators { @@ -89,7 +88,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal { &self, req: Request, ) -> Result, Status> { - match db::Operator::inspect_nodes(&self.db, &req.into_inner().pubkey).await? { + match db::general::Operator::inspect_nodes(&self.db, &req.into_inner().pubkey).await? { (Some(op), vm_nodes, app_nodes) => Ok(Response::new(InspectOperatorResp { operator: Some(op.into()), vm_nodes: vm_nodes.into_iter().map(|n| n.into()).collect(), @@ -133,7 +132,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal { async fn airdrop(&self, req: Request) -> Result, Status> { check_admin_key(&req)?; let req = check_sig_from_req(req)?; - db::Account::airdrop(&self.db, &req.pubkey, req.tokens).await?; + db::general::Account::airdrop(&self.db, &req.pubkey, req.tokens).await?; Ok(Response::new(Empty {})) } diff --git a/src/grpc/mod.rs b/src/grpc/mod.rs index c825978..7833c5b 100644 --- a/src/grpc/mod.rs +++ b/src/grpc/mod.rs @@ -1,12 +1,176 @@ pub mod app; -pub mod common; pub mod general; +pub mod types; pub mod vm; -pub mod prelude { - pub use super::common::{ - check_admin_key, check_sig_from_parts, check_sig_from_req, PubkeyGetter, - }; - pub use super::general::BrainGeneralCliForReal; - pub use super::vm::{BrainVmCliForReal, BrainVmDaemonForReal}; +use crate::constants::ADMIN_ACCOUNTS; +use detee_shared::common_proto::{Empty, Pubkey}; +use detee_shared::general_proto::{ + AirdropReq, BanUserReq, KickReq, RegOperatorReq, ReportNodeReq, SlashReq, +}; +use detee_shared::vm_proto::{ListVmContractsReq, *}; +use tonic::{Request, Status}; + +pub trait PubkeyGetter { + fn get_pubkey(&self) -> Option; +} + +macro_rules! impl_pubkey_getter { + ($t:ty, $field:ident) => { + impl PubkeyGetter for $t { + fn get_pubkey(&self) -> Option { + Some(self.$field.clone()) + } + } + }; + ($t:ty) => { + impl PubkeyGetter for $t { + fn get_pubkey(&self) -> Option { + None + } + } + }; +} + +impl_pubkey_getter!(Pubkey, pubkey); +impl_pubkey_getter!(NewVmReq, admin_pubkey); +impl_pubkey_getter!(DeleteVmReq, admin_pubkey); +impl_pubkey_getter!(UpdateVmReq, admin_pubkey); +impl_pubkey_getter!(ExtendVmReq, admin_pubkey); +impl_pubkey_getter!(ReportNodeReq, admin_pubkey); +impl_pubkey_getter!(ListVmContractsReq, wallet); +impl_pubkey_getter!(RegisterVmNodeReq, node_pubkey); +impl_pubkey_getter!(RegOperatorReq, pubkey); +impl_pubkey_getter!(KickReq, operator_wallet); +impl_pubkey_getter!(BanUserReq, operator_wallet); + +impl_pubkey_getter!(VmNodeFilters); +impl_pubkey_getter!(Empty); +impl_pubkey_getter!(AirdropReq); +impl_pubkey_getter!(SlashReq); + +// impl_pubkey_getter!(NewAppReq, admin_pubkey); +// impl_pubkey_getter!(DelAppReq, admin_pubkey); +// impl_pubkey_getter!(ListAppContractsReq, admin_pubkey); +// +// impl_pubkey_getter!(RegisterAppNodeReq); +// impl_pubkey_getter!(AppNodeFilters); + +pub fn check_sig_from_req(req: Request) -> Result { + let time = match req.metadata().get("timestamp") { + Some(t) => t.clone(), + None => return Err(Status::unauthenticated("Timestamp not found in metadata.")), + }; + let time = time + .to_str() + .map_err(|_| Status::unauthenticated("Timestamp in metadata is not a string"))?; + + let now = chrono::Utc::now(); + let parsed_time = chrono::DateTime::parse_from_rfc3339(time) + .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; + let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); + if !(-4..=4).contains(&seconds_elapsed) { + return Err(Status::unauthenticated(format!( + "Date is not within 4 sec of the time of the server: CLI {} vs Server {}", + parsed_time, now + ))); + } + + let signature = match req.metadata().get("request-signature") { + Some(t) => t, + None => return Err(Status::unauthenticated("signature not found in metadata.")), + }; + let signature = bs58::decode(signature) + .into_vec() + .map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?; + let signature = ed25519_dalek::Signature::from_bytes( + signature + .as_slice() + .try_into() + .map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?, + ); + + let pubkey_value = match req.metadata().get("pubkey") { + Some(p) => p.clone(), + None => return Err(Status::unauthenticated("pubkey not found in metadata.")), + }; + let pubkey = ed25519_dalek::VerifyingKey::from_bytes( + &bs58::decode(&pubkey_value) + .into_vec() + .map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))? + .try_into() + .map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?, + ) + .map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?; + + let req = req.into_inner(); + let message = format!("{time}{req:?}"); + use ed25519_dalek::Verifier; + pubkey + .verify(message.as_bytes(), &signature) + .map_err(|_| Status::unauthenticated("the signature is not valid"))?; + if let Some(req_pubkey) = req.get_pubkey() { + if *pubkey_value.to_str().unwrap() != req_pubkey { + return Err(Status::unauthenticated( + "pubkey of signature does not match pubkey of request", + )); + } + } + Ok(req) +} + +pub fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Result<(), Status> { + let now = chrono::Utc::now(); + let parsed_time = chrono::DateTime::parse_from_rfc3339(time) + .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; + let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); + if !(-4..=4).contains(&seconds_elapsed) { + return Err(Status::unauthenticated(format!( + "Date is not within 4 sec of the time of the server: CLI {} vs Server {}", + parsed_time, now + ))); + } + + let signature = bs58::decode(sig) + .into_vec() + .map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?; + let signature = ed25519_dalek::Signature::from_bytes( + signature + .as_slice() + .try_into() + .map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?, + ); + + let pubkey = ed25519_dalek::VerifyingKey::from_bytes( + &bs58::decode(&pubkey) + .into_vec() + .map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))? + .try_into() + .map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?, + ) + .map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?; + + let msg = time.to_string() + msg; + use ed25519_dalek::Verifier; + pubkey + .verify(msg.as_bytes(), &signature) + .map_err(|_| Status::unauthenticated("the signature is not valid"))?; + + Ok(()) +} + +pub fn check_admin_key(req: &Request) -> Result<(), Status> { + let pubkey = match req.metadata().get("pubkey") { + Some(p) => p.clone(), + None => return Err(Status::unauthenticated("pubkey not found in metadata.")), + }; + let pubkey = pubkey + .to_str() + .map_err(|_| Status::unauthenticated("could not parse pubkey metadata to str"))?; + + if !ADMIN_ACCOUNTS.contains(&pubkey) { + return Err(Status::unauthenticated("This operation is reserved to admin accounts")); + } + + Ok(()) } diff --git a/src/grpc/common/types.rs b/src/grpc/types.rs similarity index 90% rename from src/grpc/common/types.rs rename to src/grpc/types.rs index cb34d97..0b22164 100644 --- a/src/grpc/common/types.rs +++ b/src/grpc/types.rs @@ -1,9 +1,8 @@ -use crate::db; +use crate::constants::{ACCOUNT, ID_ALPHABET, NEW_VM_REQ, VM_NODE}; +use crate::db::prelude as db; use detee_shared::app_proto::AppNodeListResp; -use detee_shared::{ - general_proto::{AccountBalance, ListOperatorsResp}, - vm_proto::*, -}; +use detee_shared::general_proto::{AccountBalance, ListOperatorsResp}; +use detee_shared::vm_proto::*; use nanoid::nanoid; use surrealdb::RecordId; @@ -17,9 +16,9 @@ impl From for AccountBalance { impl From for db::NewVmReq { fn from(new_vm_req: NewVmReq) -> Self { Self { - id: RecordId::from((db::NEW_VM_REQ, nanoid!(40, &db::ID_ALPHABET))), - admin: RecordId::from((db::ACCOUNT, new_vm_req.admin_pubkey)), - vm_node: RecordId::from((db::VM_NODE, new_vm_req.node_pubkey)), + id: RecordId::from((NEW_VM_REQ, nanoid!(40, &ID_ALPHABET))), + admin: RecordId::from((ACCOUNT, new_vm_req.admin_pubkey)), + vm_node: RecordId::from((VM_NODE, new_vm_req.node_pubkey)), hostname: new_vm_req.hostname, extra_ports: new_vm_req.extra_ports, public_ipv4: new_vm_req.public_ipv4, @@ -102,16 +101,16 @@ impl From for DeleteVmReq { } } -impl From for BrainVmMessage { - fn from(notification: db::DaemonNotification) -> Self { +impl From for BrainVmMessage { + fn from(notification: db::VmDaemonNotification) -> Self { match notification { - db::DaemonNotification::Create(new_vm_req) => { + db::VmDaemonNotification::Create(new_vm_req) => { BrainVmMessage { msg: Some(brain_vm_message::Msg::NewVmReq(new_vm_req.into())) } } - db::DaemonNotification::Update(update_vm_req) => BrainVmMessage { + db::VmDaemonNotification::Update(update_vm_req) => BrainVmMessage { msg: Some(brain_vm_message::Msg::UpdateVmReq(update_vm_req.into())), }, - db::DaemonNotification::Delete(deleted_vm) => { + db::VmDaemonNotification::Delete(deleted_vm) => { BrainVmMessage { msg: Some(brain_vm_message::Msg::DeleteVm(deleted_vm.into())) } } } diff --git a/src/grpc/vm.rs b/src/grpc/vm.rs index e1bba01..961eed1 100644 --- a/src/grpc/vm.rs +++ b/src/grpc/vm.rs @@ -1,15 +1,13 @@ #![allow(dead_code)] use crate::constants::{ACCOUNT, VM_NODE}; -use crate::db; -use crate::grpc::prelude::*; -use detee_shared::{ - common_proto::Empty, - vm_proto::{ - brain_vm_cli_server::BrainVmCli, brain_vm_daemon_server::BrainVmDaemon, ListVmContractsReq, - *, - }, -}; -use surrealdb::{engine::remote::ws::Client, Surreal}; +use crate::db::prelude as db; +use crate::grpc::{check_sig_from_parts, check_sig_from_req}; +use detee_shared::common_proto::Empty; +use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCli; +use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemon; +use detee_shared::vm_proto::{ListVmContractsReq, *}; +use surrealdb::engine::remote::ws::Client; +use surrealdb::Surreal; use log::info; use std::pin::Pin; @@ -19,19 +17,21 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_stream::{Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; -pub struct BrainVmDaemonForReal { +pub struct VmDaemonServer { pub db: Arc>, } -impl BrainVmDaemonForReal { +impl VmDaemonServer { pub fn new(db: Arc>) -> Self { Self { db } } } #[tonic::async_trait] -impl BrainVmDaemon for BrainVmDaemonForReal { +impl BrainVmDaemon for VmDaemonServer { + type BrainMessagesStream = Pin> + Send>>; type RegisterVmNodeStream = Pin> + Send>>; + async fn register_vm_node( &self, req: Request, @@ -70,7 +70,6 @@ impl BrainVmDaemon for BrainVmDaemonForReal { Ok(Response::new(Box::pin(output_stream) as Self::RegisterVmNodeStream)) } - type BrainMessagesStream = Pin> + Send>>; async fn brain_messages( &self, req: Request, @@ -91,7 +90,7 @@ impl BrainVmDaemon for BrainVmDaemonForReal { let pubkey = pubkey.clone(); let tx = tx.clone(); tokio::spawn(async move { - match db::listen_for_node::(&db, &pubkey, tx).await { + match db::listen_for_vm_node::(&db, &pubkey, tx).await { Ok(()) => log::info!("db::VmContract::listen_for_node ended for {pubkey}"), Err(e) => { log::warn!("db::VmContract::listen_for_node errored for {pubkey}: {e}") @@ -104,7 +103,7 @@ impl BrainVmDaemon for BrainVmDaemonForReal { let pubkey = pubkey.clone(); let tx = tx.clone(); tokio::spawn(async move { - let _ = db::listen_for_node::(&db, &pubkey, tx.clone()).await; + let _ = db::listen_for_vm_node::(&db, &pubkey, tx.clone()).await; }); } { @@ -112,7 +111,7 @@ impl BrainVmDaemon for BrainVmDaemonForReal { let pubkey = pubkey.clone(); let tx = tx.clone(); tokio::spawn(async move { - let _ = db::listen_for_node::(&db, &pubkey, tx.clone()).await; + let _ = db::listen_for_vm_node::(&db, &pubkey, tx.clone()).await; }); } @@ -190,25 +189,25 @@ impl BrainVmDaemon for BrainVmDaemonForReal { } } -pub struct BrainVmCliForReal { +pub struct VmCliServer { pub db: Arc>, } -impl BrainVmCliForReal { +impl VmCliServer { pub fn new(db: Arc>) -> Self { Self { db } } } #[tonic::async_trait] -impl BrainVmCli for BrainVmCliForReal { +impl BrainVmCli for VmCliServer { type ListVmContractsStream = Pin> + Send>>; type ListVmNodesStream = Pin> + Send>>; async fn new_vm(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; info!("New VM requested via CLI: {req:?}"); - if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { + if db::general::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { return Err(Status::permission_denied("This operator banned you. What did you do?")); } diff --git a/tests/common/prepare_test_env.rs b/tests/common/prepare_test_env.rs index 96bc2b6..d3e8b06 100644 --- a/tests/common/prepare_test_env.rs +++ b/tests/common/prepare_test_env.rs @@ -1,17 +1,19 @@ +use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer; use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; -use detee_shared::{ - general_proto::brain_general_cli_server::BrainGeneralCliServer, - vm_proto::brain_vm_daemon_server::BrainVmDaemonServer, -}; +use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; use dotenv::dotenv; use hyper_util::rt::TokioIo; use std::net::SocketAddr; use std::sync::Arc; -use surreal_brain::grpc::{BrainGeneralCliForReal, BrainVmCliForReal, BrainVmDaemonForReal}; +use surreal_brain::grpc::{ + general::GeneralCliServer, + vm::{VmCliServer, VmDaemonServer}, +}; use surrealdb::engine::remote::ws::Client; use surrealdb::Surreal; use tokio::io::DuplexStream; -use tokio::{net::TcpListener, sync::OnceCell}; +use tokio::net::TcpListener; +use tokio::sync::OnceCell; use tonic::transport::{Channel, Endpoint, Server, Uri}; use tower::service_fn; @@ -58,9 +60,9 @@ pub async fn run_service_in_background() -> SocketAddr { let db_arc = Arc::new(db); Server::builder() - .add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal::new(db_arc.clone()))) - .add_service(BrainVmCliServer::new(BrainVmCliForReal::new(db_arc.clone()))) - .add_service(BrainVmDaemonServer::new(BrainVmDaemonForReal::new(db_arc.clone()))) + .add_service(BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone()))) + .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) + .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) .await .unwrap(); @@ -88,9 +90,9 @@ pub async fn run_service_for_stream_server() -> DuplexStream { let db_arc = Arc::new(db); tonic::transport::Server::builder() - .add_service(BrainGeneralCliServer::new(BrainGeneralCliForReal::new(db_arc.clone()))) - .add_service(BrainVmCliServer::new(BrainVmCliForReal::new(db_arc.clone()))) - .add_service(BrainVmDaemonServer::new(BrainVmDaemonForReal::new(db_arc.clone()))) + .add_service(BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone()))) + .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) + .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await }); diff --git a/tests/common/test_utils.rs b/tests/common/test_utils.rs index 04a5b13..af25ecd 100644 --- a/tests/common/test_utils.rs +++ b/tests/common/test_utils.rs @@ -1,7 +1,6 @@ use anyhow::Result; use detee_shared::vm_proto as snp_proto; -use ed25519_dalek::Signer; -use ed25519_dalek::SigningKey; +use ed25519_dalek::{Signer, SigningKey}; use tonic::metadata::AsciiMetadataValue; use tonic::Request; diff --git a/tests/common/vm_cli_utils.rs b/tests/common/vm_cli_utils.rs index 6c8d059..55b0853 100644 --- a/tests/common/vm_cli_utils.rs +++ b/tests/common/vm_cli_utils.rs @@ -2,7 +2,7 @@ use super::test_utils::Key; use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ}; -use surreal_brain::db; +use surreal_brain::db::prelude as db; use surrealdb::engine::remote::ws::Client; use surrealdb::Surreal; use tonic::transport::Channel; diff --git a/tests/grpc_test.rs b/tests/grpc_test.rs index 824b951..33d6ce5 100644 --- a/tests/grpc_test.rs +++ b/tests/grpc_test.rs @@ -1,19 +1,17 @@ -use common::{ - prepare_test_env::{prepare_test_db, run_service_for_stream, run_service_in_background}, - test_utils::Key, - vm_cli_utils::create_new_vm, - vm_daemon_utils::mock_vm_daemon, +use common::prepare_test_env::{ + prepare_test_db, run_service_for_stream, run_service_in_background, }; -use detee_shared::common_proto::Empty; +use common::test_utils::Key; +use common::vm_cli_utils::create_new_vm; +use common::vm_daemon_utils::mock_vm_daemon; +use detee_shared::common_proto::{Empty, Pubkey}; +use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient; use detee_shared::general_proto::ReportNodeReq; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use detee_shared::vm_proto::ListVmContractsReq; -use detee_shared::{ - common_proto::Pubkey, general_proto::brain_general_cli_client::BrainGeneralCliClient, -}; use futures::StreamExt; use surreal_brain::constants::VM_NODE; -use surreal_brain::db::VmNodeWithReports; +use surreal_brain::db::vm::VmNodeWithReports; mod common; diff --git a/tests/grpc_vm_daemon_test.rs b/tests/grpc_vm_daemon_test.rs index 02da943..0d7ab9b 100644 --- a/tests/grpc_vm_daemon_test.rs +++ b/tests/grpc_vm_daemon_test.rs @@ -1,8 +1,8 @@ -use common::{ - prepare_test_env::{prepare_test_db, run_service_for_stream, run_service_in_background}, - test_utils::Key, - vm_daemon_utils::{mock_vm_daemon, register_vm_node}, +use common::prepare_test_env::{ + prepare_test_db, run_service_for_stream, run_service_in_background, }; +use common::test_utils::Key; +use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;