reverted stream query checking err on db before listening error stream refactor app grpc code
203 lines
6.8 KiB
Rust
203 lines
6.8 KiB
Rust
pub mod app;
|
|
pub mod general;
|
|
pub mod vm;
|
|
|
|
use crate::constants::{
|
|
APP_NODE, DB_SCHEMA_FILES, DELETED_APP, DELETED_VM, MIN_ESCROW, NEW_APP_REQ, NEW_VM_REQ,
|
|
UPDATE_VM_REQ,
|
|
};
|
|
use crate::old_brain;
|
|
use prelude::*;
|
|
use serde::{Deserialize, Serialize};
|
|
use surrealdb::engine::remote::ws::{Client, Ws};
|
|
use surrealdb::opt::auth::Root;
|
|
use surrealdb::{Notification, Surreal};
|
|
use tokio::sync::mpsc::Sender;
|
|
use tokio_stream::StreamExt;
|
|
|
|
#[derive(thiserror::Error, Debug)]
|
|
pub enum Error {
|
|
#[error("Internal DB error: {0}")]
|
|
DataBase(#[from] surrealdb::Error),
|
|
#[error("Daemon channel got closed: {0}")]
|
|
VmDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<VmDaemonMsg>),
|
|
#[error(transparent)]
|
|
StdIo(#[from] std::io::Error),
|
|
#[error(transparent)]
|
|
TimeOut(#[from] tokio::time::error::Elapsed),
|
|
#[error("Failed to create {0}")]
|
|
FailedToCreateDBEntry(String),
|
|
#[error("Unknown Table: {0}")]
|
|
UnknownTable(String),
|
|
#[error("Daemon channel got closed: {0}")]
|
|
AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<AppDaemonMsg>),
|
|
#[error("AppDaemon Error {0}")]
|
|
NewAppDaemonResp(String),
|
|
#[error("Minimum escrow amount is {MIN_ESCROW}")]
|
|
MinimalEscrow,
|
|
#[error("Insufficient funds, deposit more tokens")]
|
|
InsufficientFunds,
|
|
#[error("Contract not found")]
|
|
ContractNotFound,
|
|
#[error("Access denied")]
|
|
AccessDenied,
|
|
#[error("Failed to delete contract {0}")]
|
|
FailedToDeleteContract(String),
|
|
#[error("Failed to kick contract {0}")]
|
|
FailedKickContract(String),
|
|
#[error("Already banned {0}")]
|
|
AlreadyBanned(String),
|
|
#[error("Failed to slash operator {0}")]
|
|
FailedToSlashOperator(String),
|
|
#[error("Transation Too Big {0}")]
|
|
TooBigTransaction(String),
|
|
#[error("Unknown: {0}")]
|
|
Unknown(String),
|
|
}
|
|
|
|
pub mod prelude {
|
|
pub use super::app::*;
|
|
pub use super::general::*;
|
|
pub use super::vm::*;
|
|
pub use super::*;
|
|
}
|
|
|
|
pub async fn db_connection(
|
|
db_address: &str,
|
|
username: &str,
|
|
password: &str,
|
|
ns: &str,
|
|
db: &str,
|
|
) -> Result<Surreal<Client>, Error> {
|
|
let db_connection: Surreal<Client> = Surreal::init();
|
|
db_connection.connect::<Ws>(db_address).await?;
|
|
// Sign in to the server
|
|
db_connection.signin(Root { username, password }).await?;
|
|
db_connection.use_ns(ns).use_db(db).await?;
|
|
Ok(db_connection)
|
|
}
|
|
|
|
pub async fn migration0(
|
|
db: &Surreal<Client>,
|
|
old_data: &old_brain::BrainData,
|
|
) -> Result<(), Error> {
|
|
let accounts: Vec<Account> = old_data.into();
|
|
let vm_nodes: Vec<VmNode> = old_data.into();
|
|
let app_nodes: Vec<AppNode> = old_data.into();
|
|
let active_vm: Vec<ActiveVm> = old_data.into();
|
|
let active_app: Vec<ActiveApp> = old_data.into();
|
|
|
|
for schema_data in DB_SCHEMA_FILES.map(|path| (std::fs::read_to_string(path), path)) {
|
|
let schema_file = schema_data.1;
|
|
println!("Loading schema from {schema_file}");
|
|
let schema = schema_data.0?;
|
|
db.query(schema).await?;
|
|
}
|
|
|
|
println!("Inserting accounts...");
|
|
let _: Vec<Account> = db.insert(()).content(accounts).await?;
|
|
println!("Inserting vm nodes...");
|
|
let _: Vec<VmNode> = db.insert(()).content(vm_nodes).await?;
|
|
println!("Inserting app nodes...");
|
|
let _: Vec<AppNode> = db.insert(()).content(app_nodes).await?;
|
|
println!("Inserting active vm contracts...");
|
|
let _: Vec<ActiveVm> = db.insert(()).relation(active_vm).await?;
|
|
println!("Inserting app contracts...");
|
|
let _: Vec<ActiveApp> = db.insert(()).relation(active_app).await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn upsert_record<SomeRecord: Serialize + 'static>(
|
|
db: &Surreal<Client>,
|
|
table: &str,
|
|
id: &str,
|
|
my_record: SomeRecord,
|
|
) -> Result<(), Error> {
|
|
#[derive(Deserialize)]
|
|
struct Wrapper {}
|
|
let _: Option<Wrapper> = db.upsert((table, id)).content(my_record).await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn live_vmnode_msgs<
|
|
T: std::fmt::Debug + Into<vm::VmDaemonMsg> + std::marker::Unpin + for<'de> Deserialize<'de>,
|
|
>(
|
|
db: &Surreal<Client>,
|
|
node: &str,
|
|
tx: Sender<vm::VmDaemonMsg>,
|
|
) -> Result<(), Error> {
|
|
let table_name = match std::any::type_name::<T>() {
|
|
t if t == std::any::type_name::<crate::db::vm::NewVmReq>() => NEW_VM_REQ.to_string(),
|
|
t if t == std::any::type_name::<crate::db::vm::UpdateVmReq>() => UPDATE_VM_REQ.to_string(),
|
|
t if t == std::any::type_name::<crate::db::vm::DeletedVm>() => DELETED_VM.to_string(),
|
|
t => {
|
|
log::error!("live_vmnode_msgs type {t} not supported",);
|
|
return Err(Error::UnknownTable(t.to_string()));
|
|
}
|
|
};
|
|
let mut resp =
|
|
db.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?;
|
|
let mut live_stream = resp.stream::<Notification<T>>(0)?;
|
|
while let Some(result) = live_stream.next().await {
|
|
match result {
|
|
Ok(notification) => {
|
|
log::debug!("Got notification for node {node}: {notification:?}");
|
|
if notification.action == surrealdb::Action::Create {
|
|
tx.send(notification.data.into()).await?
|
|
}
|
|
}
|
|
Err(e) => {
|
|
log::error!("live_vmnode_msgs for {table_name} DB stream failed for {node}: {e}");
|
|
return Err(Error::from(e));
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn live_appnode_msgs<
|
|
T: std::fmt::Debug + Into<app::AppDaemonMsg> + std::marker::Unpin + for<'de> Deserialize<'de>,
|
|
>(
|
|
db: &Surreal<Client>,
|
|
node_pubkey: &str,
|
|
tx: Sender<app::AppDaemonMsg>,
|
|
) -> Result<(), Error> {
|
|
let table_name = match std::any::type_name::<T>() {
|
|
t if t == std::any::type_name::<crate::db::app::NewAppReq>() => NEW_APP_REQ.to_string(),
|
|
t if t == std::any::type_name::<crate::db::app::DeletedApp>() => DELETED_APP.to_string(),
|
|
t => {
|
|
log::error!("live_appnode_msgs type {t} not supported",);
|
|
return Err(Error::UnknownTable(t.to_string()));
|
|
}
|
|
};
|
|
let mut query_resp = db
|
|
.query(format!("live select * from {table_name} where out = {APP_NODE}:{node_pubkey};"))
|
|
.await?;
|
|
|
|
let mut live_stream = query_resp.stream::<Notification<T>>(0)?;
|
|
while let Some(result) = live_stream.next().await {
|
|
match result {
|
|
Ok(notification) => {
|
|
log::debug!("Got notification for node {node_pubkey}: {notification:?}");
|
|
if notification.action == surrealdb::Action::Create {
|
|
tx.send(notification.data.into()).await?
|
|
}
|
|
}
|
|
Err(e) => {
|
|
log::error!(
|
|
"live_vmnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}"
|
|
);
|
|
return Err(Error::from(e));
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[derive(Deserialize, Debug, Clone)]
|
|
pub struct ErrorFromTable {
|
|
pub error: String,
|
|
}
|