// SPDX-License-Identifier: Apache-2.0 use std::time::Duration; use super::Error; use crate::constants::{ ACCOUNT, ACTIVE_APP, APP_DAEMON_TIMEOUT, APP_NODE, DELETED_APP, NEW_APP_REQ, }; use crate::db::general::Report; use crate::db; use detee_shared::app_proto::{self, NewAppRes}; use serde::{Deserialize, Serialize}; use surrealdb::engine::remote::ws::Client; use surrealdb::sql::Datetime; use surrealdb::{Notification, RecordId, Surreal}; use tokio_stream::StreamExt; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct AppNode { pub id: RecordId, pub operator: RecordId, pub pub_sub_node: String, pub country: String, pub region: String, pub city: String, pub ip: String, pub avail_mem_mib: u32, pub avail_vcpus: u32, pub avail_storage_mib: u32, pub avail_ports: u32, pub max_ports_per_app: u32, pub price: u64, pub connected_at: Datetime, pub disconnected_at: Datetime, } impl AppNode { pub async fn register(self, db: &Surreal) -> Result { db::Account::get_or_create(db, &self.operator.key().to_string()).await?; let app_node_id = self.id.clone(); let app_node: Option = db.upsert(app_node_id.clone()).content(self).await?; app_node.ok_or(Error::FailedToCreateDBEntry(format!("{APP_NODE}:{app_node_id}"))) } pub async fn set_online(db: &Surreal, app_node_id: &str) -> Result<(), Error> { db.query(format!("UPDATE {APP_NODE}:{app_node_id} SET connected_at = time::now();")) .await?; Ok(()) } pub async fn set_offline(db: &Surreal, app_node_id: &str) -> Result<(), Error> { db.query(format!("UPDATE {APP_NODE}:{app_node_id} SET disconnected_at = time::now();")) .await?; Ok(()) } } pub enum AppDaemonMsg { Create(NewAppReq), Delete(DeletedApp), } impl From for AppDaemonMsg { fn from(value: NewAppReq) -> Self { Self::Create(value) } } impl From for AppDaemonMsg { fn from(value: DeletedApp) -> Self { Self::Delete(value) } } #[derive(Debug, Serialize, Deserialize, Clone)] pub struct NewAppReq { pub id: RecordId, #[serde(rename = "in")] pub admin: RecordId, #[serde(rename = "out")] pub app_node: RecordId, pub app_name: String, pub package_url: String, pub mr_enclave: String, pub hratls_pubkey: String, pub ports: Vec, pub memory_mib: u32, pub vcpus: u32, pub disk_size_mib: u32, pub locked_nano: u64, pub price_per_unit: u64, pub error: String, pub created_at: Datetime, } impl NewAppReq { pub async fn get(db: &Surreal, id: &str) -> Result, Error> { let new_app_req: Option = db.select((NEW_APP_REQ, id)).await?; Ok(new_app_req) } pub async fn delete(db: &Surreal, id: &str) -> Result<(), Error> { let _: Option = db.delete((NEW_APP_REQ, id)).await?; Ok(()) } pub async fn submit_error(db: &Surreal, id: &str, error: String) -> Result<(), Error> { let tx_query = String::from( " BEGIN TRANSACTION; LET $new_app_req = $new_app_req_input; LET $error = $error_input; LET $record = (select * from $new_app_req)[0]; LET $admin = $record.in; if $record == None {{ THROW 'app req not exist ' + $new_app_req }}; UPDATE $new_app_req SET error = $error; UPDATE $admin SET tmp_locked -= $record.locked_nano; UPDATE $admin SET balance += $record.locked_nano; COMMIT TRANSACTION;", ); log::trace!("submit_new_app_err query: {tx_query}"); let mut query_resp = db .query(tx_query) .bind(("new_app_req_input", RecordId::from((NEW_APP_REQ, id)))) .bind(("error_input", error)) .await?; let query_err = query_resp.take_errors(); if !query_err.is_empty() { log::trace!("errors in submit_new_app_err: {query_err:?}"); let tx_fail_err_str = String::from("The query was not executed due to a failed transaction"); if query_err.contains_key(&4) && query_err[&4].to_string() != tx_fail_err_str { log::error!("app req not exist: {}", query_err[&4]); return Err(Error::ContractNotFound); } else { log::error!("Unknown error in submit_new_app_err: {query_err:?}"); return Err(Error::Unknown("submit_new_app_req".to_string())); } } Ok(()) } pub async fn submit(self, db: &Surreal) -> Result<(), Error> { let locked_nano = self.locked_nano; let tx_query = format!(" BEGIN TRANSACTION; LET $account = $account_input; LET $new_app_req = $new_app_req_input; LET $app_node = $app_node_input; LET $package_url = $package_url_input; LET $mr_enclave = $mr_enclave_input; LET $hratls_pubkey = $hratls_pubkey_input; LET $app_name = $app_name_input; UPDATE $account SET balance -= {locked_nano}; IF $account.balance < 0 {{ THROW 'Insufficient funds.' }}; UPDATE $account SET tmp_locked += {locked_nano}; RELATE $account ->$new_app_req ->$app_node CONTENT {{ created_at: time::now(), app_name: $app_name, package_url: $package_url, mr_enclave: $mr_enclave, hratls_pubkey: $hratls_pubkey, ports: {:?}, memory_mib: {}, vcpus: {}, disk_size_mib: {}, locked_nano: {locked_nano}, price_per_unit: {}, error: '', }}; COMMIT TRANSACTION;", self.ports, self.memory_mib, self.vcpus, self.disk_size_mib, self.price_per_unit); log::trace!("submit_new_app_req query: {tx_query}"); let mut query_resp = db .query(tx_query) .bind(("account_input", self.admin)) .bind(("new_app_req_input", self.id)) .bind(("app_node_input", self.app_node)) .bind(("package_url_input", self.package_url)) .bind(("mr_enclave_input", self.mr_enclave)) .bind(("hratls_pubkey_input", self.hratls_pubkey)) .bind(("app_name_input", self.app_name)) .await?; let query_err = query_resp.take_errors(); if !query_err.is_empty() { log::trace!("errors in submit_new_app_req: {query_err:?}"); let tx_fail_err_str = String::from("The query was not executed due to a failed transaction"); if query_err.contains_key(&8) && query_err[&8].to_string() != tx_fail_err_str { log::error!("Transaction error: {}", query_err[&8]); return Err(Error::InsufficientFunds); } else { log::error!("Unknown error in submit_new_app_req: {query_err:?}"); return Err(Error::Unknown("submit_new_app_req".to_string())); } } Ok(()) } } #[derive(Debug, Serialize, Deserialize, Clone)] pub struct AppNodeWithReports { pub id: RecordId, pub operator: RecordId, pub country: String, pub region: String, pub city: String, pub ip: String, pub avail_mem_mib: u32, pub avail_vcpus: u32, pub avail_storage_mib: u32, pub avail_ports: u32, pub max_ports_per_app: u32, pub price: u64, pub reports: Vec, } impl AppNodeWithReports { pub async fn find_by_filters( db: &Surreal, filters: app_proto::AppNodeFilters, limit_one: bool, ) -> Result, Error> { let mut filter_query = format!( "select *, <-report.* as reports from {APP_NODE} where avail_ports >= {} && max_ports_per_app >= {} && avail_vcpus >= {} && avail_mem_mib >= {} && avail_storage_mib >= {} ", filters.free_ports, filters.free_ports, filters.vcpus, filters.memory_mib, filters.storage_mib ); // TODO: bind all strings if !filters.city.is_empty() { filter_query += &format!("&& city = '{}' ", filters.city); } if !filters.region.is_empty() { filter_query += &format!("&& region = '{}' ", filters.region); } if !filters.country.is_empty() { filter_query += &format!("&& country = '{}' ", filters.country); } if !filters.ip.is_empty() { filter_query += &format!("&& ip = '{}' ", filters.ip); } filter_query += " && connected_at > disconnected_at "; if limit_one { filter_query += "limit 1"; } filter_query += ";"; let mut query_resp = db.query(filter_query).await?; let app_nodes: Vec = query_resp.take(0)?; Ok(app_nodes) } } #[derive(Debug, Serialize, Deserialize, Clone)] pub struct ActiveApp { pub id: RecordId, #[serde(rename = "in")] pub admin: RecordId, #[serde(rename = "out")] pub app_node: RecordId, pub app_name: String, pub mapped_ports: Vec<(u32, u32)>, pub host_ipv4: String, pub vcpus: u32, pub memory_mib: u32, pub disk_size_mib: u32, pub created_at: Datetime, pub price_per_unit: u64, pub locked_nano: u64, pub collected_at: Datetime, pub mr_enclave: String, pub package_url: String, pub hratls_pubkey: String, } impl From for DeletedApp { fn from(value: ActiveApp) -> Self { Self { id: value.id, admin: value.admin, app_node: value.app_node, app_name: value.app_name, mapped_ports: value.mapped_ports, host_ipv4: value.host_ipv4, vcpus: value.vcpus, memory_mib: value.memory_mib, disk_size_mib: value.disk_size_mib, created_at: value.created_at, price_per_unit: value.price_per_unit, mr_enclave: value.mr_enclave, package_url: value.package_url, hratls_pubkey: value.hratls_pubkey, } } } impl ActiveApp { pub async fn get_by_app_id(db: &Surreal, app_id: &str) -> Result, Error> { let contract: Option = db .query("select * from $active_app_id;".to_string()) .bind(("active_app_id", RecordId::from((ACTIVE_APP, app_id)))) .await? .take(0)?; Ok(contract) } pub async fn activate( db: &Surreal, new_app_res: app_proto::NewAppRes, ) -> Result<(), Error> { let new_app_req = match NewAppReq::get(db, &new_app_res.app_id).await? { Some(r) => r, None => return Ok(()), }; let mapped_ports = new_app_res .mapped_ports .into_iter() .map(|data| (data.host_port, data.guest_port)) .collect::>(); let active_app = Self { id: RecordId::from((ACTIVE_APP, &new_app_res.app_id)), admin: new_app_req.admin, app_node: new_app_req.app_node, app_name: new_app_req.app_name, mapped_ports, host_ipv4: new_app_res.ip_address, vcpus: new_app_req.vcpus, memory_mib: new_app_req.memory_mib, disk_size_mib: new_app_req.disk_size_mib, created_at: new_app_req.created_at.clone(), price_per_unit: new_app_req.price_per_unit, locked_nano: new_app_req.locked_nano, collected_at: new_app_req.created_at, mr_enclave: new_app_req.mr_enclave.clone(), package_url: new_app_req.package_url.clone(), hratls_pubkey: new_app_req.hratls_pubkey.clone(), }; let admin_account = active_app.admin.key().to_string(); let locked_nano = active_app.locked_nano; let _: Vec = db.insert(()).relation(active_app).await?; NewAppReq::delete(db, &new_app_res.app_id).await?; db.query(format!("UPDATE {ACCOUNT}:{admin_account} SET tmp_locked -= {locked_nano};")) .await?; Ok(()) } pub async fn delete(db: &Surreal, admin: &str, id: &str) -> Result<(), Error> { let mut app_del_resp = db .query( " BEGIN TRANSACTION; LET $active_app = $app_id_input; LET $admin = $admin_input; IF $active_app.in != $admin { THROW 'Unauthorized' }; return fn::delete_app($active_app); COMMIT TRANSACTION; ", ) .bind(("app_id_input", RecordId::from((ACTIVE_APP, id)))) .bind(("admin_input", RecordId::from((ACCOUNT, admin)))) .await?; log::trace!("delete_app query response: {app_del_resp:?}"); let query_err = app_del_resp.take_errors(); if !query_err.is_empty() { log::trace!("errors in delete_app: {query_err:?}"); let tx_fail_err_str = String::from("The query was not executed due to a failed transaction"); if query_err.contains_key(&2) && query_err[&2].to_string() != tx_fail_err_str { log::error!("Unauthorized: {}", query_err[&2]); return Err(Error::AccessDenied); } } Ok(()) } } pub enum WrappedAppResp { NewAppRes(NewAppRes), Error(NewAppRes), } impl WrappedAppResp { pub async fn listen(db: &Surreal, app_id: &str) -> Result { let mut query_response = db .query(format!( "live select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};" )) .query(format!("live select * from {ACTIVE_APP} where id = {ACTIVE_APP}:{app_id};")) .await?; let mut error_stream = query_response.stream::>(0)?; let mut active_app_stream = query_response.stream::>(1)?; let mut error = db .query(format!("select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};")) .await?; if let Some(error_on_newapp_req) = error.take::>(0)? { if !error_on_newapp_req.error.is_empty() { let app_daemon_err = NewAppRes { app_id: app_id.to_string(), error: error_on_newapp_req.error, ..Default::default() }; return Ok(Self::Error(app_daemon_err)); } } if let Some(active_app) = db.select::>((ACTIVE_APP, app_id)).await? { return Ok(Self::NewAppRes(active_app.into())); } log::trace!("listening for table: {NEW_APP_REQ}"); tokio::time::timeout(Duration::from_secs(APP_DAEMON_TIMEOUT), async { loop { tokio::select! { Some(err_notif) = error_stream.next() =>{ match err_notif{ Ok(err_notif) =>{ if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty(){ let app_daemon_err = NewAppRes { app_id: app_id.to_string(), error: err_notif.data.error, ..Default::default() }; return Ok(Self::Error(app_daemon_err)); } } Err(e) => return Err(e.into()) } } Some(active_app_notif) = active_app_stream.next() => { match active_app_notif { Ok(active_app_notif) =>{ if active_app_notif.action == surrealdb::Action::Create { return Ok(Self::NewAppRes(active_app_notif.data.into())); } } Err(e) => return Err(e.into()) } } } } }) .await? } } #[derive(Debug, Serialize, Deserialize, Clone)] pub struct ActiveAppWithNode { pub id: RecordId, #[serde(rename = "in")] pub admin: RecordId, #[serde(rename = "out")] pub app_node: AppNode, pub app_name: String, pub mapped_ports: Vec<(u32, u32)>, pub host_ipv4: String, pub vcpus: u32, pub memory_mib: u32, pub disk_size_mib: u32, pub created_at: Datetime, pub price_per_unit: u64, pub locked_nano: u64, pub collected_at: Datetime, pub mr_enclave: String, pub package_url: String, pub hratls_pubkey: String, } impl From for ActiveApp { fn from(val: ActiveAppWithNode) -> Self { Self { id: val.id, admin: val.admin, app_node: val.app_node.id, app_name: val.app_name, mapped_ports: val.mapped_ports, host_ipv4: val.host_ipv4, vcpus: val.vcpus, memory_mib: val.memory_mib, disk_size_mib: val.disk_size_mib, created_at: val.created_at, price_per_unit: val.price_per_unit, locked_nano: val.locked_nano, collected_at: val.collected_at, mr_enclave: val.mr_enclave, package_url: val.package_url, hratls_pubkey: val.hratls_pubkey, } } } impl ActiveAppWithNode { pub async fn get_by_app_id(db: &Surreal, app_id: &str) -> Result, Error> { let contract: Option = db .query(format!("select * from {ACTIVE_APP} where id = $app_id_input fetch out;")) .bind(("app_id_input", RecordId::from((ACTIVE_APP, app_id)))) .await? .take(0)?; Ok(contract) } pub async fn list_by_node(db: &Surreal, node_pubkey: &str) -> Result, Error> { let mut query_result = db .query(format!( "select * from {ACTIVE_APP} where out = {APP_NODE}:{node_pubkey} fetch out;" )) .await?; let contract: Vec = query_result.take(0)?; Ok(contract) } pub async fn list_by_admin(db: &Surreal, admin: &str) -> Result, Error> { let mut query_result = db .query(format!("select * from {ACTIVE_APP} where in = {ACCOUNT}:{admin} fetch out;")) .await?; let app_contracts: Vec = query_result.take(0)?; Ok(app_contracts) } pub async fn list_by_operator( db: &Surreal, operator: &str, ) -> Result, Error> { let mut query_result = db .query(format!( "select (select * from ->operator->app_node<-{ACTIVE_APP} fetch out) as app_contracts from {ACCOUNT}:{operator}" )) .await?; #[derive(Deserialize)] struct Wrapper { app_contracts: Vec, } let c: Option = query_result.take(0)?; match c { Some(contracts_wrapper) => Ok(contracts_wrapper.app_contracts), None => Ok(vec![]), } } fn total_units(&self) -> f64 { // TODO: Optimize this based on price of hardware. (self.vcpus as f64 * 5f64) + (self.memory_mib as f64 / 200f64) + (self.disk_size_mib as f64 / 1024f64 / 10f64) } pub fn price_per_minute(&self) -> u64 { (self.total_units() * self.price_per_unit as f64) as u64 } pub async fn list_all(db: &Surreal) -> Result, Error> { let mut query_response = db.query(format!("SELECT * FROM {ACTIVE_APP} FETCH out;")).await?; let active_apps: Vec = query_response.take(0)?; Ok(active_apps) } } #[derive(Debug, Serialize)] pub struct AppNodeResources { pub avail_ports: u32, pub avail_vcpus: u32, pub avail_mem_mib: u32, pub avail_storage_mib: u32, pub max_ports_per_app: u32, } impl AppNodeResources { pub async fn merge( self, db: &Surreal, node_pubkey: &str, ) -> Result, Error> { let app_node: Option = db.update((APP_NODE, node_pubkey)).merge(self).await?; log::trace!("Merged app node resources for {node_pubkey}: {:?}", app_node); Ok(app_node) } } #[derive(Debug, Serialize, Deserialize)] pub struct DeletedApp { pub id: RecordId, #[serde(rename = "in")] pub admin: RecordId, #[serde(rename = "out")] pub app_node: RecordId, pub app_name: String, pub mapped_ports: Vec<(u32, u32)>, pub host_ipv4: String, pub vcpus: u32, pub memory_mib: u32, pub disk_size_mib: u32, pub created_at: Datetime, pub price_per_unit: u64, pub mr_enclave: String, pub package_url: String, pub hratls_pubkey: String, } impl DeletedApp { pub async fn list_by_node(db: &Surreal, node_pubkey: &str) -> Result, Error> { // TODO: bind all strings let mut result = db .query(format!("select * from {DELETED_APP} where out = {APP_NODE}:{node_pubkey};")) .await?; let contracts: Vec = result.take(0)?; Ok(contracts) } }