switch operator from relation to link

this makes DB operations easier to write
This commit is contained in:
ghe0 2025-04-25 04:15:00 +03:00
parent d9f4df2c3d
commit 6a99c146ce
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
4 changed files with 267 additions and 99 deletions

@ -5,6 +5,7 @@ DEFINE FIELD escrow ON TABLE account TYPE int DEFAULT 0;
DEFINE FIELD email ON TABLE account TYPE string DEFAULT "";
DEFINE TABLE vm_node SCHEMAFULL;
DEFINE FIELD operator ON TABLE vm_node TYPE record<account>;
DEFINE FIELD country ON TABLE vm_node TYPE string;
DEFINE FIELD region ON TABLE vm_node TYPE string;
DEFINE FIELD city ON TABLE vm_node TYPE string;
@ -37,6 +38,7 @@ DEFINE FIELD locked_nano ON TABLE vm_contract TYPE int;
DEFINE FIELD collected_at ON TABLE vm_contract TYPE datetime;
DEFINE TABLE app_node SCHEMAFULL;
DEFINE FIELD operator ON TABLE app_node TYPE record<account>;
DEFINE FIELD country ON TABLE app_node TYPE string;
DEFINE FIELD region ON TABLE app_node TYPE string;
DEFINE FIELD city ON TABLE app_node TYPE string;
@ -77,5 +79,3 @@ DEFINE FIELD contract ON TABLE kick TYPE record<vm_contract|app_contract>;
DEFINE TABLE report TYPE RELATION FROM account TO vm_node|app_node;
DEFINE FIELD created_at ON TABLE report TYPE datetime;
DEFINE FIELD reason ON TABLE report TYPE string;
DEFINE TABLE operator TYPE RELATION FROM account TO vm_node|app_node;

