From 6a99c146cec526249f3ae003edef7229ddc60ce9 Mon Sep 17 00:00:00 2001 From: ghe0 Date: Fri, 25 Apr 2025 04:15:00 +0300 Subject: [PATCH] switch operator from relation to link this makes DB operations easier to write --- interim_tables.surql | 4 +- src/bin/brain.rs | 8 +- src/db.rs | 157 ++++++++++++++++------------------ src/grpc.rs | 197 ++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 267 insertions(+), 99 deletions(-) diff --git a/interim_tables.surql b/interim_tables.surql index 2746abb..b02b0b9 100644 --- a/interim_tables.surql +++ b/interim_tables.surql @@ -5,6 +5,7 @@ DEFINE FIELD escrow ON TABLE account TYPE int DEFAULT 0; DEFINE FIELD email ON TABLE account TYPE string DEFAULT ""; DEFINE TABLE vm_node SCHEMAFULL; +DEFINE FIELD operator ON TABLE vm_node TYPE record; DEFINE FIELD country ON TABLE vm_node TYPE string; DEFINE FIELD region ON TABLE vm_node TYPE string; DEFINE FIELD city ON TABLE vm_node TYPE string; @@ -37,6 +38,7 @@ DEFINE FIELD locked_nano ON TABLE vm_contract TYPE int; DEFINE FIELD collected_at ON TABLE vm_contract TYPE datetime; DEFINE TABLE app_node SCHEMAFULL; +DEFINE FIELD operator ON TABLE app_node TYPE record; DEFINE FIELD country ON TABLE app_node TYPE string; DEFINE FIELD region ON TABLE app_node TYPE string; DEFINE FIELD city ON TABLE app_node TYPE string; @@ -77,5 +79,3 @@ DEFINE FIELD contract ON TABLE kick TYPE record; DEFINE TABLE report TYPE RELATION FROM account TO vm_node|app_node; DEFINE FIELD created_at ON TABLE report TYPE datetime; DEFINE FIELD reason ON TABLE report TYPE string; - -DEFINE TABLE operator TYPE RELATION FROM account TO vm_node|app_node; diff --git a/src/bin/brain.rs b/src/bin/brain.rs index f90db3d..504ab35 100644 --- a/src/bin/brain.rs +++ b/src/bin/brain.rs @@ -1,7 +1,7 @@ use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer; use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; -use surreal_brain::grpc::BrainGeneralCliMock; -use surreal_brain::grpc::BrainVmCliMock; +use surreal_brain::grpc::BrainGeneralCliForReal; +use surreal_brain::grpc::BrainVmCliForReal; use surreal_brain::db; use tonic::transport::{Identity, Server, ServerTlsConfig}; @@ -11,8 +11,8 @@ async fn main() { db::init().await.unwrap(); let addr = "0.0.0.0:31337".parse().unwrap(); - let snp_cli_server = BrainVmCliServer::new(BrainVmCliMock {}); - let general_service_server = BrainGeneralCliServer::new(BrainGeneralCliMock {}); + let snp_cli_server = BrainVmCliServer::new(BrainVmCliForReal {}); + let general_service_server = BrainGeneralCliServer::new(BrainGeneralCliForReal {}); let cert = std::fs::read_to_string("./tmp/brain-crt.pem").unwrap(); let key = std::fs::read_to_string("./tmp/brain-key.pem").unwrap(); diff --git a/src/db.rs b/src/db.rs index d076fe6..fdee374 100644 --- a/src/db.rs +++ b/src/db.rs @@ -9,10 +9,9 @@ use surrealdb::{ }; static DB: LazyLock> = LazyLock::new(Surreal::init); -const ACCOUNT: &str = "account"; -const OPERATOR: &str = "operator"; -const VM_CONTRACT: &str = "vm_contract"; -const VM_NODE: &str = "vm_node"; +pub const ACCOUNT: &str = "account"; +pub const VM_CONTRACT: &str = "vm_contract"; +pub const VM_NODE: &str = "vm_node"; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -33,7 +32,6 @@ pub async fn migration0(old_data: &old_brain::BrainData) -> surrealdb::Result<() let vm_nodes: Vec = old_data.into(); let app_nodes: Vec = old_data.into(); let vm_contracts: Vec = old_data.into(); - let operators: Vec = old_data.into(); init().await?; @@ -45,8 +43,6 @@ pub async fn migration0(old_data: &old_brain::BrainData) -> surrealdb::Result<() let _: Vec = DB.insert(()).content(app_nodes).await?; println!("Inserting vm contracts..."); let _: Vec = DB.insert("vm_contract").relation(vm_contracts).await?; - println!("Inserting operators..."); - let _: Vec = DB.insert(OPERATOR).relation(operators).await?; Ok(()) } @@ -85,6 +81,7 @@ impl Account { #[derive(Debug, Serialize, Deserialize)] pub struct VmNode { pub id: RecordId, + pub operator: RecordId, pub country: String, pub region: String, pub city: String, @@ -100,9 +97,17 @@ pub struct VmNode { pub offline_minutes: u64, } +impl VmNode { + pub async fn register(self) -> Result<(), Error> { + let _: Option = DB.upsert(self.id.clone()).content(self).await?; + Ok(()) + } +} + #[derive(Debug, Serialize, Deserialize)] -pub struct VmNodeExtended { +pub struct VmNodeWithReports { pub id: RecordId, + pub operator: RecordId, pub country: String, pub region: String, pub city: String, @@ -117,7 +122,6 @@ pub struct VmNodeExtended { pub price: u64, pub offline_minutes: u64, pub reports: Vec, - pub operator: RecordId, } #[derive(Debug, Serialize, Deserialize)] @@ -162,6 +166,8 @@ impl VmContract { } } +impl VmContract {} + #[derive(Debug, Serialize, Deserialize)] pub struct VmContractWithNode { pub id: RecordId, @@ -201,6 +207,14 @@ impl VmContractWithNode { Ok(contracts) } + pub async fn list_by_node(admin: &str) -> Result, Error> { + let mut result = DB + .query(format!("select * from {VM_CONTRACT} where out = {VM_NODE}:{admin} fetch out;")) + .await?; + let contracts: Vec = result.take(0)?; + Ok(contracts) + } + pub async fn list_by_operator(operator: &str) -> Result, Error> { let mut result = DB .query(format!( @@ -242,6 +256,7 @@ impl VmContractWithNode { #[derive(Debug, Serialize, Deserialize)] pub struct AppNode { pub id: RecordId, + pub operator: RecordId, pub country: String, pub region: String, pub city: String, @@ -256,8 +271,9 @@ pub struct AppNode { } #[derive(Debug, Serialize, Deserialize)] -pub struct AppNodeExtended { +pub struct AppNodeWithReports { pub id: RecordId, + pub operator: RecordId, pub country: String, pub region: String, pub city: String, @@ -270,7 +286,6 @@ pub struct AppNodeExtended { pub price: u64, pub offline_minutes: u64, pub reports: Vec, - pub operator: RecordId, } #[derive(Debug, Serialize, Deserialize)] @@ -344,23 +359,6 @@ impl Report { } } -#[derive(Debug, Serialize, Deserialize)] -pub struct OperatorRelation { - #[serde(rename = "in")] - pub account: RecordId, - #[serde(rename = "out")] - pub node: RecordId, -} - -impl OperatorRelation { - fn new(account: &str, vm_node: &str) -> Self { - Self { - account: RecordId::from(("account", account.to_string())), - node: RecordId::from(("vm_node", vm_node.to_string())), - } - } -} - /// This is the operator obtained from the DB, /// however the relation is defined using OperatorRelation #[derive(Debug, Serialize, Deserialize)] @@ -377,57 +375,59 @@ impl Operator { pub async fn list() -> Result, Error> { let mut result = DB .query(format!( - "select *, - in as account, - <-account.email[0] as email, - <-account.escrow[0] as escrow, - count(->vm_node) as vm_nodes, - count(->app_node) as app_nodes, - (select in from <-account->operator->vm_node<-report).len() + - (select in from <-account->operator->app_node<-report).len() - as reports - from operator group by in;" + "array::distinct(array::flatten( [ + (select operator from vm_node group by operator).operator, + (select operator from app_node group by operator).operator + ]));" )) .await?; - let operators: Vec = result.take(0)?; + let operator_accounts: Vec = result.take(0)?; + let mut operators: Vec = Vec::new(); + for account in operator_accounts.iter() { + if let Some(operator) = Self::inspect(&account.key().to_string()).await? { + operators.push(operator); + } + } Ok(operators) } + pub async fn inspect(account: &str) -> Result, Error> { + let mut result = DB + .query(format!( + "$vm_nodes = (select id from vm_node where operator = account:{account}).id; + $app_nodes = (select id from app_node where operator = account:{account}).id; + select *, + id as account, + email, + escrow, + $vm_nodes.len() as vm_nodes, + $app_nodes.len() as app_nodes, + (select id from report where $vm_nodes contains out).len() + + (select id from report where $app_nodes contains out).len() + as reports + from account where id = account:{account};" + )) + .await?; + let operator: Option = result.take(2)?; + Ok(operator) + } + pub async fn inspect_nodes( account: &str, - ) -> Result<(Option, Vec, Vec), Error> { + ) -> Result<(Option, Vec, Vec), Error> { + let operator = Self::inspect(account).await?; let mut result = DB .query(format!( - "select *, - in as account, - <-account.email[0] as email, - <-account.escrow[0] as escrow, - count(->vm_node) as vm_nodes, - count(->app_node) as app_nodes, - (select in from <-account->operator->vm_node<-report).len() + - (select in from <-account->operator->app_node<-report).len() - as reports - from operator where in = account:{account} group by account;" + "select *, operator, <-report.* as reports from vm_node + where operator = account:{account};" )) .query(format!( - "select *, - (<-operator<-account)[0].id as operator, - <-report.* as reports - from vm_node - where (<-operator<-account)[0].id = account:{account};" - )) - .query(format!( - "select *, - (<-operator<-account)[0].id as operator, - <-report.* as reports - from app_node - where (<-operator<-account)[0].id = account:{account};" + "select *, operator, <-report.* as reports from app_node + where operator = account:{account};" )) .await?; - - let operator: Option = result.take(0)?; - let vm_nodes: Vec = result.take(1)?; - let app_nodes: Vec = result.take(2)?; + let vm_nodes: Vec = result.take(0)?; + let app_nodes: Vec = result.take(1)?; Ok((operator, vm_nodes, app_nodes)) } @@ -440,7 +440,8 @@ impl From<&old_brain::BrainData> for Vec { let mut nodes = Vec::new(); for old_node in old_data.vm_nodes.iter() { nodes.push(VmNode { - id: RecordId::from(("vm_node", old_node.public_key.clone())), + id: RecordId::from((VM_NODE, old_node.public_key.clone())), + operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())), country: old_node.country.clone(), region: old_node.region.clone(), city: old_node.city.clone(), @@ -469,7 +470,11 @@ impl From<&old_brain::BrainData> for Vec { mapped_ports.push((*port, 8080 as u32)); } contracts.push(VmContract { - id: RecordId::from((VM_CONTRACT, old_c.uuid.replace("-", ""))), + id: RecordId::from(( + VM_CONTRACT, + old_c.node_pubkey.chars().take(20).collect::() + + &old_c.uuid.replace("-", "").chars().take(20).collect::(), + )), admin: RecordId::from((ACCOUNT, old_c.admin_pubkey.clone())), vm_node: RecordId::from((VM_NODE, old_c.node_pubkey.clone())), state: "active".to_string(), @@ -499,6 +504,7 @@ impl From<&old_brain::BrainData> for Vec { for old_node in old_data.app_nodes.iter() { nodes.push(AppNode { id: RecordId::from(("app_node", old_node.node_pubkey.clone())), + operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())), country: old_node.country.clone(), region: old_node.region.clone(), city: old_node.city.clone(), @@ -536,18 +542,3 @@ impl From<&old_brain::BrainData> for Vec { accounts } } - -impl From<&old_brain::BrainData> for Vec { - fn from(old_data: &old_brain::BrainData) -> Self { - let mut operator_entries = Vec::new(); - for operator in old_data.operators.clone() { - for vm_node in operator.1.vm_nodes.iter() { - operator_entries.push(OperatorRelation::new(&operator.0, vm_node)); - } - for app_node in operator.1.app_nodes.iter() { - operator_entries.push(OperatorRelation::new(&operator.0, app_node)); - } - } - operator_entries - } -} diff --git a/src/grpc.rs b/src/grpc.rs index 8db2cc8..15baba9 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -8,7 +8,10 @@ use detee_shared::{ InspectOperatorResp, KickReq, KickResp, ListOperatorsResp, RegOperatorReq, ReportNodeReq, SlashReq, }, - vm_proto::{brain_vm_cli_server::BrainVmCli, ListVmContractsReq, *}, + vm_proto::{ + brain_vm_cli_server::BrainVmCli, brain_vm_daemon_server::BrainVmDaemon, ListVmContractsReq, + *, + }, }; use log::info; @@ -18,9 +21,9 @@ use tokio_stream::wrappers::ReceiverStream; // use tokio::sync::mpsc; // use tokio_stream::{wrappers::ReceiverStream, Stream}; use tokio_stream::Stream; -use tonic::{Request, Response, Status}; +use tonic::{Request, Response, Status, Streaming}; -pub struct BrainGeneralCliMock {} +pub struct BrainGeneralCliForReal {} impl From for AccountBalance { fn from(account: db::Account) -> Self { @@ -84,8 +87,8 @@ impl From for ListOperatorsResp { } } -impl From for VmNodeListResp { - fn from(vm_node: db::VmNodeExtended) -> Self { +impl From for VmNodeListResp { + fn from(vm_node: db::VmNodeWithReports) -> Self { Self { operator: vm_node.operator.key().to_string(), node_pubkey: vm_node.id.key().to_string(), @@ -99,8 +102,8 @@ impl From for VmNodeListResp { } } -impl From for AppNodeListResp { - fn from(app_node: db::AppNodeExtended) -> Self { +impl From for AppNodeListResp { + fn from(app_node: db::AppNodeWithReports) -> Self { Self { operator: app_node.operator.key().to_string(), node_pubkey: app_node.id.key().to_string(), @@ -114,8 +117,142 @@ impl From for AppNodeListResp { } } +struct BrainVmDaemonForReal {} + #[tonic::async_trait] -impl BrainGeneralCli for BrainGeneralCliMock { +impl BrainVmDaemon for BrainVmDaemonForReal { + type RegisterVmNodeStream = Pin> + Send>>; + async fn register_vm_node( + &self, + req: Request, + ) -> Result, Status> { + let req = check_sig_from_req(req)?; + info!("Starting registration process for {:?}", req); + db::VmNode { + id: surrealdb::RecordId::from((db::VM_NODE, req.node_pubkey.clone())), + operator: surrealdb::RecordId::from((db::ACCOUNT, req.operator_wallet)), + country: req.country, + region: req.region, + city: req.city, + ip: req.main_ip, + price: req.price, + avail_mem_mb: 0, + avail_vcpus: 0, + avail_storage_gbs: 0, + avail_ipv4: 0, + avail_ipv6: 0, + avail_ports: 0, + max_ports_per_vm: 0, + offline_minutes: 0, + } + .register() + .await?; + + info!("Sending existing contracts to {}", req.node_pubkey); + let contracts = db::VmContractWithNode::list_by_node(&req.node_pubkey).await?; + let (tx, rx) = mpsc::channel(6); + tokio::spawn(async move { + for contract in contracts { + let _ = tx.send(Ok(contract.into())).await; + } + }); + let output_stream = ReceiverStream::new(rx); + Ok(Response::new(Box::pin(output_stream) as Self::RegisterVmNodeStream)) + } + + type BrainMessagesStream = Pin> + Send>>; + async fn brain_messages( + &self, + req: Request, + ) -> Result, Status> { + todo!(); + let auth = req.into_inner(); + let pubkey = auth.pubkey.clone(); + check_sig_from_parts( + &pubkey, + &auth.timestamp, + &format!("{:?}", auth.contracts), + &auth.signature, + )?; + info!("Daemon {} connected to receive brain messages", pubkey); + // A surreql query that listens live for changes: + // live select * from vm_contract where meta::id(id) in "3zRxiGRnf46vd3zAEmpa".."3zRxiGRnf46vd3zAEmpb"; + // + // And the same from Rust: + // let mut stream = db + // .select("vm_contract") + // .range("3zRxiGRnf46vd3zAEmpa".."3zRxiGRnf46vd3zAEmpb") + // .live() + // .await?; + // + // fn handle(result: Result, surrealdb::Error>) { + // println!("Received notification: {:?}", result); + // } + // + // while let Some(result) = stream.next().await { + // handle(result); + // } + // + + // let (tx, rx) = mpsc::channel(6); + //self.data.add_daemon_tx(&pubkey, tx); + //let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg)); + //Ok(Response::new(Box::pin(output_stream) as Self::BrainMessagesStream)) + } + + async fn daemon_messages( + &self, + _req: Request>, + ) -> Result, Status> { + todo!(); + // let mut req_stream = req.into_inner(); + // let pubkey: String; + // if let Some(Ok(msg)) = req_stream.next().await { + // log::debug!("demon_messages received the following auth message: {:?}", msg.msg); + // if let Some(vm_daemon_message::Msg::Auth(auth)) = msg.msg { + // pubkey = auth.pubkey.clone(); + // check_sig_from_parts( + // &pubkey, + // &auth.timestamp, + // &format!("{:?}", auth.contracts), + // &auth.signature, + // )?; + // } else { + // return Err(Status::unauthenticated( + // "Could not authenticate the daemon: could not extract auth signature", + // )); + // } + // } else { + // return Err(Status::unauthenticated("Could not authenticate the daemon")); + // } + + // // info!("Received a message from daemon {pubkey}: {daemon_message:?}"); + // while let Some(daemon_message) = req_stream.next().await { + // match daemon_message { + // Ok(msg) => match msg.msg { + // Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => { + // self.data.submit_newvm_resp(new_vm_resp).await; + // } + // Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => { + // self.data.submit_updatevm_resp(update_vm_resp).await; + // } + // Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => { + // self.data.submit_node_resources(node_resources); + // } + // _ => {} + // }, + // Err(e) => { + // log::warn!("Daemon disconnected: {e:?}"); + // self.data.del_daemon_tx(&pubkey); + // } + // } + // } + // Ok(Response::new(Empty {})) + } +} + +#[tonic::async_trait] +impl BrainGeneralCli for BrainGeneralCliForReal { type ListAccountsStream = Pin> + Send>>; type ListAllAppContractsStream = Pin> + Send>>; @@ -277,10 +414,10 @@ impl BrainGeneralCli for BrainGeneralCliMock { } } -pub struct BrainVmCliMock {} +pub struct BrainVmCliForReal {} #[tonic::async_trait] -impl BrainVmCli for BrainVmCliMock { +impl BrainVmCli for BrainVmCliForReal { type ListVmContractsStream = Pin> + Send>>; type ListVmNodesStream = Pin> + Send>>; @@ -532,6 +669,46 @@ fn check_sig_from_req(req: Request) -> Res Ok(req) } +fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Result<(), Status> { + let now = chrono::Utc::now(); + let parsed_time = chrono::DateTime::parse_from_rfc3339(time) + .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; + let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); + if seconds_elapsed > 4 || seconds_elapsed < -4 { + return Err(Status::unauthenticated(format!( + "Date is not within 4 sec of the time of the server: CLI {} vs Server {}", + parsed_time, now + ))); + } + + let signature = bs58::decode(sig) + .into_vec() + .map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?; + let signature = ed25519_dalek::Signature::from_bytes( + signature + .as_slice() + .try_into() + .map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?, + ); + + let pubkey = ed25519_dalek::VerifyingKey::from_bytes( + &bs58::decode(&pubkey) + .into_vec() + .map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))? + .try_into() + .map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?, + ) + .map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?; + + let msg = time.to_string() + msg; + use ed25519_dalek::Verifier; + pubkey + .verify(msg.as_bytes(), &signature) + .map_err(|_| Status::unauthenticated("the signature is not valid"))?; + + Ok(()) +} + const ADMIN_ACCOUNTS: &[&str] = &[ "x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK", "FHuecMbeC1PfjkW2JKyoicJAuiU7khgQT16QUB3Q1XdL",