pub mod app; pub mod general; pub mod vm; use crate::constants::{ APP_NODE, DB_SCHEMA_FILES, DELETED_APP, DELETED_VM, MIN_ESCROW, NEW_APP_REQ, NEW_VM_REQ, UPDATE_VM_REQ, }; use crate::old_brain; use prelude::*; use serde::{Deserialize, Serialize}; use surrealdb::engine::remote::ws::{Client, Ws}; use surrealdb::opt::auth::Root; use surrealdb::{Notification, Surreal}; use tokio::sync::mpsc::Sender; use tokio_stream::StreamExt; #[derive(thiserror::Error, Debug)] pub enum Error { #[error("Internal DB error: {0}")] DataBase(#[from] surrealdb::Error), #[error("Daemon channel got closed: {0}")] VmDaemonConnection(#[from] tokio::sync::mpsc::error::SendError), #[error(transparent)] StdIo(#[from] std::io::Error), #[error(transparent)] TimeOut(#[from] tokio::time::error::Elapsed), #[error("Failed to create {0}")] FailedToCreateDBEntry(String), #[error("Unknown Table: {0}")] UnknownTable(String), #[error("Daemon channel got closed: {0}")] AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError), #[error("AppDaemon Error {0}")] NewAppDaemonResp(String), #[error("Minimum escrow amount is {MIN_ESCROW}")] MinimalEscrow, #[error("Insufficient funds, deposit more tokens")] InsufficientFunds, #[error("Contract not found")] ContractNotFound, #[error("Access denied")] AccessDenied, #[error("Failed to delete contract {0}")] FailedToDeleteContract(String), #[error("Failed to kick contract {0}")] FailedKickContract(String), #[error("Already banned {0}")] AlreadyBanned(String), #[error("Failed to slash operator {0}")] FailedToSlashOperator(String), } pub mod prelude { pub use super::app::*; pub use super::general::*; pub use super::vm::*; pub use super::*; } pub async fn db_connection( db_address: &str, username: &str, password: &str, ns: &str, db: &str, ) -> Result, Error> { let db_connection: Surreal = Surreal::init(); db_connection.connect::(db_address).await?; // Sign in to the server db_connection.signin(Root { username, password }).await?; db_connection.use_ns(ns).use_db(db).await?; Ok(db_connection) } pub async fn migration0( db: &Surreal, old_data: &old_brain::BrainData, ) -> Result<(), Error> { let accounts: Vec = old_data.into(); let vm_nodes: Vec = old_data.into(); let app_nodes: Vec = old_data.into(); let active_vm: Vec = old_data.into(); let active_app: Vec = old_data.into(); for schema_data in DB_SCHEMA_FILES.map(|path| (std::fs::read_to_string(path), path)) { let schema_file = schema_data.1; println!("Loading schema from {schema_file}"); let schema = schema_data.0?; db.query(schema).await?; } println!("Inserting accounts..."); let _: Vec = db.insert(()).content(accounts).await?; println!("Inserting vm nodes..."); let _: Vec = db.insert(()).content(vm_nodes).await?; println!("Inserting app nodes..."); let _: Vec = db.insert(()).content(app_nodes).await?; println!("Inserting active vm contracts..."); let _: Vec = db.insert(()).relation(active_vm).await?; println!("Inserting app contracts..."); let _: Vec = db.insert(()).relation(active_app).await?; Ok(()) } pub async fn upsert_record( db: &Surreal, table: &str, id: &str, my_record: SomeRecord, ) -> Result<(), Error> { #[derive(Deserialize)] struct Wrapper {} let _: Option = db.upsert((table, id)).content(my_record).await?; Ok(()) } pub async fn live_vmnode_msgs< T: std::fmt::Debug + Into + std::marker::Unpin + for<'de> Deserialize<'de>, >( db: &Surreal, node: &str, tx: Sender, ) -> Result<(), Error> { let table_name = match std::any::type_name::() { t if t == std::any::type_name::() => NEW_VM_REQ.to_string(), t if t == std::any::type_name::() => UPDATE_VM_REQ.to_string(), t if t == std::any::type_name::() => DELETED_VM.to_string(), t => { log::error!("live_vmnode_msgs type {t} not supported",); return Err(Error::UnknownTable(t.to_string())); } }; let mut resp = db.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?; let mut live_stream = resp.stream::>(0)?; while let Some(result) = live_stream.next().await { match result { Ok(notification) => { log::debug!("Got notification for node {node}: {notification:?}"); if notification.action == surrealdb::Action::Create { tx.send(notification.data.into()).await? } } Err(e) => { log::error!("live_vmnode_msgs for {table_name} DB stream failed for {node}: {e}"); return Err(Error::from(e)); } } } Ok(()) } pub async fn live_appnode_msgs< T: std::fmt::Debug + Into + std::marker::Unpin + for<'de> Deserialize<'de>, >( db: &Surreal, node_pubkey: &str, tx: Sender, ) -> Result<(), Error> { let table_name = match std::any::type_name::() { t if t == std::any::type_name::() => NEW_APP_REQ.to_string(), t if t == std::any::type_name::() => DELETED_APP.to_string(), t => { log::error!("live_appnode_msgs type {t} not supported",); return Err(Error::UnknownTable(t.to_string())); } }; let mut query_resp = db .query(format!("live select * from {table_name} where out = {APP_NODE}:{node_pubkey};")) .await?; let mut live_stream = query_resp.stream::>(0)?; while let Some(result) = live_stream.next().await { match result { Ok(notification) => { log::debug!("Got notification for node {node_pubkey}: {notification:?}"); if notification.action == surrealdb::Action::Create { tx.send(notification.data.into()).await? } } Err(e) => { log::error!( "live_vmnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}" ); return Err(Error::from(e)); } } } Ok(()) } #[derive(Deserialize)] pub struct ErrorFromTable { pub error: String, }