brain/src/db/app.rs
Noor 0ec1b61d8b
Admin and operator features (#3)
All admin features: listing accounts and contracts, with access control and tests
ban a user by operator
tests for all features
mock data for test

Reviewed-on: #3
Co-authored-by: Noor <noormohammedb@protonmail.com>
Co-committed-by: Noor <noormohammedb@protonmail.com>
Signed-off-by: Noor <noormohammedb@protonmail.com>
2025-05-26 03:17:27 +05:30

520 lines
17 KiB
Rust

use std::time::Duration;
use super::Error;
use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, DELETED_APP, NEW_APP_REQ};
use crate::db;
use crate::db::general::Report;
use crate::old_brain;
use detee_shared::app_proto;
use serde::{Deserialize, Serialize};
use surrealdb::engine::remote::ws::Client;
use surrealdb::sql::Datetime;
use surrealdb::{Notification, RecordId, Surreal};
use tokio_stream::StreamExt;
/// Database record describing a node that hosts apps.
///
/// `AppNode::register` upserts the value under its own `id`; the legacy-data
/// conversion in this file builds that id in the `APP_NODE` table.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AppNode {
/// Record id of the node (the legacy conversion keys it by the node public key).
pub id: RecordId,
/// Record id of the operator account; `register` calls `Account::get_or_create` with its key.
pub operator: RecordId,
pub country: String,
pub region: String,
pub city: String,
pub ip: String,
pub avail_mem_mb: u32,
pub avail_vcpus: u32,
// NOTE(review): named as GB, but the legacy conversion copies `avail_storage_mb`
// into this field unchanged — confirm the intended unit.
pub avail_storage_gbs: u32,
pub avail_ports: u32,
pub max_ports_per_app: u32,
pub price: u64,
pub offline_minutes: u64,
}
impl AppNode {
pub async fn register(self, db: &Surreal<Client>) -> Result<AppNode, Error> {
db::Account::get_or_create(db, &self.operator.key().to_string()).await?;
let app_node_id = self.id.clone();
let app_node: Option<AppNode> = db.upsert(app_node_id.clone()).content(self).await?;
app_node.ok_or(Error::FailedToCreateDBEntry(format!("{APP_NODE}:{app_node_id}")))
}
}
/// Message wrapping either a new app request or a deleted app.
///
/// NOTE(review): the name suggests these are the messages forwarded to the app
/// daemon — confirm against the consumer of this type.
#[derive(Debug)]
pub enum AppDaemonMsg {
    /// Wraps a [`NewAppReq`].
    Create(NewAppReq),
    /// Wraps a [`DeletedApp`].
    Delete(DeletedApp),
}
impl From<NewAppReq> for AppDaemonMsg {
fn from(value: NewAppReq) -> Self {
Self::Create(value)
}
}
impl From<DeletedApp> for AppDaemonMsg {
fn from(value: DeletedApp) -> Self {
Self::Delete(value)
}
}
/// A pending request to deploy an app, stored as a relation record.
///
/// `NewAppReq::submit` inserts it into the `NEW_APP_REQ` table with
/// `.relation(..)`; `admin` and `app_node` are serialized as the relation's
/// `in` and `out` ends.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NewAppReq {
pub id: RecordId,
/// Serialized as `in`.
#[serde(rename = "in")]
pub admin: RecordId,
/// Serialized as `out`.
#[serde(rename = "out")]
pub app_node: RecordId,
pub app_name: String,
pub package_url: String,
pub mr_enclave: String,
pub hratls_pubkey: String,
pub ports: Vec<u32>,
pub memory_mb: u32,
pub vcpu: u32,
// NOTE(review): `ActiveApp::activate` copies this value into `disk_size_gb`
// without conversion — confirm the intended unit.
pub disk_mb: u32,
pub locked_nano: u64,
pub price_per_unit: u64,
/// Overwritten by `submit_error`; `ActiveApp::listen` treats a non-empty value
/// arriving through an update as a failure.
pub error: String,
pub created_at: Datetime,
}
impl NewAppReq {
    /// Looks up the request stored at `NEW_APP_REQ:id`.
    ///
    /// Returns `Ok(None)` when no such record exists.
    pub async fn get(db: &Surreal<Client>, id: &str) -> Result<Option<Self>, Error> {
        let found: Option<Self> = db.select((NEW_APP_REQ, id)).await?;
        Ok(found)
    }

