// Database access layer built on the SurrealDB Rust client.
//
// NOTE(review): this file looks damaged and should be restored from version control:
//   * All code sits on two physical lines, so the `// Sign in to the server` line comment
//     lexically swallows everything that follows it on the first line (the sign-in and
//     namespace selection, all of `migration0`, and the start of `upsert_record`).
//   * Generic argument lists appear to be missing (`Result, Error>`, `connect::(db_address)`,
//     `SendError)`, `Vec = old_data.into()`, `type_name::()`, `stream::>(0)`), presumably
//     lost during extraction. The concrete types are not visible here, so they are not named.
//
// Contents, as the text reads:
//   * `app`, `general`, `vm`: submodules. `prelude` re-exports everything from them and from
//     this module.
//   * `Error`: wraps, via `#[from]`, a `surrealdb::Error`, a tokio mpsc `SendError`, a
//     `std::io::Error` and a `tokio::time::error::Elapsed`.
//   * `db_connection`: calls `Surreal::init()`, connects to `db_address`, signs in as `Root`
//     with `username`/`password`, selects namespace `ns` and database `db`, and returns the
//     handle. Every step propagates its error with `?`.
//   * `migration0`: converts `old_data` into four collections with `.into()`, reads the schema
//     from `crate::constants::DB_SCHEMA_FILE` using the blocking `std::fs::read_to_string`
//     (inside an async fn) and runs it as a query. It then inserts accounts, vm nodes and
//     app nodes with `insert(()).content(..)`, and inserts the vm contracts as relations into
//     "vm_contract". Progress is printed with `println!`.
//   * `upsert_record`: calls `db.create((table, id)).content(my_record)` and discards the
//     returned value.
//     NOTE(review): the name says "upsert" but the call is `create`; confirm the behaviour
//     when the record already exists is the intended one.
//   * `live_vmnode_msgs`: picks a table name by comparing `std::any::type_name` of `T` with
//     three other types, mapping to NEW_VM_REQ / UPDATE_VM_REQ / DELETED_VM. An unmatched type
//     is logged and the literal table name "wat" is used. It then runs
//     `live select * from {table_name} where out = vm_node:{node};` and reads the stream of
//     statement 0. For each notification whose action is `Create`, the data is converted with
//     `.into()` and sent on `tx`; other actions are ignored. A stream error is logged and
//     returned, a failed send is returned through `?`, and `Ok(())` is returned once the
//     stream ends.
//     NOTE(review): `node` is interpolated into the query text with `format!`; confirm
//     callers only pass trusted identifiers.
pub mod app; pub mod general; pub mod vm; use crate::constants::{DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ}; use crate::old_brain; use prelude::*; use serde::{Deserialize, Serialize}; use surrealdb::engine::remote::ws::{Client, Ws}; use surrealdb::opt::auth::Root; use surrealdb::{Notification, Surreal}; use tokio::sync::mpsc::Sender; use tokio_stream::StreamExt; #[derive(thiserror::Error, Debug)] pub enum Error { #[error("Internal DB error: {0}")] DataBase(#[from] surrealdb::Error), #[error("Daemon channel got closed: {0}")] VmDaemonConnection(#[from] tokio::sync::mpsc::error::SendError), #[error(transparent)] StdIo(#[from] std::io::Error), #[error(transparent)] TimeOut(#[from] tokio::time::error::Elapsed), } pub mod prelude { pub use super::app::*; pub use super::general::*; pub use super::vm::*; pub use super::*; } pub async fn db_connection( db_address: &str, username: &str, password: &str, ns: &str, db: &str, ) -> Result, Error> { let db_connection: Surreal = Surreal::init(); db_connection.connect::(db_address).await?; // Sign in to the server db_connection.signin(Root { username, password }).await?; db_connection.use_ns(ns).use_db(db).await?; Ok(db_connection) } pub async fn migration0( db: &Surreal, old_data: &old_brain::BrainData, ) -> Result<(), Error> { let accounts: Vec = old_data.into(); let vm_nodes: Vec = old_data.into(); let app_nodes: Vec = old_data.into(); let vm_contracts: Vec = old_data.into(); let schema = std::fs::read_to_string(crate::constants::DB_SCHEMA_FILE)?; db.query(schema).await?; println!("Inserting accounts..."); let _: Vec = db.insert(()).content(accounts).await?; println!("Inserting vm nodes..."); let _: Vec = db.insert(()).content(vm_nodes).await?; println!("Inserting app nodes..."); let _: Vec = db.insert(()).content(app_nodes).await?; println!("Inserting vm contracts..."); let _: Vec = db.insert("vm_contract").relation(vm_contracts).await?; Ok(()) } pub async fn upsert_record( db: &Surreal, table: &str, id: &str, my_record: 
SomeRecord, ) -> Result<(), Error> { #[derive(Deserialize)] struct Wrapper {} let _: Option = db.create((table, id)).content(my_record).await?; Ok(()) } pub async fn live_vmnode_msgs< T: Into + std::marker::Unpin + for<'de> Deserialize<'de>, >( db: &Surreal, node: &str, tx: Sender, ) -> Result<(), Error> { let table_name = match std::any::type_name::() { t if t == std::any::type_name::() => NEW_VM_REQ.to_string(), t if t == std::any::type_name::() => UPDATE_VM_REQ.to_string(), t if t == std::any::type_name::() => DELETED_VM.to_string(), wat => { log::error!("listen_for_node: T has type {wat}"); String::from("wat") } }; let mut resp = db.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?; let mut live_stream = resp.stream::>(0)?; while let Some(result) = live_stream.next().await { match result { Ok(notification) => { if notification.action == surrealdb::Action::Create { tx.send(notification.data.into()).await? } } Err(e) => { log::error!("listen_for_{table_name} DB stream failed for {node}: {e}"); return Err(Error::from(e)); } } } Ok(()) }