brain/src/db/mod.rs
Noor 39ee3cd84b
Merge bug-fix: Fixes on app engine
fixed app node filter query
updated proto change mb to gb on app node resource
tests for get one node for app and vm
tests for node resource updates on vm and app
fix test for app node filtering implemented ip based node filtering
mocking random public ip for node
local brain binary for test without tls
refactor test grpc services to use grpc only stream
2025-06-16 18:39:07 +05:30

236 lines
7.8 KiB
Rust

// SPDX-License-Identifier: Apache-2.0
pub mod app;
pub mod general;
pub mod vm;
use crate::constants::{
APP_NODE, DB_SCHEMA_FILES, DEFAULT_ENDPOINT, DELETED_APP, DELETED_VM, MIN_ESCROW, NEW_APP_REQ,
NEW_VM_REQ, UPDATE_VM_REQ,
};
use crate::old_brain;
use prelude::*;
use serde::{Deserialize, Serialize};
use surrealdb::engine::remote::ws::{Client, Ws};
use surrealdb::opt::auth::Root;
use surrealdb::{Notification, RecordId, Surreal};
use tokio::sync::mpsc::Sender;
use tokio_stream::StreamExt;
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Internal DB error: {0}")]
DataBase(#[from] surrealdb::Error),
#[error("Daemon channel got closed: {0}")]
VmDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<VmDaemonMsg>),
#[error(transparent)]
StdIo(#[from] std::io::Error),
#[error(transparent)]
TimeOut(#[from] tokio::time::error::Elapsed),
#[error("Failed to create {0}")]
FailedToCreateDBEntry(String),
#[error("Unknown Table: {0}")]
UnknownTable(String),
#[error("Daemon channel got closed: {0}")]
AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<AppDaemonMsg>),
#[error("Minimum escrow amount is {MIN_ESCROW}")]
MinimalEscrow,
#[error("Insufficient funds, deposit more tokens")]
InsufficientFunds,
#[error("Contract not found")]
ContractNotFound,
#[error("Access denied")]
AccessDenied,
#[error("Failed to delete contract {0}")]
FailedToDeleteContract(String),
#[error("Failed to kick contract {0}")]
FailedKickContract(String),
#[error("Already banned {0}")]
AlreadyBanned(String),
#[error("Failed to slash operator {0}")]
FailedToSlashOperator(String),
#[error("Transation Too Big {0}")]
TooBigTransaction(String),
#[error("Unknown: {0}")]
Unknown(String),
#[error(transparent)]
Tonic(#[from] tonic::Status),
}
pub mod prelude {
pub use super::app::*;
pub use super::general::*;
pub use super::vm::*;
pub use super::*;
}
/// Creates a ready-to-use SurrealDB client.
///
/// The client connects to `db_address` through the WebSocket engine, signs in
/// with the given root credentials and selects namespace `ns` and database `db`.
///
/// # Errors
///
/// Returns [`Error::DataBase`] if connecting, signing in, or selecting the
/// namespace/database fails.
pub async fn db_connection(
    db_address: &str,
    username: &str,
    password: &str,
    ns: &str,
    db: &str,
) -> Result<Surreal<Client>, Error> {
    let client = Surreal::<Client>::init();
    client.connect::<Ws>(db_address).await?;

    // Authenticate as a root user before choosing the namespace and database.
    let credentials = Root { username, password };
    client.signin(credentials).await?;
    client.use_ns(ns).use_db(db).await?;

    Ok(client)
}
/// Applies the schema files and imports the data of the old brain.
///
/// Every file listed in `DB_SCHEMA_FILES` is executed as a query, then the
/// accounts, VM nodes, app nodes and the active VM/app contracts converted
/// from `old_data` are inserted, in that order.
///
/// # Errors
///
/// Returns [`Error::StdIo`] if a schema file cannot be read and
/// [`Error::DataBase`] if a schema query or an insert fails. Work done before
/// the failing step is not rolled back by this function.
pub async fn migration0(
    db: &Surreal<Client>,
    old_data: &old_brain::BrainData,
) -> Result<(), Error> {
    // Convert the old data set into the new record types up front.
    let accounts: Vec<Account> = old_data.into();
    let vm_nodes: Vec<VmNode> = old_data.into();
    let app_nodes: Vec<AppNode> = old_data.into();
    let active_vm: Vec<ActiveVm> = old_data.into();
    let active_app: Vec<ActiveApp> = old_data.into();

    // All schema files are read before the first one is applied; a read error
    // only surfaces once the loop reaches the affected file.
    let schemas = DB_SCHEMA_FILES.map(|path| (path, std::fs::read_to_string(path)));
    for (schema_file, schema) in schemas {
        println!("Loading schema from {schema_file}");
        db.query(schema?).await?;
    }

    println!("Inserting accounts...");
    let _: Vec<Account> = db.insert(()).content(accounts).await?;

    println!("Inserting vm nodes...");
    let _: Vec<VmNode> = db.insert(()).content(vm_nodes).await?;

    println!("Inserting app nodes...");
    let _: Vec<AppNode> = db.insert(()).content(app_nodes).await?;

    // Contracts are inserted as relations rather than plain content.
    println!("Inserting active vm contracts...");
    let _: Vec<ActiveVm> = db.insert(()).relation(active_vm).await?;

    println!("Inserting app contracts...");
    let _: Vec<ActiveApp> = db.insert(()).relation(active_app).await?;

    Ok(())
}
pub async fn upsert_record<SomeRecord: Serialize + 'static>(
db: &Surreal<Client>,
table: &str,
id: &str,
my_record: SomeRecord,
) -> Result<(), Error> {
#[derive(Deserialize)]
struct Wrapper {}
let _: Option<Wrapper> = db.upsert((table, id)).content(my_record).await?;
Ok(())
}
pub async fn live_vmnode_msgs<
T: std::fmt::Debug + Into<vm::VmDaemonMsg> + std::marker::Unpin + for<'de> Deserialize<'de>,
>(
db: &Surreal<Client>,
node: &str,
tx: Sender<vm::VmDaemonMsg>,
) -> Result<(), Error> {
let table_name = match std::any::type_name::<T>() {
t if t == std::any::type_name::<crate::db::vm::NewVmReq>() => NEW_VM_REQ.to_string(),
t if t == std::any::type_name::<crate::db::vm::UpdateVmReq>() => UPDATE_VM_REQ.to_string(),
t if t == std::any::type_name::<crate::db::vm::DeletedVm>() => DELETED_VM.to_string(),
t => {
log::error!("live_vmnode_msgs type {t} not supported",);
return Err(Error::UnknownTable(t.to_string()));
}
};
let mut resp =
db.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?;
let mut live_stream = resp.stream::<Notification<T>>(0)?;
while let Some(result) = live_stream.next().await {
match result {
Ok(notification) => {
log::debug!("Got notification for node {node}: {notification:?}");
if notification.action == surrealdb::Action::Create {
tx.send(notification.data.into()).await?
}
}
Err(e) => {
log::error!("live_vmnode_msgs for {table_name} DB stream failed for {node}: {e}");
return Err(Error::from(e));
}
}
}
Ok(())
}
pub async fn live_appnode_msgs<
T: std::fmt::Debug + Into<app::AppDaemonMsg> + std::marker::Unpin + for<'de> Deserialize<'de>,
>(
db: &Surreal<Client>,
node_pubkey: &str,
tx: Sender<app::AppDaemonMsg>,
) -> Result<(), Error> {
let table_name = match std::any::type_name::<T>() {
t if t == std::any::type_name::<crate::db::app::NewAppReq>() => NEW_APP_REQ.to_string(),
t if t == std::any::type_name::<crate::db::app::DeletedApp>() => DELETED_APP.to_string(),
t => {
log::error!("live_appnode_msgs type {t} not supported",);
return Err(Error::UnknownTable(t.to_string()));
}
};
log::trace!("live_appnode_msgs for {table_name} DB stream for node {node_pubkey}");
// TODO: bind node_pubkey
let mut query_resp = db
.query(format!("live select * from {table_name} where out = {APP_NODE}:{node_pubkey};"))
.await?;
let mut live_stream = query_resp.stream::<Notification<T>>(0)?;
while let Some(result) = live_stream.next().await {
match result {
Ok(notification) => {
log::debug!("Got notification for node {node_pubkey}: {notification:?}");
if notification.action == surrealdb::Action::Create {
tx.send(notification.data.into()).await?
}
}
Err(e) => {
log::error!(
"live_appnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}"
);
return Err(Error::from(e));
}
}
}
Ok(())
}
pub async fn check_pubsub_node(
db: &Surreal<Client>,
id: RecordId,
) -> Result<Option<tonic::Status>, Error> {
let pub_endpoint =
std::env::var("BRAIN_PUBLIC_ENDPOINT").unwrap_or(DEFAULT_ENDPOINT.to_string());
let mut query_resp =
db.query("select pub_sub_node from $record_id").bind(("record_id", id)).await?;
let node_endpoint = query_resp
.take::<Option<PubSubNode>>(0)?
.ok_or(tonic::Status::internal("Could not get current brain endpoint"))?
.pub_sub_node;
if pub_endpoint == node_endpoint {
return Ok(None);
}
let mut status = tonic::Status::new(tonic::Code::Unavailable, "moved");
status.metadata_mut().insert("location", node_endpoint.parse().unwrap());
Ok(Some(status))
}
/// Deserializable record consisting of a single `error` string.
// NOTE(review): presumably used to read error rows back from a DB table; the
// usage is not visible in this part of the file, so confirm against callers.
#[derive(Deserialize, Debug, Clone)]
pub struct ErrorFromTable {
/// The error text carried by the record.
pub error: String,
}
/// Result row of the `select pub_sub_node from $record_id` query issued by
/// `check_pubsub_node`.
#[derive(Deserialize, Debug)]
struct PubSubNode {
/// Endpoint stored on the record; compared with this brain's public endpoint.
pub pub_sub_node: String,
}