    /// Merges `error` into the `error` field of the request at `NEW_APP_REQ:id`.
    ///
    /// Returns whatever the update yields (`None` when nothing came back).
    pub async fn submit_error(
        db: &Surreal<Client>,
        id: &str,
        error: String,
    ) -> Result<Option<Self>, Error> {
        // Patch document containing only the field to overwrite.
        #[derive(Serialize)]
        struct NewAppError {
            error: String,
        }

        let patch = NewAppError { error };
        let updated: Option<Self> = db.update((NEW_APP_REQ, id)).merge(patch).await?;
        Ok(updated)
    }

    /// Inserts this request as a relation record in the `NEW_APP_REQ` table and
    /// returns the inserted records.
    pub async fn submit(self, db: &Surreal<Client>) -> Result<Vec<Self>, Error> {
        // TODO: handle financial transaction
        let inserted: Vec<Self> = db.insert(NEW_APP_REQ).relation(self).await?;
        Ok(inserted)
    }
}
/// An app node with the same fields as `AppNode`, plus a list of reports.
///
/// Produced by `AppNodeWithReports::find_by_filters`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AppNodeWithReports {
pub id: RecordId,
pub operator: RecordId,
pub country: String,
pub region: String,
pub city: String,
pub ip: String,
pub avail_mem_mb: u32,
pub avail_vcpus: u32,
pub avail_storage_gbs: u32,
pub avail_ports: u32,
pub max_ports_per_app: u32,
pub price: u64,
pub offline_minutes: u64,
// NOTE(review): `find_by_filters` selects `<-report.*` next to the node columns;
// presumably that is what fills this field — verify the column name it maps from.
pub reports: Vec<Report>,
}
impl AppNodeWithReports {
    /// Returns the app nodes (with their reports) that satisfy `filters`.
    ///
    /// A node matches when it has at least the requested ports, vCPUs, memory and
    /// storage, and when each non-empty text filter (`city`, `region`, `country`,
    /// `ip`) equals the node's value. With `limit_one` set, `limit 1` is appended
    /// to the query.
    ///
    /// The text filters are bound as query parameters instead of being spliced
    /// into the SurrealQL text, so quotes in a filter value cannot alter the query.
    ///
    /// # Errors
    /// Propagates database and deserialization errors.
    pub async fn find_by_filters(
        db: &Surreal<Client>,
        filters: app_proto::AppNodeFilters,
        limit_one: bool,
    ) -> Result<Vec<Self>, Error> {
        // NOTE(review): `filters.storage_mb` is compared against `avail_storage_gbs`
        // without conversion — confirm the units agree.
        let mut filter_query = format!(
            "select *, <-report.* from {APP_NODE} where
            avail_ports >= {} &&
            max_ports_per_app >= {} &&
            avail_vcpus >= {} &&
            avail_mem_mb >= {} &&
            avail_storage_gbs >= {} ",
            filters.free_ports,
            filters.free_ports,
            filters.vcpus,
            filters.memory_mb,
            filters.storage_mb
        );

        // Column name paired with the requested value; empty values mean "no filter".
        let text_filters = [
            ("city", filters.city),
            ("region", filters.region),
            ("country", filters.country),
            ("ip", filters.ip),
        ];
        let mut bindings = Vec::with_capacity(text_filters.len());
        for (column, value) in text_filters {
            if value.is_empty() {
                continue;
            }
            filter_query.push_str(&format!("&& {column} = $filter_{column} "));
            bindings.push((format!("filter_{column}"), value));
        }

        if limit_one {
            filter_query += "limit 1";
        }
        filter_query += ";";

        let mut query = db.query(filter_query);
        for binding in bindings {
            query = query.bind(binding);
        }
        let mut query_resp = query.await?;
        let app_nodes: Vec<Self> = query_resp.take(0)?;
        Ok(app_nodes)
    }
}
/// An app contract that is currently active, stored as a relation record.
///
/// `ActiveApp::activate` builds it from a `NewAppReq` and inserts it with
/// `.relation(..)`; `admin` and `app_node` are serialized as the relation's
/// `in` and `out` ends.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ActiveApp {
pub id: RecordId,
/// Serialized as `in`.
#[serde(rename = "in")]
pub admin: RecordId,
/// Serialized as `out`.
#[serde(rename = "out")]
pub app_node: RecordId,
pub app_name: String,
/// Empty when the record is created by `activate`.
pub mapped_ports: Vec<(u32, u32)>,
/// Empty when the record is created by `activate`.
pub host_ipv4: String,
pub vcpus: u32,
pub memory_mb: u32,
pub disk_size_gb: u32,
pub created_at: Datetime,
/// Multiplied by the unit count in `price_per_minute`.
pub price_per_unit: u64,
pub locked_nano: u64,
/// Initialised to `created_at` by `activate`.
pub collected_at: Datetime,
pub mr_enclave: String,
pub package_url: String,
pub hratls_pubkey: String,
}
impl From<ActiveApp> for DeletedApp {
fn from(value: ActiveApp) -> Self {
Self {
id: value.id,
admin: value.admin,
app_node: value.app_node,
app_name: value.app_name,
mapped_ports: value.mapped_ports,
host_ipv4: value.host_ipv4,
vcpus: value.vcpus,
memory_mb: value.memory_mb,
disk_size_gb: value.disk_size_gb,
created_at: value.created_at,
price_per_unit: value.price_per_unit,
locked_nano: value.locked_nano,
collected_at: value.collected_at,
mr_enclave: value.mr_enclave,
package_url: value.package_url,
hratls_pubkey: value.hratls_pubkey,
}
}
}
impl ActiveApp {
pub fn price_per_minute(&self) -> u64 {
(self.total_units() * self.price_per_unit as f64) as u64
}
fn total_units(&self) -> f64 {
// TODO: Optimize this based on price of hardware.
(self.vcpus as f64 * 5f64) + (self.memory_mb as f64 / 200f64) + (self.disk_size_gb as f64)
}
pub async fn activate(db: &Surreal<Client>, id: &str) -> Result<(), Error> {
let new_app_req = match NewAppReq::get(db, id).await? {
Some(r) => r,
None => return Ok(()),
};
let active_app = Self {
id: RecordId::from((ACTIVE_APP, id)),
admin: new_app_req.admin,
app_node: new_app_req.app_node,
app_name: new_app_req.app_name,
mapped_ports: vec![],
host_ipv4: String::new(),
vcpus: new_app_req.vcpu,
memory_mb: new_app_req.memory_mb,
disk_size_gb: new_app_req.disk_mb,
created_at: new_app_req.created_at.clone(),
price_per_unit: new_app_req.price_per_unit,
locked_nano: new_app_req.locked_nano,
collected_at: new_app_req.created_at,
mr_enclave: new_app_req.mr_enclave.clone(),
package_url: new_app_req.package_url.clone(),
hratls_pubkey: new_app_req.hratls_pubkey.clone(),
};
let _: Vec<ActiveApp> = db.insert(()).relation(active_app).await?;
Ok(())
}
pub async fn delete(db: &Surreal<Client>, id: &str) -> Result<bool, Error> {
let deleted_app: Option<Self> = db.delete((ACTIVE_APP, id)).await?;
if let Some(deleted_app) = deleted_app {
let deleted_app: DeletedApp = deleted_app.into();
let _: Vec<DeletedApp> = db.insert(DELETED_APP).relation(deleted_app).await?;
Ok(true)
} else {
Ok(false)
}
}
pub async fn listen(db: &Surreal<Client>, app_id: &str) -> Result<ActiveApp, Error> {
let mut query_response = db
.query(format!(
"live select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};"
))
.query(format!("live select * from {ACTIVE_APP} where id = {ACTIVE_APP}:{app_id};"))
.await?;
let mut error_stream = query_response.stream::<Notification<db::ErrorFromTable>>(0)?;
let mut active_app_stream = query_response.stream::<Notification<ActiveApp>>(1)?;
tokio::time::timeout(Duration::from_secs(30), async {
loop {
tokio::select! {
Some(err_notif) = error_stream.next() =>{
match err_notif{
Ok(err_notif) =>{
if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty(){
return Err(Error::NewAppDaemonResp(err_notif.data.error))
}
}
Err(e) => return Err(e.into())
}
}
Some(active_app_notif) = active_app_stream.next() => {
match active_app_notif {
Ok(active_app_notif) =>{
if active_app_notif.action == surrealdb::Action::Create {
let _: Option<NewAppReq> = db.delete((NEW_APP_REQ, app_id)).await?;
return Ok(active_app_notif.data);
}
}
Err(e) => return Err(e.into())
}
}
}
}
})
.await?
}
}
/// An active app whose `out` end has been expanded into the full `AppNode`.
///
/// Same fields as `ActiveApp`, except that `app_node` holds the node record
/// itself; the queries in `impl ActiveAppWithNode` request this with `fetch out`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ActiveAppWithNode {
pub id: RecordId,
/// Serialized as `in`.
#[serde(rename = "in")]
pub admin: RecordId,
/// Serialized as `out`; holds the fetched node instead of its record id.
#[serde(rename = "out")]
pub app_node: AppNode,
pub app_name: String,
pub mapped_ports: Vec<(u32, u32)>,
pub host_ipv4: String,
pub vcpus: u32,
pub memory_mb: u32,
pub disk_size_gb: u32,
pub created_at: Datetime,
pub price_per_unit: u64,
pub locked_nano: u64,
pub collected_at: Datetime,
pub mr_enclave: String,
pub package_url: String,
pub hratls_pubkey: String,
}
impl From<ActiveAppWithNode> for ActiveApp {
fn from(val: ActiveAppWithNode) -> Self {
Self {
id: val.id,
admin: val.admin,
app_node: val.app_node.id,
app_name: val.app_name,
mapped_ports: val.mapped_ports,
host_ipv4: val.host_ipv4,
vcpus: val.vcpus,
memory_mb: val.memory_mb,
disk_size_gb: val.disk_size_gb,
created_at: val.created_at,
price_per_unit: val.price_per_unit,
locked_nano: val.locked_nano,
collected_at: val.collected_at,
mr_enclave: val.mr_enclave,
package_url: val.package_url,
hratls_pubkey: val.hratls_pubkey,
}
}
}
impl ActiveAppWithNode {
    /// Fetches the active app with id `ACTIVE_APP:uuid`, with its node expanded.
    ///
    /// Returns `Ok(None)` when no such app exists.
    pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> {
        let contract: Option<Self> = db
            .query(format!("select * from {ACTIVE_APP} where id = $uuid_input fetch out;"))
            .bind(("uuid_input", RecordId::from((ACTIVE_APP, uuid))))
            .await?
            .take(0)?;
        Ok(contract)
    }