@ -1,7 +1,7 @@
use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer;
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer;
use surreal_brain::grpc::BrainGeneralCliMock;
use surreal_brain::grpc::BrainVmCliMock;
use surreal_brain::grpc::BrainGeneralCliForReal;
use surreal_brain::grpc::BrainVmCliForReal;
use surreal_brain::db;
use tonic::transport::{Identity, Server, ServerTlsConfig};
@ -11,8 +11,8 @@ async fn main() {
db::init().await.unwrap();
let addr = "0.0.0.0:31337".parse().unwrap();
let snp_cli_server = BrainVmCliServer::new(BrainVmCliMock {});
let general_service_server = BrainGeneralCliServer::new(BrainGeneralCliMock {});
let snp_cli_server = BrainVmCliServer::new(BrainVmCliForReal {});
let general_service_server = BrainGeneralCliServer::new(BrainGeneralCliForReal {});
let cert = std::fs::read_to_string("./tmp/brain-crt.pem").unwrap();
let key = std::fs::read_to_string("./tmp/brain-key.pem").unwrap();

157
src/db.rs

@ -9,10 +9,9 @@ use surrealdb::{
};
static DB: LazyLock<Surreal<Client>> = LazyLock::new(Surreal::init);
const ACCOUNT: &str = "account";
const OPERATOR: &str = "operator";
const VM_CONTRACT: &str = "vm_contract";
const VM_NODE: &str = "vm_node";
pub const ACCOUNT: &str = "account";
pub const VM_CONTRACT: &str = "vm_contract";
pub const VM_NODE: &str = "vm_node";
#[derive(thiserror::Error, Debug)]
pub enum Error {
@ -33,7 +32,6 @@ pub async fn migration0(old_data: &old_brain::BrainData) -> surrealdb::Result<()
let vm_nodes: Vec<VmNode> = old_data.into();
let app_nodes: Vec<AppNode> = old_data.into();
let vm_contracts: Vec<VmContract> = old_data.into();
let operators: Vec<OperatorRelation> = old_data.into();
init().await?;
@ -45,8 +43,6 @@ pub async fn migration0(old_data: &old_brain::BrainData) -> surrealdb::Result<()
let _: Vec<AppNode> = DB.insert(()).content(app_nodes).await?;
println!("Inserting vm contracts...");
let _: Vec<VmContract> = DB.insert("vm_contract").relation(vm_contracts).await?;
println!("Inserting operators...");
let _: Vec<OperatorRelation> = DB.insert(OPERATOR).relation(operators).await?;
Ok(())
}
@ -85,6 +81,7 @@ impl Account {
#[derive(Debug, Serialize, Deserialize)]
pub struct VmNode {
pub id: RecordId,
pub operator: RecordId,
pub country: String,
pub region: String,
pub city: String,
@ -100,9 +97,17 @@ pub struct VmNode {
pub offline_minutes: u64,
}
impl VmNode {
pub async fn register(self) -> Result<(), Error> {
let _: Option<VmNode> = DB.upsert(self.id.clone()).content(self).await?;
Ok(())
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct VmNodeExtended {
pub struct VmNodeWithReports {
pub id: RecordId,
pub operator: RecordId,
pub country: String,
pub region: String,
pub city: String,
@ -117,7 +122,6 @@ pub struct VmNodeExtended {
pub price: u64,
pub offline_minutes: u64,
pub reports: Vec<Report>,
pub operator: RecordId,
}
#[derive(Debug, Serialize, Deserialize)]
@ -162,6 +166,8 @@ impl VmContract {
}
}
impl VmContract {}
#[derive(Debug, Serialize, Deserialize)]
pub struct VmContractWithNode {
pub id: RecordId,
@ -201,6 +207,14 @@ impl VmContractWithNode {
Ok(contracts)
}
pub async fn list_by_node(admin: &str) -> Result<Vec<Self>, Error> {
let mut result = DB
.query(format!("select * from {VM_CONTRACT} where out = {VM_NODE}:{admin} fetch out;"))
.await?;
let contracts: Vec<Self> = result.take(0)?;
Ok(contracts)
}
pub async fn list_by_operator(operator: &str) -> Result<Vec<Self>, Error> {
let mut result = DB
.query(format!(
@ -242,6 +256,7 @@ impl VmContractWithNode {
#[derive(Debug, Serialize, Deserialize)]
pub struct AppNode {
pub id: RecordId,
pub operator: RecordId,
pub country: String,
pub region: String,
pub city: String,
@ -256,8 +271,9 @@ pub struct AppNode {
}
#[derive(Debug, Serialize, Deserialize)]
pub struct AppNodeExtended {
pub struct AppNodeWithReports {
pub id: RecordId,
pub operator: RecordId,
pub country: String,
pub region: String,
pub city: String,
@ -270,7 +286,6 @@ pub struct AppNodeExtended {
pub price: u64,
pub offline_minutes: u64,
pub reports: Vec<Report>,
pub operator: RecordId,
}
#[derive(Debug, Serialize, Deserialize)]
@ -344,23 +359,6 @@ impl Report {
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct OperatorRelation {
#[serde(rename = "in")]
pub account: RecordId,
#[serde(rename = "out")]
pub node: RecordId,
}
impl OperatorRelation {
fn new(account: &str, vm_node: &str) -> Self {
Self {
account: RecordId::from(("account", account.to_string())),
node: RecordId::from(("vm_node", vm_node.to_string())),
}
}
}
/// This is the operator obtained from the DB,
/// however the relation is defined using OperatorRelation
#[derive(Debug, Serialize, Deserialize)]
@ -377,57 +375,59 @@ impl Operator {
pub async fn list() -> Result<Vec<Self>, Error> {
let mut result = DB
.query(format!(
"select *,
in as account,
<-account.email[0] as email,
<-account.escrow[0] as escrow,
count(->vm_node) as vm_nodes,
count(->app_node) as app_nodes,
(select in from <-account->operator->vm_node<-report).len() +
(select in from <-account->operator->app_node<-report).len()
as reports
from operator group by in;"
"array::distinct(array::flatten( [
(select operator from vm_node group by operator).operator,
(select operator from app_node group by operator).operator
]));"
))
.await?;
let operators: Vec<Self> = result.take(0)?;
let operator_accounts: Vec<RecordId> = result.take(0)?;
let mut operators: Vec<Self> = Vec::new();
for account in operator_accounts.iter() {
if let Some(operator) = Self::inspect(&account.key().to_string()).await? {
operators.push(operator);
}
}
Ok(operators)
}
pub async fn inspect(account: &str) -> Result<Option<Self>, Error> {
let mut result = DB
.query(format!(
"$vm_nodes = (select id from vm_node where operator = account:{account}).id;
$app_nodes = (select id from app_node where operator = account:{account}).id;
select *,
id as account,
email,
escrow,
$vm_nodes.len() as vm_nodes,
$app_nodes.len() as app_nodes,
(select id from report where $vm_nodes contains out).len() +
(select id from report where $app_nodes contains out).len()
as reports
from account where id = account:{account};"
))
.await?;
let operator: Option<Self> = result.take(2)?;
Ok(operator)
}
pub async fn inspect_nodes(
account: &str,
) -> Result<(Option<Self>, Vec<VmNodeExtended>, Vec<AppNodeExtended>), Error> {
) -> Result<(Option<Self>, Vec<VmNodeWithReports>, Vec<AppNodeWithReports>), Error> {
let operator = Self::inspect(account).await?;
let mut result = DB
.query(format!(
"select *,
in as account,
<-account.email[0] as email,
<-account.escrow[0] as escrow,
count(->vm_node) as vm_nodes,
count(->app_node) as app_nodes,
(select in from <-account->operator->vm_node<-report).len() +
(select in from <-account->operator->app_node<-report).len()
as reports
from operator where in = account:{account} group by account;"
"select *, operator, <-report.* as reports from vm_node
where operator = account:{account};"
))
.query(format!(
"select *,
(<-operator<-account)[0].id as operator,
<-report.* as reports
from vm_node
where (<-operator<-account)[0].id = account:{account};"
))
.query(format!(
"select *,
(<-operator<-account)[0].id as operator,
<-report.* as reports
from app_node
where (<-operator<-account)[0].id = account:{account};"
"select *, operator, <-report.* as reports from app_node
where operator = account:{account};"
))
.await?;
let operator: Option<Self> = result.take(0)?;
let vm_nodes: Vec<VmNodeExtended> = result.take(1)?;
let app_nodes: Vec<AppNodeExtended> = result.take(2)?;
let vm_nodes: Vec<VmNodeWithReports> = result.take(0)?;
let app_nodes: Vec<AppNodeWithReports> = result.take(1)?;
Ok((operator, vm_nodes, app_nodes))
}
@ -440,7 +440,8 @@ impl From<&old_brain::BrainData> for Vec<VmNode> {
let mut nodes = Vec::new();
for old_node in old_data.vm_nodes.iter() {
nodes.push(VmNode {
id: RecordId::from(("vm_node", old_node.public_key.clone())),
id: RecordId::from((VM_NODE, old_node.public_key.clone())),
operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())),
country: old_node.country.clone(),
region: old_node.region.clone(),
city: old_node.city.clone(),
@ -469,7 +470,11 @@ impl From<&old_brain::BrainData> for Vec<VmContract> {
mapped_ports.push((*port, 8080 as u32));
}
contracts.push(VmContract {
id: RecordId::from((VM_CONTRACT, old_c.uuid.replace("-", ""))),
id: RecordId::from((
VM_CONTRACT,
old_c.node_pubkey.chars().take(20).collect::<String>()
+ &old_c.uuid.replace("-", "").chars().take(20).collect::<String>(),
)),
admin: RecordId::from((ACCOUNT, old_c.admin_pubkey.clone())),
vm_node: RecordId::from((VM_NODE, old_c.node_pubkey.clone())),
state: "active".to_string(),
@ -499,6 +504,7 @@ impl From<&old_brain::BrainData> for Vec<AppNode> {
for old_node in old_data.app_nodes.iter() {
nodes.push(AppNode {
id: RecordId::from(("app_node", old_node.node_pubkey.clone())),
operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())),
country: old_node.country.clone(),
region: old_node.region.clone(),
city: old_node.city.clone(),
@ -536,18 +542,3 @@ impl From<&old_brain::BrainData> for Vec<Account> {
accounts
}
}
impl From<&old_brain::BrainData> for Vec<OperatorRelation> {
fn from(old_data: &old_brain::BrainData) -> Self {
let mut operator_entries = Vec::new();
for operator in old_data.operators.clone() {
for vm_node in operator.1.vm_nodes.iter() {
operator_entries.push(OperatorRelation::new(&operator.0, vm_node));
}
for app_node in operator.1.app_nodes.iter() {
operator_entries.push(OperatorRelation::new(&operator.0, app_node));
}
}
operator_entries
}
}

@ -8,7 +8,10 @@ use detee_shared::{
InspectOperatorResp, KickReq, KickResp, ListOperatorsResp, RegOperatorReq, ReportNodeReq,
SlashReq,
},
vm_proto::{brain_vm_cli_server::BrainVmCli, ListVmContractsReq, *},
vm_proto::{
brain_vm_cli_server::BrainVmCli, brain_vm_daemon_server::BrainVmDaemon, ListVmContractsReq,
*,
},
};
use log::info;
@ -18,9 +21,9 @@ use tokio_stream::wrappers::ReceiverStream;
// use tokio::sync::mpsc;
// use tokio_stream::{wrappers::ReceiverStream, Stream};
use tokio_stream::Stream;
use tonic::{Request, Response, Status};
use tonic::{Request, Response, Status, Streaming};
pub struct BrainGeneralCliMock {}
pub struct BrainGeneralCliForReal {}
impl From<db::Account> for AccountBalance {
fn from(account: db::Account) -> Self {
@ -84,8 +87,8 @@ impl From<db::Operator> for ListOperatorsResp {
}
}
impl From<db::VmNodeExtended> for VmNodeListResp {
fn from(vm_node: db::VmNodeExtended) -> Self {
impl From<db::VmNodeWithReports> for VmNodeListResp {
fn from(vm_node: db::VmNodeWithReports) -> Self {
Self {
operator: vm_node.operator.key().to_string(),
node_pubkey: vm_node.id.key().to_string(),
@ -99,8 +102,8 @@ impl From<db::VmNodeExtended> for VmNodeListResp {
}
}
impl From<db::AppNodeExtended> for AppNodeListResp {
fn from(app_node: db::AppNodeExtended) -> Self {
impl From<db::AppNodeWithReports> for AppNodeListResp {
fn from(app_node: db::AppNodeWithReports) -> Self {
Self {
operator: app_node.operator.key().to_string(),
node_pubkey: app_node.id.key().to_string(),
@ -114,8 +117,142 @@ impl From<db::AppNodeExtended> for AppNodeListResp {
}
}
struct BrainVmDaemonForReal {}
#[tonic::async_trait]
impl BrainGeneralCli for BrainGeneralCliMock {
impl BrainVmDaemon for BrainVmDaemonForReal {
type RegisterVmNodeStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
async fn register_vm_node(
&self,
req: Request<RegisterVmNodeReq>,
) -> Result<Response<Self::RegisterVmNodeStream>, Status> {
let req = check_sig_from_req(req)?;
info!("Starting registration process for {:?}", req);
db::VmNode {
id: surrealdb::RecordId::from((db::VM_NODE, req.node_pubkey.clone())),
operator: surrealdb::RecordId::from((db::ACCOUNT, req.operator_wallet)),
country: req.country,
region: req.region,
city: req.city,
ip: req.main_ip,
price: req.price,
avail_mem_mb: 0,
avail_vcpus: 0,
avail_storage_gbs: 0,
avail_ipv4: 0,
avail_ipv6: 0,
avail_ports: 0,
max_ports_per_vm: 0,
offline_minutes: 0,
}
.register()
.await?;
info!("Sending existing contracts to {}", req.node_pubkey);
let contracts = db::VmContractWithNode::list_by_node(&req.node_pubkey).await?;
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for contract in contracts {
let _ = tx.send(Ok(contract.into())).await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream) as Self::RegisterVmNodeStream))
}
type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainVmMessage, Status>> + Send>>;
async fn brain_messages(
&self,
req: Request<DaemonStreamAuth>,
) -> Result<Response<Self::BrainMessagesStream>, Status> {
todo!();
let auth = req.into_inner();
let pubkey = auth.pubkey.clone();
check_sig_from_parts(
&pubkey,
&auth.timestamp,
&format!("{:?}", auth.contracts),
&auth.signature,
)?;
info!("Daemon {} connected to receive brain messages", pubkey);
// A surreql query that listens live for changes:
// live select * from vm_contract where meta::id(id) in "3zRxiGRnf46vd3zAEmpa".."3zRxiGRnf46vd3zAEmpb";
//
// And the same from Rust:
// let mut stream = db
// .select("vm_contract")
// .range("3zRxiGRnf46vd3zAEmpa".."3zRxiGRnf46vd3zAEmpb")
// .live()
// .await?;
//
// fn handle(result: Result<Notification<VmContract>, surrealdb::Error>) {
// println!("Received notification: {:?}", result);
// }
//
// while let Some(result) = stream.next().await {
// handle(result);
// }
//
// let (tx, rx) = mpsc::channel(6);
//self.data.add_daemon_tx(&pubkey, tx);
//let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg));
//Ok(Response::new(Box::pin(output_stream) as Self::BrainMessagesStream))
}
async fn daemon_messages(
&self,
_req: Request<Streaming<VmDaemonMessage>>,
) -> Result<Response<Empty>, Status> {
todo!();
// let mut req_stream = req.into_inner();
// let pubkey: String;
// if let Some(Ok(msg)) = req_stream.next().await {
// log::debug!("demon_messages received the following auth message: {:?}", msg.msg);
// if let Some(vm_daemon_message::Msg::Auth(auth)) = msg.msg {
// pubkey = auth.pubkey.clone();
// check_sig_from_parts(
// &pubkey,
// &auth.timestamp,
// &format!("{:?}", auth.contracts),
// &auth.signature,
// )?;
// } else {
// return Err(Status::unauthenticated(
// "Could not authenticate the daemon: could not extract auth signature",
// ));
// }
// } else {
// return Err(Status::unauthenticated("Could not authenticate the daemon"));
// }
// // info!("Received a message from daemon {pubkey}: {daemon_message:?}");
// while let Some(daemon_message) = req_stream.next().await {
// match daemon_message {
// Ok(msg) => match msg.msg {
// Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => {
// self.data.submit_newvm_resp(new_vm_resp).await;
// }
// Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => {
// self.data.submit_updatevm_resp(update_vm_resp).await;
// }
// Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => {
// self.data.submit_node_resources(node_resources);
// }
// _ => {}
// },
// Err(e) => {
// log::warn!("Daemon disconnected: {e:?}");
// self.data.del_daemon_tx(&pubkey);
// }
// }
// }
// Ok(Response::new(Empty {}))
}
}
#[tonic::async_trait]
impl BrainGeneralCli for BrainGeneralCliForReal {
type ListAccountsStream = Pin<Box<dyn Stream<Item = Result<Account, Status>> + Send>>;
type ListAllAppContractsStream =
Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
@ -277,10 +414,10 @@ impl BrainGeneralCli for BrainGeneralCliMock {
}
}
pub struct BrainVmCliMock {}
pub struct BrainVmCliForReal {}
#[tonic::async_trait]
impl BrainVmCli for BrainVmCliMock {
impl BrainVmCli for BrainVmCliForReal {
type ListVmContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
type ListVmNodesStream = Pin<Box<dyn Stream<Item = Result<VmNodeListResp, Status>> + Send>>;
@ -532,6 +669,46 @@ fn check_sig_from_req<T: std::fmt::Debug + PubkeyGetter>(req: Request<T>) -> Res
Ok(req)
}
fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Result<(), Status> {
let now = chrono::Utc::now();
let parsed_time = chrono::DateTime::parse_from_rfc3339(time)
.map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?;
let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds();
if seconds_elapsed > 4 || seconds_elapsed < -4 {
return Err(Status::unauthenticated(format!(
"Date is not within 4 sec of the time of the server: CLI {} vs Server {}",
parsed_time, now
)));
}
let signature = bs58::decode(sig)
.into_vec()
.map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?;
let signature = ed25519_dalek::Signature::from_bytes(
signature
.as_slice()
.try_into()
.map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?,
);
let pubkey = ed25519_dalek::VerifyingKey::from_bytes(
&bs58::decode(&pubkey)
.into_vec()
.map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))?
.try_into()
.map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?,
)
.map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?;
let msg = time.to_string() + msg;
use ed25519_dalek::Verifier;
pubkey
.verify(msg.as_bytes(), &signature)
.map_err(|_| Status::unauthenticated("the signature is not valid"))?;
Ok(())
}
const ADMIN_ACCOUNTS: &[&str] = &[
"x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK",
"FHuecMbeC1PfjkW2JKyoicJAuiU7khgQT16QUB3Q1XdL",