diff --git a/interim_tables.surql b/interim_tables.surql index 8c4d0ba..a0cf525 100644 --- a/interim_tables.surql +++ b/interim_tables.surql @@ -91,6 +91,20 @@ DEFINE FIELD max_ports_per_app ON TABLE app_node TYPE int; DEFINE FIELD price ON TABLE app_node TYPE int; DEFINE FIELD offline_minutes ON TABLE app_node TYPE int; +DEFINE TABLE new_app_req Type RELATION FROM account to app_node SCHEMAFULL; +DEFINE FIELD app_name ON TABLE new_app_req TYPE string; +DEFINE FIELD package_url ON TABLE new_app_req TYPE string; +DEFINE FIELD mr_enclave ON TABLE new_app_req TYPE string; +DEFINE FIELD hratls_pubkey ON TABLE new_app_req TYPE string; +DEFINE FIELD ports ON TABLE new_app_req TYPE array; +DEFINE FIELD memory_mb ON TABLE new_app_req TYPE int; +DEFINE FIELD vcpu ON TABLE new_app_req TYPE int; +DEFINE FIELD disk_mb ON TABLE new_app_req TYPE int; +DEFINE FIELD locked_nano ON TABLE new_app_req TYPE int; +DEFINE FIELD price_per_unit ON TABLE new_app_req TYPE int; +DEFINE FIELD error ON TABLE new_app_req TYPE string; +DEFINE FIELD created_at ON TABLE new_app_req TYPE datetime; + DEFINE TABLE active_app TYPE RELATION FROM account TO app_node SCHEMAFULL; DEFINE FIELD app_name ON TABLE active_app TYPE string; DEFINE FIELD mapped_ports ON TABLE active_app TYPE array<[int, int]>; diff --git a/src/constants.rs b/src/constants.rs index d398407..751f441 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -33,6 +33,8 @@ pub const VM_CONTRACT: &str = "vm_contract"; pub const ACTIVE_APP: &str = "active_app"; pub const APP_NODE: &str = "app_node"; +pub const NEW_APP_REQ: &str = "new_app_req"; +pub const DELETED_APP: &str = "deleted_app"; pub const ID_ALPHABET: [char; 62] = [ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', diff --git a/src/db/app.rs b/src/db/app.rs index 9ef5009..44f49f4 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -34,6 +34,44 @@ impl AppNode { } } +pub enum AppDaemonMsg { + Create(NewAppReq), + Delete(DeletedApp), +} + +impl From for AppDaemonMsg { + fn from(value: NewAppReq) -> Self { + Self::Create(value) + } +} + +impl From for AppDaemonMsg { + fn from(value: DeletedApp) -> Self { + Self::Delete(value) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct NewAppReq { + pub id: RecordId, + #[serde(rename = "in")] + pub admin: RecordId, + #[serde(rename = "out")] + pub app_node: RecordId, + pub app_name: String, + pub package_url: String, + pub mr_enclave: String, + pub hratls_pubkey: String, + pub ports: Vec, + pub memory_mb: u32, + pub vcpu: u32, + pub disk_mb: u32, + pub locked_nano: u64, + pub price_per_unit: u64, + pub error: String, + pub created_at: Datetime, +} + #[derive(Debug, Serialize, Deserialize)] pub struct AppNodeWithReports { pub id: RecordId, @@ -138,3 +176,25 @@ impl From<&old_brain::BrainData> for Vec { nodes } } + +#[derive(Debug, Serialize, Deserialize)] +pub struct DeletedApp { + pub id: RecordId, + #[serde(rename = "in")] + pub admin: RecordId, + #[serde(rename = "out")] + pub app_node: RecordId, + pub app_name: String, + pub mapped_ports: Vec<(u64, u64)>, + pub host_ipv4: String, + pub vcpus: u64, + pub memory_mb: u64, + pub disk_size_gb: u64, + pub created_at: Datetime, + pub price_per_unit: u64, + pub locked_nano: u64, + pub collected_at: Datetime, + pub mr_enclave: String, + pub package_url: String, + pub hratls_pubkey: String, +} diff --git a/src/db/mod.rs b/src/db/mod.rs index 6b3f6c4..725d27a 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -2,7 +2,7 @@ pub mod app; pub mod general; pub mod vm; -use crate::constants::{DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ}; +use crate::constants::{APP_NODE, DELETED_APP, DELETED_VM, NEW_APP_REQ, NEW_VM_REQ, UPDATE_VM_REQ}; use crate::old_brain; use prelude::*; use serde::{Deserialize, Serialize}; @@ -24,6 +24,10 @@ pub enum Error { TimeOut(#[from] tokio::time::error::Elapsed), #[error("Failed to create account")] FailedToCreateDBEntry, + #[error("Unknown Table: {0}")] + UnknownTable(String), + #[error("Daemon channel got closed: {0}")] + AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError), } pub mod prelude { @@ -95,9 +99,9 @@ pub async fn live_vmnode_msgs< t if t == std::any::type_name::() => NEW_VM_REQ.to_string(), t if t == std::any::type_name::() => UPDATE_VM_REQ.to_string(), t if t == std::any::type_name::() => DELETED_VM.to_string(), - wat => { - log::error!("listen_for_node: T has type {wat}"); - String::from("wat") + t => { + log::error!("live_vmnode_msgs type {t} not supported",); + return Err(Error::UnknownTable(t.to_string())); } }; let mut resp = @@ -119,3 +123,43 @@ pub async fn live_vmnode_msgs< } Ok(()) } + +pub async fn live_appnode_msgs< + T: std::fmt::Debug + Into + std::marker::Unpin + for<'de> Deserialize<'de>, +>( + db: &Surreal, + node_pubkey: &str, + tx: Sender, +) -> Result<(), Error> { + let table_name = match std::any::type_name::() { + t if t == std::any::type_name::() => NEW_APP_REQ.to_string(), + t if t == std::any::type_name::() => DELETED_APP.to_string(), + t => { + log::error!("live_appnode_msgs type {t} not supported",); + return Err(Error::UnknownTable(t.to_string())); + } + }; + let mut query_resp = db + .query(format!("live select * from {table_name} where out = {APP_NODE}:{node_pubkey};")) + .await?; + + let mut live_stream = query_resp.stream::>(0)?; + while let Some(result) = live_stream.next().await { + match result { + Ok(notification) => { + log::debug!("Got notification for node {node_pubkey}: {notification:?}"); + if notification.action == surrealdb::Action::Create { + tx.send(notification.data.into()).await? + } + } + Err(e) => { + log::error!( + "live_vmnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}" + ); + return Err(Error::from(e)); + } + } + } + + Ok(()) +} diff --git a/src/grpc/app.rs b/src/grpc/app.rs index d8e2fac..3fbdafa 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -89,7 +89,26 @@ impl BrainAppDaemon for AppDaemonServer { info!("App Daemon {} connected to receive brain messages", pubkey); - todo!() + let (tx, rx) = mpsc::channel(6); + { + let db = self.db.clone(); + let pubkey = pubkey.clone(); + let tx = tx.clone(); + tokio::spawn(async move { + let _ = db::live_appnode_msgs::(&db, &pubkey, tx).await; + }); + } + { + let db = self.db.clone(); + let pubkey = pubkey.clone(); + let tx = tx.clone(); + tokio::spawn(async move { + let _ = db::live_appnode_msgs::(&db, &pubkey, tx).await; + }); + } + + let resp_stream = ReceiverStream::new(rx).map(|msg| Ok(msg.into())); + Ok(Response::new(Box::pin(resp_stream))) } async fn daemon_messages( diff --git a/src/grpc/types.rs b/src/grpc/types.rs index ee85a69..58ee2b5 100644 --- a/src/grpc/types.rs +++ b/src/grpc/types.rs @@ -285,3 +285,47 @@ impl From for AppContract { } } } + +impl From for NewAppReq { + fn from(value: db::NewAppReq) -> Self { + let resource = AppResource { + vcpu: value.vcpu, + memory_mb: value.memory_mb, + disk_mb: value.disk_mb, + ports: value.ports, + }; + let mr_enclave = Some(hex::decode(value.mr_enclave).unwrap_or_default()); + + Self { + package_url: value.package_url, + node_pubkey: value.app_node.key().to_string(), + resource: Some(resource), + uuid: value.id.key().to_string(), + admin_pubkey: value.admin.key().to_string(), + price_per_unit: value.price_per_unit, + locked_nano: value.locked_nano, + hratls_pubkey: value.hratls_pubkey, + public_package_mr_enclave: mr_enclave, + app_name: value.app_name, + } + } +} + +impl From for DelAppReq { + fn from(value: db::DeletedApp) -> Self { + Self { uuid: value.id.key().to_string(), admin_pubkey: value.admin.key().to_string() } + } +} + +impl From for BrainMessageApp { + fn from(value: db::AppDaemonMsg) -> Self { + match value { + db::AppDaemonMsg::Create(new_app_req) => { + BrainMessageApp { msg: Some(brain_message_app::Msg::NewAppReq(new_app_req.into())) } + } + db::AppDaemonMsg::Delete(del_app_req) => BrainMessageApp { + msg: Some(brain_message_app::Msg::DeleteAppReq(del_app_req.into())), + }, + } + } +}