use std::time::Duration; use super::Error; use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, DELETED_APP, NEW_APP_REQ}; use crate::db; use crate::db::general::Report; use crate::old_brain; use detee_shared::app_proto; use serde::{Deserialize, Serialize}; use surrealdb::engine::remote::ws::Client; use surrealdb::sql::Datetime; use surrealdb::{Notification, RecordId, Surreal}; use tokio_stream::StreamExt; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct AppNode { pub id: RecordId, pub operator: RecordId, pub country: String, pub region: String, pub city: String, pub ip: String, pub avail_mem_mb: u32, pub avail_vcpus: u32, pub avail_storage_gbs: u32, pub avail_ports: u32, pub max_ports_per_app: u32, pub price: u64, pub offline_minutes: u64, } impl AppNode { pub async fn register(self, db: &Surreal) -> Result { db::Account::get_or_create(db, &self.operator.key().to_string()).await?; let app_node_id = self.id.clone(); let app_node: Option = db.upsert(app_node_id.clone()).content(self).await?; app_node.ok_or(Error::FailedToCreateDBEntry(format!("{APP_NODE}:{app_node_id}"))) } } pub enum AppDaemonMsg { Create(NewAppReq), Delete(DeletedApp), } impl From for AppDaemonMsg { fn from(value: NewAppReq) -> Self { Self::Create(value) } } impl From for AppDaemonMsg { fn from(value: DeletedApp) -> Self { Self::Delete(value) } } #[derive(Debug, Serialize, Deserialize, Clone)] pub struct NewAppReq { pub id: RecordId, #[serde(rename = "in")] pub admin: RecordId, #[serde(rename = "out")] pub app_node: RecordId, pub app_name: String, pub package_url: String, pub mr_enclave: String, pub hratls_pubkey: String, pub ports: Vec, pub memory_mb: u32, pub vcpu: u32, pub disk_mb: u32, pub locked_nano: u64, pub price_per_unit: u64, pub error: String, pub created_at: Datetime, } impl NewAppReq { pub async fn get(db: &Surreal, id: &str) -> Result, Error> { let new_app_req: Option = db.select((NEW_APP_REQ, id)).await?; Ok(new_app_req) } pub async fn submit_error( db: &Surreal, id: &str, error: String, ) -> Result, Error> { #[derive(Serialize)] struct NewAppError { error: String, } let record: Option = db.update((NEW_APP_REQ, id)).merge(NewAppError { error }).await?; Ok(record) } pub async fn submit(self, db: &Surreal) -> Result, Error> { let new_app_req: Vec = db.insert(NEW_APP_REQ).relation(self).await?; Ok(new_app_req) } } #[derive(Debug, Serialize, Deserialize, Clone)] pub struct AppNodeWithReports { pub id: RecordId, pub operator: RecordId, pub country: String, pub region: String, pub city: String, pub ip: String, pub avail_mem_mb: u32, pub avail_vcpus: u32, pub avail_storage_gbs: u32, pub avail_ports: u32, pub max_ports_per_app: u32, pub price: u64, pub offline_minutes: u64, pub reports: Vec, } impl AppNodeWithReports { pub async fn find_by_filters( db: &Surreal, filters: app_proto::AppNodeFilters, limit_one: bool, ) -> Result, Error> { let mut filter_query = format!( "select *, <-report.* from {APP_NODE} where avail_ports >= {} && max_ports_per_app >= {} && avail_vcpus >= {} && avail_mem_mb >= {} && avail_storage_gbs >= {} ", filters.free_ports, filters.free_ports, filters.vcpus, filters.memory_mb, filters.storage_mb ); if !filters.city.is_empty() { filter_query += &format!("&& city = '{}' ", filters.city); } if !filters.region.is_empty() { filter_query += &format!("&& region = '{}' ", filters.region); } if !filters.country.is_empty() { filter_query += &format!("&& country = '{}' ", filters.country); } if !filters.ip.is_empty() { filter_query += &format!("&& ip = '{}' ", filters.ip); } if limit_one { filter_query += "limit 1"; } filter_query += ";"; let mut query_resp = db.query(filter_query).await?; let app_nodes: Vec = query_resp.take(0)?; Ok(app_nodes) } } #[derive(Debug, Serialize, Deserialize, Clone)] pub struct ActiveApp { pub id: RecordId, #[serde(rename = "in")] pub admin: RecordId, #[serde(rename = "out")] pub app_node: RecordId, pub app_name: String, pub mapped_ports: Vec<(u64, u64)>, pub host_ipv4: String, pub vcpus: u64, pub memory_mb: u64, pub disk_size_gb: u64, pub created_at: Datetime, pub price_per_unit: u64, pub locked_nano: u64, pub collected_at: Datetime, pub mr_enclave: String, pub package_url: String, pub hratls_pubkey: String, } impl From for DeletedApp { fn from(value: ActiveApp) -> Self { Self { id: value.id, admin: value.admin, app_node: value.app_node, app_name: value.app_name, mapped_ports: value.mapped_ports, host_ipv4: value.host_ipv4, vcpus: value.vcpus, memory_mb: value.memory_mb, disk_size_gb: value.disk_size_gb, created_at: value.created_at, price_per_unit: value.price_per_unit, locked_nano: value.locked_nano, collected_at: value.collected_at, mr_enclave: value.mr_enclave, package_url: value.package_url, hratls_pubkey: value.hratls_pubkey, } } } impl ActiveApp { pub fn price_per_minute(&self) -> u64 { (self.total_units() * self.price_per_unit as f64) as u64 } fn total_units(&self) -> f64 { // TODO: Optimize this based on price of hardware. (self.vcpus as f64 * 5f64) + (self.memory_mb as f64 / 200f64) + (self.disk_size_gb as f64) } pub async fn activate(db: &Surreal, id: &str) -> Result<(), Error> { let new_app_req = match NewAppReq::get(db, id).await? { Some(r) => r, None => return Ok(()), }; let active_app = Self { id: RecordId::from((ACTIVE_APP, id)), admin: new_app_req.admin, app_node: new_app_req.app_node, app_name: new_app_req.app_name, mapped_ports: vec![], host_ipv4: String::new(), vcpus: new_app_req.vcpu as u64, memory_mb: new_app_req.memory_mb as u64, disk_size_gb: new_app_req.disk_mb as u64, created_at: new_app_req.created_at.clone(), price_per_unit: new_app_req.price_per_unit, locked_nano: new_app_req.locked_nano, collected_at: new_app_req.created_at, mr_enclave: new_app_req.mr_enclave.clone(), package_url: new_app_req.package_url.clone(), hratls_pubkey: new_app_req.hratls_pubkey.clone(), }; let _: Vec = db.insert(()).relation(active_app).await?; Ok(()) } pub async fn delete(db: &Surreal, id: &str) -> Result { let deleted_app: Option = db.delete((ACTIVE_APP, id)).await?; if let Some(deleted_app) = deleted_app { let deleted_app: DeletedApp = deleted_app.into(); let _: Vec = db.insert(DELETED_APP).relation(deleted_app).await?; Ok(true) } else { Ok(false) } } pub async fn listen(db: &Surreal, app_id: &str) -> Result { let mut query_response = db .query(format!( "live select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};" )) .query(format!("live select * from {ACTIVE_APP} where id = {ACTIVE_APP}:{app_id};")) .await?; let mut error_stream = query_response.stream::>(0)?; let mut active_app_stream = query_response.stream::>(1)?; tokio::time::timeout(Duration::from_secs(30), async { loop { tokio::select! { Some(err_notif) = error_stream.next() =>{ match err_notif{ Ok(err_notif) =>{ if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty(){ return Err(Error::NewAppDaemonResp(err_notif.data.error)) } } Err(e) => return Err(e.into()) } } Some(active_app_notif) = active_app_stream.next() => { match active_app_notif { Ok(active_app_notif) =>{ if active_app_notif.action == surrealdb::Action::Create { let _: Option = db.delete((NEW_APP_REQ, app_id)).await?; return Ok(active_app_notif.data); } } Err(e) => return Err(e.into()) } } } } }) .await? } } #[derive(Debug, Serialize, Deserialize, Clone)] pub struct ActiveAppWithNode { pub id: RecordId, #[serde(rename = "in")] pub admin: RecordId, #[serde(rename = "out")] pub app_node: AppNode, pub app_name: String, pub mapped_ports: Vec<(u64, u64)>, pub host_ipv4: String, pub vcpus: u64, pub memory_mb: u64, pub disk_size_gb: u64, pub created_at: Datetime, pub price_per_unit: u64, pub locked_nano: u64, pub collected_at: Datetime, pub mr_enclave: String, pub package_url: String, pub hratls_pubkey: String, } impl From for ActiveApp { fn from(val: ActiveAppWithNode) -> Self { Self { id: val.id, admin: val.admin, app_node: val.app_node.id, app_name: val.app_name, mapped_ports: val.mapped_ports, host_ipv4: val.host_ipv4, vcpus: val.vcpus, memory_mb: val.memory_mb, disk_size_gb: val.disk_size_gb, created_at: val.created_at, price_per_unit: val.price_per_unit, locked_nano: val.locked_nano, collected_at: val.collected_at, mr_enclave: val.mr_enclave, package_url: val.package_url, hratls_pubkey: val.hratls_pubkey, } } } impl ActiveAppWithNode { pub async fn get_by_uuid(db: &Surreal, uuid: &str) -> Result, Error> { let contract: Option = db.query(format!("select * from {ACTIVE_APP}:{uuid} fetch out;")).await?.take(0)?; Ok(contract) } pub async fn list_by_node(db: &Surreal, node_pubkey: &str) -> Result, Error> { let mut query_result = db .query(format!( "select * from {ACTIVE_APP} where out = {APP_NODE}:{node_pubkey} fetch out;" )) .await?; let contract: Vec = query_result.take(0)?; Ok(contract) } pub async fn list_by_admin(db: &Surreal, admin: &str) -> Result, Error> { let mut query_result = db .query(format!("select * from {ACTIVE_APP} where in = {ACCOUNT}:{admin} fetch out;")) .await?; let app_contracts: Vec = query_result.take(0)?; Ok(app_contracts) } pub async fn list_by_operator( db: &Surreal, operator: &str, ) -> Result, Error> { let mut query_result = db .query(format!( "select (select * from ->operator->app_node<-{ACTIVE_APP} fetch out) as app_contracts from {ACCOUNT}:{operator}" )) .await?; #[derive(Deserialize)] struct Wrapper { app_contracts: Vec, } let c: Option = query_result.take(0)?; match c { Some(contracts_wrapper) => Ok(contracts_wrapper.app_contracts), None => Ok(vec![]), } } } #[derive(Debug, Serialize)] pub struct AppNodeResources { pub avail_no_of_port: u32, pub avail_vcpus: u32, pub avail_memory_mb: u32, pub avail_storage_mb: u32, pub max_ports_per_app: u32, } impl AppNodeResources { pub async fn merge( self, db: &Surreal, node_pubkey: &str, ) -> Result, Error> { let app_node: Option = db.update((APP_NODE, node_pubkey)).merge(self).await?; Ok(app_node) } } impl From<&old_brain::BrainData> for Vec { fn from(old_data: &old_brain::BrainData) -> Self { let mut nodes = Vec::new(); for old_node in old_data.app_nodes.iter() { nodes.push(AppNode { id: RecordId::from(("app_node", old_node.node_pubkey.clone())), operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())), country: old_node.country.clone(), region: old_node.region.clone(), city: old_node.city.clone(), ip: old_node.ip.clone(), avail_mem_mb: old_node.avail_mem_mb, avail_vcpus: old_node.avail_vcpus, avail_storage_gbs: old_node.avail_storage_mb, avail_ports: old_node.avail_no_of_port, max_ports_per_app: old_node.max_ports_per_app, price: old_node.price, offline_minutes: old_node.offline_minutes, }); } nodes } } #[derive(Debug, Serialize, Deserialize)] pub struct DeletedApp { pub id: RecordId, #[serde(rename = "in")] pub admin: RecordId, #[serde(rename = "out")] pub app_node: RecordId, pub app_name: String, pub mapped_ports: Vec<(u64, u64)>, pub host_ipv4: String, pub vcpus: u64, pub memory_mb: u64, pub disk_size_gb: u64, pub created_at: Datetime, pub price_per_unit: u64, pub locked_nano: u64, pub collected_at: Datetime, pub mr_enclave: String, pub package_url: String, pub hratls_pubkey: String, }