brain/src/db/mod.rs
Noor e59a3c8fd8 Implement new app contract balance locking
minimum locking balance on deployment
locking balance on app deployment
refunding locked nano while error on daemon
returning appropriate error on app deployment
fixed some typos on logging
new timeout constants for daemon response
minor change in schema and proto
extensive tests on app deployments
fixed some vm tests
2025-06-04 16:01:42 +00:00

201 lines
6.8 KiB
Rust

pub mod app;
pub mod general;
pub mod vm;
use crate::constants::{
APP_NODE, DB_SCHEMA_FILES, DELETED_APP, DELETED_VM, MIN_ESCROW, NEW_APP_REQ, NEW_VM_REQ,
UPDATE_VM_REQ,
};
use crate::old_brain;
use prelude::*;
use serde::{Deserialize, Serialize};
use surrealdb::engine::remote::ws::{Client, Ws};
use surrealdb::opt::auth::Root;
use surrealdb::{Notification, Surreal};
use tokio::sync::mpsc::Sender;
use tokio_stream::StreamExt;
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Internal DB error: {0}")]
DataBase(#[from] surrealdb::Error),
#[error("Daemon channel got closed: {0}")]
VmDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<VmDaemonMsg>),
#[error(transparent)]
StdIo(#[from] std::io::Error),
#[error(transparent)]
TimeOut(#[from] tokio::time::error::Elapsed),
#[error("Failed to create {0}")]
FailedToCreateDBEntry(String),
#[error("Unknown Table: {0}")]
UnknownTable(String),
#[error("Daemon channel got closed: {0}")]
AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<AppDaemonMsg>),
#[error("Minimum escrow amount is {MIN_ESCROW}")]
MinimalEscrow,
#[error("Insufficient funds, deposit more tokens")]
InsufficientFunds,
#[error("Contract not found")]
ContractNotFound,
#[error("Access denied")]
AccessDenied,
#[error("Failed to delete contract {0}")]
FailedToDeleteContract(String),
#[error("Failed to kick contract {0}")]
FailedKickContract(String),
#[error("Already banned {0}")]
AlreadyBanned(String),
#[error("Failed to slash operator {0}")]
FailedToSlashOperator(String),
#[error("Transation Too Big {0}")]
TooBigTransaction(String),
#[error("Unknown: {0}")]
Unknown(String),
}
pub mod prelude {
pub use super::app::*;
pub use super::general::*;
pub use super::vm::*;
pub use super::*;
}
/// Creates a SurrealDB client connected over WebSocket and ready for queries.
///
/// The client connects to `db_address`, signs in as root with
/// `username` / `password`, and then selects namespace `ns` and database `db`.
///
/// # Errors
///
/// Any failure reported by the client while connecting, signing in or
/// selecting the namespace/database is propagated as [`Error::DataBase`].
pub async fn db_connection(
    db_address: &str,
    username: &str,
    password: &str,
    ns: &str,
    db: &str,
) -> Result<Surreal<Client>, Error> {
    let client = Surreal::<Client>::init();
    client.connect::<Ws>(db_address).await?;

    // Sign in to the server with root credentials.
    let root_credentials = Root { username, password };
    client.signin(root_credentials).await?;

    client.use_ns(ns).use_db(db).await?;
    Ok(client)
}
/// Initial migration: applies the schema files and imports the old brain data.
///
/// `old_data` is first converted into accounts, VM nodes, app nodes and the
/// active VM / app contracts. Every file listed in `DB_SCHEMA_FILES` is then
/// executed as a query, and finally the converted records are inserted
/// (nodes and accounts as content, active contracts as relations).
///
/// # Errors
///
/// Returns [`Error::StdIo`] when a schema file cannot be read and
/// [`Error::DataBase`] when a query or insert fails.
pub async fn migration0(
    db: &Surreal<Client>,
    old_data: &old_brain::BrainData,
) -> Result<(), Error> {
    // Convert everything up front, before touching the database.
    let accounts: Vec<Account> = old_data.into();
    let vm_nodes: Vec<VmNode> = old_data.into();
    let app_nodes: Vec<AppNode> = old_data.into();
    let active_vm: Vec<ActiveVm> = old_data.into();
    let active_app: Vec<ActiveApp> = old_data.into();

    // A read failure is only surfaced when its file's turn comes in the loop.
    let schemas = DB_SCHEMA_FILES.map(|path| (path, std::fs::read_to_string(path)));
    for (schema_file, read_result) in schemas {
        println!("Loading schema from {schema_file}");
        db.query(read_result?).await?;
    }

    println!("Inserting accounts...");
    let _: Vec<Account> = db.insert(()).content(accounts).await?;

    println!("Inserting vm nodes...");
    let _: Vec<VmNode> = db.insert(()).content(vm_nodes).await?;

    println!("Inserting app nodes...");
    let _: Vec<AppNode> = db.insert(()).content(app_nodes).await?;

    println!("Inserting active vm contracts...");
    let _: Vec<ActiveVm> = db.insert(()).relation(active_vm).await?;

    println!("Inserting app contracts...");
    let _: Vec<ActiveApp> = db.insert(()).relation(active_app).await?;

    Ok(())
}
pub async fn upsert_record<SomeRecord: Serialize + 'static>(
db: &Surreal<Client>,
table: &str,
id: &str,
my_record: SomeRecord,
) -> Result<(), Error> {
#[derive(Deserialize)]
struct Wrapper {}
let _: Option<Wrapper> = db.upsert((table, id)).content(my_record).await?;
Ok(())
}
pub async fn live_vmnode_msgs<
T: std::fmt::Debug + Into<vm::VmDaemonMsg> + std::marker::Unpin + for<'de> Deserialize<'de>,
>(
db: &Surreal<Client>,
node: &str,
tx: Sender<vm::VmDaemonMsg>,
) -> Result<(), Error> {
let table_name = match std::any::type_name::<T>() {
t if t == std::any::type_name::<crate::db::vm::NewVmReq>() => NEW_VM_REQ.to_string(),
t if t == std::any::type_name::<crate::db::vm::UpdateVmReq>() => UPDATE_VM_REQ.to_string(),
t if t == std::any::type_name::<crate::db::vm::DeletedVm>() => DELETED_VM.to_string(),
t => {
log::error!("live_vmnode_msgs type {t} not supported",);
return Err(Error::UnknownTable(t.to_string()));
}
};
let mut resp =
db.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?;
let mut live_stream = resp.stream::<Notification<T>>(0)?;
while let Some(result) = live_stream.next().await {
match result {
Ok(notification) => {
log::debug!("Got notification for node {node}: {notification:?}");
if notification.action == surrealdb::Action::Create {
tx.send(notification.data.into()).await?
}
}
Err(e) => {
log::error!("live_vmnode_msgs for {table_name} DB stream failed for {node}: {e}");
return Err(Error::from(e));
}
}
}
Ok(())
}
pub async fn live_appnode_msgs<
T: std::fmt::Debug + Into<app::AppDaemonMsg> + std::marker::Unpin + for<'de> Deserialize<'de>,
>(
db: &Surreal<Client>,
node_pubkey: &str,
tx: Sender<app::AppDaemonMsg>,
) -> Result<(), Error> {
let table_name = match std::any::type_name::<T>() {
t if t == std::any::type_name::<crate::db::app::NewAppReq>() => NEW_APP_REQ.to_string(),
t if t == std::any::type_name::<crate::db::app::DeletedApp>() => DELETED_APP.to_string(),
t => {
log::error!("live_appnode_msgs type {t} not supported",);
return Err(Error::UnknownTable(t.to_string()));
}
};
let mut query_resp = db
.query(format!("live select * from {table_name} where out = {APP_NODE}:{node_pubkey};"))
.await?;
let mut live_stream = query_resp.stream::<Notification<T>>(0)?;
while let Some(result) = live_stream.next().await {
match result {
Ok(notification) => {
log::debug!("Got notification for node {node_pubkey}: {notification:?}");
if notification.action == surrealdb::Action::Create {
tx.send(notification.data.into()).await?
}
}
Err(e) => {
log::error!(
"live_appnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}"
);
return Err(Error::from(e));
}
}
}
Ok(())
}
/// Deserializable record carrying a single error message.
// NOTE(review): presumably read back from a DB table row that has an `error`
// column — confirm against the callers.
#[derive(Deserialize, Debug, Clone)]
pub struct ErrorFromTable {
/// The error text.
pub error: String,
}