    /// Lists the active apps whose `out` end is the node `APP_NODE:node_pubkey`.
    ///
    /// The node id is bound as a query parameter rather than interpolated into
    /// the query text.
    pub async fn list_by_node(db: &Surreal<Client>, node_pubkey: &str) -> Result<Vec<Self>, Error> {
        let mut query_result = db
            .query(format!("select * from {ACTIVE_APP} where out = $node_input fetch out;"))
            .bind(("node_input", RecordId::from((APP_NODE, node_pubkey))))
            .await?;
        let contract: Vec<Self> = query_result.take(0)?;
        Ok(contract)
    }

    /// Lists the active apps whose `in` end is the account `ACCOUNT:admin`.
    ///
    /// The account id is bound as a query parameter rather than interpolated into
    /// the query text.
    pub async fn list_by_admin(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> {
        let mut query_result = db
            .query(format!("select * from {ACTIVE_APP} where in = $admin_input fetch out;"))
            .bind(("admin_input", RecordId::from((ACCOUNT, admin))))
            .await?;
        let app_contracts: Vec<Self> = query_result.take(0)?;
        Ok(app_contracts)
    }

    /// Lists the active apps reached from the account `ACCOUNT:operator` through
    /// `->operator->app_node<-ACTIVE_APP`.
    ///
    /// Returns an empty list when the query yields no row for the account. The
    /// account id is bound as a query parameter rather than interpolated into the
    /// query text.
    pub async fn list_by_operator(
        db: &Surreal<Client>,
        operator: &str,
    ) -> Result<Vec<Self>, Error> {
        let mut query_result = db
            .query(format!(
                "select
                (select * from ->operator->app_node<-{ACTIVE_APP} fetch out)
                as app_contracts
                from $operator_input"
            ))
            .bind(("operator_input", RecordId::from((ACCOUNT, operator))))
            .await?;

        // Shape of the single row returned by the query above.
        #[derive(Deserialize)]
        struct Wrapper {
            app_contracts: Vec<ActiveAppWithNode>,
        }

        let c: Option<Wrapper> = query_result.take(0)?;
        match c {
            Some(contracts_wrapper) => Ok(contracts_wrapper.app_contracts),
            None => Ok(vec![]),
        }
    }

    /// Lists every active app, each with its node expanded.
    pub async fn list_all(db: &Surreal<Client>) -> Result<Vec<Self>, Error> {
        let mut query_response = db.query(format!("SELECT * FROM {ACTIVE_APP} FETCH out;")).await?;
        let active_apps: Vec<Self> = query_response.take(0)?;
        Ok(active_apps)
    }
}
#[derive(Debug, Serialize)]
pub struct AppNodeResources {
pub avail_no_of_port: u32,
pub avail_vcpus: u32,
pub avail_memory_mb: u32,
pub avail_storage_mb: u32,
pub max_ports_per_app: u32,
}
impl AppNodeResources {
    /// Merges these resource figures into the node record `APP_NODE:node_pubkey`.
    ///
    /// Returns the record produced by the update, or `None` when the update
    /// yields nothing.
    ///
    /// # Errors
    /// Propagates database errors.
    pub async fn merge(
        self,
        db: &Surreal<Client>,
        node_pubkey: &str,
    ) -> Result<Option<AppNode>, Error> {
        let node_record = (APP_NODE, node_pubkey);
        let updated: Option<AppNode> = db.update(node_record).merge(self).await?;
        Ok(updated)
    }
}
/// Converts every app node of the legacy brain data into an `AppNode` record.
///
/// Ids are built in the `APP_NODE` table from the node public key, and operators
/// in the `ACCOUNT` table from the operator wallet.
impl From<&old_brain::BrainData> for Vec<AppNode> {
    fn from(old_data: &old_brain::BrainData) -> Self {
        old_data
            .app_nodes
            .iter()
            .map(|legacy| AppNode {
                id: RecordId::from((APP_NODE, legacy.node_pubkey.clone())),
                operator: RecordId::from((ACCOUNT, legacy.operator_wallet.clone())),
                country: legacy.country.clone(),
                region: legacy.region.clone(),
                city: legacy.city.clone(),
                ip: legacy.ip.clone(),
                avail_mem_mb: legacy.avail_mem_mb,
                avail_vcpus: legacy.avail_vcpus,
                // NOTE(review): an `_mb` value is stored in a `_gbs` field without
                // conversion — confirm the intended unit.
                avail_storage_gbs: legacy.avail_storage_mb,
                avail_ports: legacy.avail_no_of_port,
                max_ports_per_app: legacy.max_ports_per_app,
                price: legacy.price,
                offline_minutes: legacy.offline_minutes,
            })
            .collect()
    }
}
/// Converts every app contract of the legacy brain data into an `ActiveApp` record.
///
/// The record id is the legacy uuid with its dashes removed, and the enclave
/// measurement is rendered as upper-case hex (empty when the legacy value is absent).
impl From<&old_brain::BrainData> for Vec<ActiveApp> {
    fn from(old_data: &old_brain::BrainData) -> Self {
        old_data
            .app_contracts
            .iter()
            .map(|legacy| {
                let mapped_ports: Vec<(u32, u32)> = legacy
                    .mapped_ports
                    .clone()
                    .into_iter()
                    .map(|(b, c)| (b as u32, c as u32))
                    .collect();

                // Two upper-case hex digits per byte.
                let mr_enclave_hex: String = legacy
                    .public_package_mr_enclave
                    .clone()
                    .unwrap_or_default()
                    .iter()
                    .map(|byte| format!("{byte:02X}"))
                    .collect();

                ActiveApp {
                    id: RecordId::from((ACTIVE_APP, legacy.uuid.replace("-", ""))),
                    admin: RecordId::from((ACCOUNT, legacy.admin_pubkey.clone())),
                    app_node: RecordId::from((APP_NODE, legacy.node_pubkey.clone())),
                    mapped_ports,
                    host_ipv4: legacy.host_ipv4.clone(),
                    // NOTE(review): multiplying an `_mb` value by 1024 does not give
                    // GB (that would be a division) — confirm the legacy unit and
                    // the intended conversion.
                    disk_size_gb: legacy.disk_size_mb * 1024,
                    vcpus: legacy.vcpus,
                    memory_mb: legacy.memory_mb,
                    price_per_unit: legacy.price_per_unit,
                    locked_nano: legacy.locked_nano,
                    created_at: legacy.created_at.into(),
                    collected_at: legacy.collected_at.into(),
                    app_name: legacy.app_name.clone(),
                    mr_enclave: mr_enclave_hex,
                    package_url: legacy.package_url.clone(),
                    hratls_pubkey: legacy.hratls_pubkey.clone(),
                }
            })
            .collect()
    }
}
/// An app contract that has been deleted, stored as a relation record.
///
/// Has the same fields as `ActiveApp`; `ActiveApp::delete` converts the removed
/// record with `From<ActiveApp>` and inserts it into the `DELETED_APP` table.
#[derive(Debug, Serialize, Deserialize)]
pub struct DeletedApp {
pub id: RecordId,
/// Serialized as `in`.
#[serde(rename = "in")]
pub admin: RecordId,
/// Serialized as `out`.
#[serde(rename = "out")]
pub app_node: RecordId,
pub app_name: String,
pub mapped_ports: Vec<(u32, u32)>,
pub host_ipv4: String,
pub vcpus: u32,
pub memory_mb: u32,
pub disk_size_gb: u32,
pub created_at: Datetime,
pub price_per_unit: u64,
pub locked_nano: u64,
pub collected_at: Datetime,
pub mr_enclave: String,
pub package_url: String,
pub hratls_pubkey: String,
}