features: app engine #1
19
Cargo.lock
generated
19
Cargo.lock
generated
@ -1000,7 +1000,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "detee-shared"
|
||||
version = "0.1.0"
|
||||
source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain#d0d4622c52efdf74ed6582fbac23a6159986ade3"
|
||||
source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#da0f3269a31e0ebfb7328e2115e212aabe4d984a"
|
||||
dependencies = [
|
||||
"bincode 2.0.1",
|
||||
"prost",
|
||||
@ -2291,9 +2291,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "multimap"
|
||||
version = "0.10.0"
|
||||
version = "0.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
|
||||
checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
|
||||
|
||||
[[package]]
|
||||
name = "nanoid"
|
||||
@ -2756,7 +2756,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf"
|
||||
dependencies = [
|
||||
"heck 0.5.0",
|
||||
"itertools 0.13.0",
|
||||
"itertools 0.11.0",
|
||||
"log",
|
||||
"multimap",
|
||||
"once_cell",
|
||||
@ -2776,7 +2776,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"itertools 0.13.0",
|
||||
"itertools 0.11.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
@ -3315,9 +3315,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustix"
|
||||
version = "1.0.5"
|
||||
version = "1.0.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d97817398dd4bb2e6da002002db259209759911da105da92bec29ccb12cf58bf"
|
||||
checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"errno",
|
||||
@ -3796,6 +3796,7 @@ dependencies = [
|
||||
"ed25519-dalek",
|
||||
"env_logger",
|
||||
"futures",
|
||||
"hex",
|
||||
"hyper-util",
|
||||
"itertools 0.14.0",
|
||||
"log",
|
||||
@ -4015,9 +4016,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tempfile"
|
||||
version = "3.19.1"
|
||||
version = "3.20.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf"
|
||||
checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1"
|
||||
dependencies = [
|
||||
"fastrand",
|
||||
"getrandom 0.3.2",
|
||||
|
@ -13,7 +13,7 @@ serde_yaml = "0.9.34"
|
||||
surrealdb = "2.2.2"
|
||||
tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] }
|
||||
tonic = { version = "0.12", features = ["tls"] }
|
||||
detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain" }
|
||||
detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain_app" }
|
||||
ed25519-dalek = "2.1.1"
|
||||
bs58 = "0.5.1"
|
||||
tokio-stream = "0.1.17"
|
||||
@ -22,6 +22,7 @@ env_logger = "0.11.8"
|
||||
thiserror = "2.0.12"
|
||||
nanoid = "0.4.0"
|
||||
dotenv = "0.15.0"
|
||||
hex = "0.4.3"
|
||||
|
||||
[profile.release]
|
||||
lto = true
|
||||
|
@ -91,6 +91,20 @@ DEFINE FIELD max_ports_per_app ON TABLE app_node TYPE int;
|
||||
DEFINE FIELD price ON TABLE app_node TYPE int;
|
||||
DEFINE FIELD offline_minutes ON TABLE app_node TYPE int;
|
||||
|
||||
DEFINE TABLE new_app_req Type RELATION FROM account to app_node SCHEMAFULL;
|
||||
DEFINE FIELD app_name ON TABLE new_app_req TYPE string;
|
||||
DEFINE FIELD package_url ON TABLE new_app_req TYPE string;
|
||||
DEFINE FIELD mr_enclave ON TABLE new_app_req TYPE string;
|
||||
DEFINE FIELD hratls_pubkey ON TABLE new_app_req TYPE string;
|
||||
DEFINE FIELD ports ON TABLE new_app_req TYPE array<int>;
|
||||
DEFINE FIELD memory_mb ON TABLE new_app_req TYPE int;
|
||||
DEFINE FIELD vcpu ON TABLE new_app_req TYPE int;
|
||||
DEFINE FIELD disk_mb ON TABLE new_app_req TYPE int;
|
||||
DEFINE FIELD locked_nano ON TABLE new_app_req TYPE int;
|
||||
DEFINE FIELD price_per_unit ON TABLE new_app_req TYPE int;
|
||||
DEFINE FIELD error ON TABLE new_app_req TYPE string;
|
||||
DEFINE FIELD created_at ON TABLE new_app_req TYPE datetime;
|
||||
|
||||
DEFINE TABLE active_app TYPE RELATION FROM account TO app_node SCHEMAFULL;
|
||||
DEFINE FIELD app_name ON TABLE active_app TYPE string;
|
||||
DEFINE FIELD mapped_ports ON TABLE active_app TYPE array<[int, int]>;
|
||||
|
@ -1,11 +1,13 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use detee_shared::app_proto::brain_app_cli_server::BrainAppCliServer;
|
||||
use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemonServer;
|
||||
use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer;
|
||||
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer;
|
||||
use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer;
|
||||
use dotenv::dotenv;
|
||||
use std::sync::Arc;
|
||||
use surreal_brain::constants::{BRAIN_GRPC_ADDR, CERT_KEY_PATH, CERT_PATH};
|
||||
use surreal_brain::db;
|
||||
use surreal_brain::grpc::app::{AppCliServer, AppDaemonServer};
|
||||
use surreal_brain::grpc::general::GeneralCliServer;
|
||||
use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer};
|
||||
use tonic::transport::{Identity, Server, ServerTlsConfig};
|
||||
@ -15,7 +17,11 @@ async fn main() {
|
||||
if dotenv::from_filename("/etc/detee/brain/config.ini").is_err() {
|
||||
dotenv().ok();
|
||||
}
|
||||
env_logger::builder().filter_level(log::LevelFilter::Debug).init();
|
||||
env_logger::builder()
|
||||
.filter_level(log::LevelFilter::Trace)
|
||||
.filter_module("tungstenite", log::LevelFilter::Debug)
|
||||
.filter_module("tokio_tungstenite", log::LevelFilter::Debug)
|
||||
.init();
|
||||
|
||||
let db_url = std::env::var("DB_URL").expect("the environment variable DB_URL is not set");
|
||||
let db_user = std::env::var("DB_USER").expect("the environment variable DB_USER is not set");
|
||||
@ -31,6 +37,8 @@ async fn main() {
|
||||
let snp_daemon_server = BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()));
|
||||
let snp_cli_server = BrainVmCliServer::new(VmCliServer::new(db_arc.clone()));
|
||||
let general_service_server = BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone()));
|
||||
let sgx_daemon_server = BrainAppDaemonServer::new(AppDaemonServer::new(db_arc.clone()));
|
||||
let sgx_cli_server = BrainAppCliServer::new(AppCliServer::new(db_arc.clone()));
|
||||
|
||||
let cert_path = std::env::var("CERT_PATH").unwrap_or(CERT_PATH.to_string());
|
||||
let key_path = std::env::var("CERT_KEY_PATH").unwrap_or(CERT_KEY_PATH.to_string());
|
||||
@ -45,6 +53,8 @@ async fn main() {
|
||||
.add_service(snp_daemon_server)
|
||||
.add_service(snp_cli_server)
|
||||
.add_service(general_service_server)
|
||||
.add_service(sgx_daemon_server)
|
||||
.add_service(sgx_cli_server)
|
||||
.serve(addr)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -32,6 +32,9 @@ pub const DELETED_VM: &str = "deleted_vm";
|
||||
pub const VM_CONTRACT: &str = "vm_contract";
|
||||
|
||||
pub const ACTIVE_APP: &str = "active_app";
|
||||
pub const APP_NODE: &str = "app_node";
|
||||
pub const NEW_APP_REQ: &str = "new_app_req";
|
||||
pub const DELETED_APP: &str = "deleted_app";
|
||||
|
||||
pub const ID_ALPHABET: [char; 62] = [
|
||||
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
|
||||
|
353
src/db/app.rs
353
src/db/app.rs
@ -1,12 +1,16 @@
|
||||
use crate::constants::{ACCOUNT, ACTIVE_APP};
|
||||
use crate::db::general::Report;
|
||||
use std::time::Duration;
|
||||
|
||||
use super::Error;
|
||||
use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, DELETED_APP, NEW_APP_REQ};
|
||||
use crate::db;
|
||||
use crate::db::general::Report;
|
||||
use crate::old_brain;
|
||||
use detee_shared::app_proto;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use surrealdb::engine::remote::ws::Client;
|
||||
use surrealdb::sql::Datetime;
|
||||
use surrealdb::{RecordId, Surreal};
|
||||
use surrealdb::{Notification, RecordId, Surreal};
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct AppNode {
|
||||
@ -25,7 +29,80 @@ pub struct AppNode {
|
||||
pub offline_minutes: u64,
|
||||
}
|
||||
|
||||
impl AppNode {
|
||||
pub async fn register(self, db: &Surreal<Client>) -> Result<AppNode, Error> {
|
||||
db::Account::get_or_create(db, &self.operator.key().to_string()).await?;
|
||||
let app_node: Option<AppNode> = db.upsert(self.id.clone()).content(self).await?;
|
||||
app_node.ok_or(Error::FailedToCreateDBEntry)
|
||||
}
|
||||
}
|
||||
|
||||
pub enum AppDaemonMsg {
|
||||
Create(NewAppReq),
|
||||
Delete(DeletedApp),
|
||||
}
|
||||
|
||||
impl From<NewAppReq> for AppDaemonMsg {
|
||||
fn from(value: NewAppReq) -> Self {
|
||||
Self::Create(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DeletedApp> for AppDaemonMsg {
|
||||
fn from(value: DeletedApp) -> Self {
|
||||
Self::Delete(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct NewAppReq {
|
||||
pub id: RecordId,
|
||||
#[serde(rename = "in")]
|
||||
pub admin: RecordId,
|
||||
#[serde(rename = "out")]
|
||||
pub app_node: RecordId,
|
||||
pub app_name: String,
|
||||
pub package_url: String,
|
||||
pub mr_enclave: String,
|
||||
pub hratls_pubkey: String,
|
||||
pub ports: Vec<u32>,
|
||||
pub memory_mb: u32,
|
||||
pub vcpu: u32,
|
||||
pub disk_mb: u32,
|
||||
pub locked_nano: u64,
|
||||
pub price_per_unit: u64,
|
||||
pub error: String,
|
||||
pub created_at: Datetime,
|
||||
}
|
||||
|
||||
impl NewAppReq {
|
||||
pub async fn get(db: &Surreal<Client>, id: &str) -> Result<Option<Self>, Error> {
|
||||
let new_app_req: Option<Self> = db.select((NEW_APP_REQ, id)).await?;
|
||||
Ok(new_app_req)
|
||||
}
|
||||
pub async fn submit_error(
|
||||
db: &Surreal<Client>,
|
||||
id: &str,
|
||||
error: String,
|
||||
) -> Result<Option<Self>, Error> {
|
||||
#[derive(Serialize)]
|
||||
struct NewAppError {
|
||||
error: String,
|
||||
}
|
||||
|
||||
let record: Option<Self> =
|
||||
db.update((NEW_APP_REQ, id)).merge(NewAppError { error }).await?;
|
||||
|
||||
Ok(record)
|
||||
}
|
||||
|
||||
pub async fn submit(self, db: &Surreal<Client>) -> Result<Vec<Self>, Error> {
|
||||
let new_app_req: Vec<Self> = db.insert(NEW_APP_REQ).relation(self).await?;
|
||||
Ok(new_app_req)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct AppNodeWithReports {
|
||||
pub id: RecordId,
|
||||
pub operator: RecordId,
|
||||
@ -43,26 +120,177 @@ pub struct AppNodeWithReports {
|
||||
pub reports: Vec<Report>,
|
||||
}
|
||||
|
||||
impl AppNodeWithReports {
|
||||
pub async fn find_by_filters(
|
||||
db: &Surreal<Client>,
|
||||
filters: app_proto::AppNodeFilters,
|
||||
limit_one: bool,
|
||||
) -> Result<Vec<Self>, Error> {
|
||||
let mut filter_query = format!(
|
||||
"select *, <-report.* from {APP_NODE} where
|
||||
avail_ports >= {} &&
|
||||
max_ports_per_app >= {} &&
|
||||
avail_vcpus >= {} &&
|
||||
avail_mem_mb >= {} &&
|
||||
avail_storage_gbs >= {} ",
|
||||
filters.free_ports,
|
||||
filters.free_ports,
|
||||
filters.vcpus,
|
||||
filters.memory_mb,
|
||||
filters.storage_mb
|
||||
);
|
||||
|
||||
if !filters.city.is_empty() {
|
||||
filter_query += &format!("&& city = '{}' ", filters.city);
|
||||
}
|
||||
if !filters.region.is_empty() {
|
||||
filter_query += &format!("&& region = '{}' ", filters.region);
|
||||
}
|
||||
if !filters.country.is_empty() {
|
||||
filter_query += &format!("&& country = '{}' ", filters.country);
|
||||
}
|
||||
if !filters.ip.is_empty() {
|
||||
filter_query += &format!("&& ip = '{}' ", filters.ip);
|
||||
}
|
||||
|
||||
if limit_one {
|
||||
filter_query += "limit 1";
|
||||
}
|
||||
|
||||
filter_query += ";";
|
||||
let mut query_resp = db.query(filter_query).await?;
|
||||
let app_nodes: Vec<Self> = query_resp.take(0)?;
|
||||
Ok(app_nodes)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ActiveApp {
|
||||
id: RecordId,
|
||||
pub id: RecordId,
|
||||
#[serde(rename = "in")]
|
||||
admin: RecordId,
|
||||
pub admin: RecordId,
|
||||
#[serde(rename = "out")]
|
||||
app_node: RecordId,
|
||||
app_name: String,
|
||||
mapped_ports: Vec<(u64, u64)>,
|
||||
host_ipv4: String,
|
||||
vcpus: u64,
|
||||
memory_mb: u64,
|
||||
disk_size_gb: u64,
|
||||
created_at: Datetime,
|
||||
price_per_unit: u64,
|
||||
locked_nano: u64,
|
||||
collected_at: Datetime,
|
||||
mr_enclave: String,
|
||||
package_url: String,
|
||||
hratls_pubkey: String,
|
||||
pub app_node: RecordId,
|
||||
pub app_name: String,
|
||||
pub mapped_ports: Vec<(u64, u64)>,
|
||||
pub host_ipv4: String,
|
||||
pub vcpus: u64,
|
||||
pub memory_mb: u64,
|
||||
pub disk_size_gb: u64,
|
||||
pub created_at: Datetime,
|
||||
pub price_per_unit: u64,
|
||||
pub locked_nano: u64,
|
||||
pub collected_at: Datetime,
|
||||
pub mr_enclave: String,
|
||||
pub package_url: String,
|
||||
pub hratls_pubkey: String,
|
||||
}
|
||||
|
||||
impl From<ActiveApp> for DeletedApp {
|
||||
fn from(value: ActiveApp) -> Self {
|
||||
Self {
|
||||
id: value.id,
|
||||
admin: value.admin,
|
||||
app_node: value.app_node,
|
||||
app_name: value.app_name,
|
||||
mapped_ports: value.mapped_ports,
|
||||
host_ipv4: value.host_ipv4,
|
||||
vcpus: value.vcpus,
|
||||
memory_mb: value.memory_mb,
|
||||
disk_size_gb: value.disk_size_gb,
|
||||
created_at: value.created_at,
|
||||
price_per_unit: value.price_per_unit,
|
||||
locked_nano: value.locked_nano,
|
||||
collected_at: value.collected_at,
|
||||
mr_enclave: value.mr_enclave,
|
||||
package_url: value.package_url,
|
||||
hratls_pubkey: value.hratls_pubkey,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ActiveApp {
|
||||
pub async fn activate(db: &Surreal<Client>, id: &str) -> Result<(), Error> {
|
||||
let new_app_req = match NewAppReq::get(db, id).await? {
|
||||
Some(r) => r,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
let active_app = Self {
|
||||
id: RecordId::from((ACTIVE_APP, id)),
|
||||
admin: new_app_req.admin,
|
||||
app_node: new_app_req.app_node,
|
||||
app_name: new_app_req.app_name,
|
||||
mapped_ports: vec![],
|
||||
host_ipv4: String::new(),
|
||||
vcpus: new_app_req.vcpu as u64,
|
||||
memory_mb: new_app_req.memory_mb as u64,
|
||||
disk_size_gb: new_app_req.disk_mb as u64,
|
||||
created_at: new_app_req.created_at.clone(),
|
||||
price_per_unit: new_app_req.price_per_unit,
|
||||
locked_nano: new_app_req.locked_nano,
|
||||
collected_at: new_app_req.created_at,
|
||||
mr_enclave: new_app_req.mr_enclave.clone(),
|
||||
package_url: new_app_req.package_url.clone(),
|
||||
hratls_pubkey: new_app_req.hratls_pubkey.clone(),
|
||||
};
|
||||
|
||||
let _: Vec<ActiveApp> = db.insert(()).relation(active_app).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete(db: &Surreal<Client>, id: &str) -> Result<bool, Error> {
|
||||
let deleted_app: Option<Self> = db.delete((ACTIVE_APP, id)).await?;
|
||||
if let Some(deleted_app) = deleted_app {
|
||||
let deleted_app: DeletedApp = deleted_app.into();
|
||||
let _: Vec<DeletedApp> = db.insert(DELETED_APP).relation(deleted_app).await?;
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
pub async fn listen(db: &Surreal<Client>, app_id: &str) -> Result<ActiveApp, Error> {
|
||||
let mut query_response = db
|
||||
.query(format!(
|
||||
"live select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};"
|
||||
))
|
||||
.query(format!("live select * from {ACTIVE_APP} where id = {ACTIVE_APP}:{app_id};"))
|
||||
.await?;
|
||||
|
||||
let mut error_stream = query_response.stream::<Notification<db::ErrorFromTable>>(0)?;
|
||||
let mut active_app_stream = query_response.stream::<Notification<ActiveApp>>(1)?;
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(30), async {
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(err_notif) = error_stream.next() =>{
|
||||
match err_notif{
|
||||
Ok(err_notif) =>{
|
||||
if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty(){
|
||||
return Err(Error::NewAppDaemonResp(err_notif.data.error))
|
||||
}
|
||||
}
|
||||
Err(e) => return Err(e.into())
|
||||
}
|
||||
|
||||
}
|
||||
Some(active_app_notif) = active_app_stream.next() => {
|
||||
match active_app_notif {
|
||||
Ok(active_app_notif) =>{
|
||||
if active_app_notif.action == surrealdb::Action::Create {
|
||||
let _: Option<NewAppReq> = db.delete((NEW_APP_REQ, app_id)).await?;
|
||||
return Ok(active_app_notif.data);
|
||||
}
|
||||
}
|
||||
Err(e) => return Err(e.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await?
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
@ -93,6 +321,71 @@ impl ActiveAppWithNode {
|
||||
db.query(format!("select * from {ACTIVE_APP}:{uuid} fetch out;")).await?.take(0)?;
|
||||
Ok(contract)
|
||||
}
|
||||
|
||||
pub async fn list_by_node(db: &Surreal<Client>, node_pubkey: &str) -> Result<Vec<Self>, Error> {
|
||||
let mut query_result = db
|
||||
.query(format!(
|
||||
"select * from {ACTIVE_APP} where out = {APP_NODE}:{node_pubkey} fetch out;"
|
||||
))
|
||||
.await?;
|
||||
|
||||
let contract: Vec<Self> = query_result.take(0)?;
|
||||
Ok(contract)
|
||||
}
|
||||
|
||||
pub async fn list_by_admin(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> {
|
||||
let mut query_result = db
|
||||
.query(format!("select * from {ACTIVE_APP} where in = {ACCOUNT}:{admin} fetch out;"))
|
||||
.await?;
|
||||
|
||||
let app_contracts: Vec<Self> = query_result.take(0)?;
|
||||
|
||||
Ok(app_contracts)
|
||||
}
|
||||
|
||||
pub async fn list_by_operator(
|
||||
db: &Surreal<Client>,
|
||||
operator: &str,
|
||||
) -> Result<Vec<Self>, Error> {
|
||||
let mut query_result = db
|
||||
.query(format!(
|
||||
"select
|
||||
(select * from ->operator->app_node<-{ACTIVE_APP} fetch out)
|
||||
as app_contracts
|
||||
from {ACCOUNT}:{operator}"
|
||||
))
|
||||
.await?;
|
||||
#[derive(Deserialize)]
|
||||
struct Wrapper {
|
||||
app_contracts: Vec<ActiveAppWithNode>,
|
||||
}
|
||||
|
||||
let c: Option<Wrapper> = query_result.take(0)?;
|
||||
match c {
|
||||
Some(contracts_wrapper) => Ok(contracts_wrapper.app_contracts),
|
||||
None => Ok(vec![]),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct AppNodeResources {
|
||||
pub avail_no_of_port: u32,
|
||||
pub avail_vcpus: u32,
|
||||
pub avail_memory_mb: u32,
|
||||
pub avail_storage_mb: u32,
|
||||
pub max_ports_per_app: u32,
|
||||
}
|
||||
|
||||
impl AppNodeResources {
|
||||
pub async fn merge(
|
||||
self,
|
||||
db: &Surreal<Client>,
|
||||
node_pubkey: &str,
|
||||
) -> Result<Option<AppNode>, Error> {
|
||||
let app_node: Option<AppNode> = db.update((APP_NODE, node_pubkey)).merge(self).await?;
|
||||
Ok(app_node)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&old_brain::BrainData> for Vec<AppNode> {
|
||||
@ -118,3 +411,25 @@ impl From<&old_brain::BrainData> for Vec<AppNode> {
|
||||
nodes
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct DeletedApp {
|
||||
pub id: RecordId,
|
||||
#[serde(rename = "in")]
|
||||
pub admin: RecordId,
|
||||
#[serde(rename = "out")]
|
||||
pub app_node: RecordId,
|
||||
pub app_name: String,
|
||||
pub mapped_ports: Vec<(u64, u64)>,
|
||||
pub host_ipv4: String,
|
||||
pub vcpus: u64,
|
||||
pub memory_mb: u64,
|
||||
pub disk_size_gb: u64,
|
||||
pub created_at: Datetime,
|
||||
pub price_per_unit: u64,
|
||||
pub locked_nano: u64,
|
||||
pub collected_at: Datetime,
|
||||
pub mr_enclave: String,
|
||||
pub package_url: String,
|
||||
pub hratls_pubkey: String,
|
||||
}
|
||||
|
@ -57,16 +57,25 @@ impl Account {
|
||||
user: &str,
|
||||
node: &str,
|
||||
) -> Result<bool, Error> {
|
||||
let ban: Option<Self> = db
|
||||
let mut query_response = db
|
||||
.query(format!(
|
||||
"(select operator->ban[0] as ban
|
||||
from vm_node:{node}
|
||||
where operator->ban->account contains account:{user}
|
||||
).ban;"
|
||||
))
|
||||
.await?
|
||||
.take(0)?;
|
||||
Ok(ban.is_some())
|
||||
.query(format!(
|
||||
"(select operator->ban[0] as ban
|
||||
from app_node:{node}
|
||||
where operator->ban->account contains account:{user}
|
||||
).ban;"
|
||||
))
|
||||
.await?;
|
||||
|
||||
let vm_node_ban: Option<Self> = query_response.take(0)?;
|
||||
let app_node_ban: Option<Self> = query_response.take(1)?;
|
||||
|
||||
Ok(vm_node_ban.is_some() || app_node_ban.is_some())
|
||||
}
|
||||
}
|
||||
|
||||
@ -113,7 +122,7 @@ pub struct Kick {
|
||||
contract: RecordId,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct Report {
|
||||
#[serde(rename = "in")]
|
||||
from_account: RecordId,
|
||||
|
@ -2,7 +2,7 @@ pub mod app;
|
||||
pub mod general;
|
||||
pub mod vm;
|
||||
|
||||
use crate::constants::{DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ};
|
||||
use crate::constants::{APP_NODE, DELETED_APP, DELETED_VM, NEW_APP_REQ, NEW_VM_REQ, UPDATE_VM_REQ};
|
||||
use crate::old_brain;
|
||||
use prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@ -24,6 +24,12 @@ pub enum Error {
|
||||
TimeOut(#[from] tokio::time::error::Elapsed),
|
||||
#[error("Failed to create account")]
|
||||
FailedToCreateDBEntry,
|
||||
#[error("Unknown Table: {0}")]
|
||||
UnknownTable(String),
|
||||
#[error("Daemon channel got closed: {0}")]
|
||||
AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<AppDaemonMsg>),
|
||||
#[error("AppDaemon Error {0}")]
|
||||
NewAppDaemonResp(String),
|
||||
}
|
||||
|
||||
pub mod prelude {
|
||||
@ -95,9 +101,9 @@ pub async fn live_vmnode_msgs<
|
||||
t if t == std::any::type_name::<crate::db::vm::NewVmReq>() => NEW_VM_REQ.to_string(),
|
||||
t if t == std::any::type_name::<crate::db::vm::UpdateVmReq>() => UPDATE_VM_REQ.to_string(),
|
||||
t if t == std::any::type_name::<crate::db::vm::DeletedVm>() => DELETED_VM.to_string(),
|
||||
wat => {
|
||||
log::error!("listen_for_node: T has type {wat}");
|
||||
String::from("wat")
|
||||
t => {
|
||||
log::error!("live_vmnode_msgs type {t} not supported",);
|
||||
return Err(Error::UnknownTable(t.to_string()));
|
||||
}
|
||||
};
|
||||
let mut resp =
|
||||
@ -119,3 +125,48 @@ pub async fn live_vmnode_msgs<
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn live_appnode_msgs<
|
||||
T: std::fmt::Debug + Into<app::AppDaemonMsg> + std::marker::Unpin + for<'de> Deserialize<'de>,
|
||||
>(
|
||||
db: &Surreal<Client>,
|
||||
node_pubkey: &str,
|
||||
tx: Sender<app::AppDaemonMsg>,
|
||||
) -> Result<(), Error> {
|
||||
let table_name = match std::any::type_name::<T>() {
|
||||
t if t == std::any::type_name::<crate::db::app::NewAppReq>() => NEW_APP_REQ.to_string(),
|
||||
t if t == std::any::type_name::<crate::db::app::DeletedApp>() => DELETED_APP.to_string(),
|
||||
t => {
|
||||
log::error!("live_appnode_msgs type {t} not supported",);
|
||||
return Err(Error::UnknownTable(t.to_string()));
|
||||
}
|
||||
};
|
||||
let mut query_resp = db
|
||||
.query(format!("live select * from {table_name} where out = {APP_NODE}:{node_pubkey};"))
|
||||
.await?;
|
||||
|
||||
let mut live_stream = query_resp.stream::<Notification<T>>(0)?;
|
||||
while let Some(result) = live_stream.next().await {
|
||||
match result {
|
||||
Ok(notification) => {
|
||||
log::debug!("Got notification for node {node_pubkey}: {notification:?}");
|
||||
if notification.action == surrealdb::Action::Create {
|
||||
tx.send(notification.data.into()).await?
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!(
|
||||
"live_vmnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}"
|
||||
);
|
||||
return Err(Error::from(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct ErrorFromTable {
|
||||
pub error: String,
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ use super::Error;
|
||||
use crate::constants::{
|
||||
ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT,
|
||||
};
|
||||
use crate::db::{Account, Report};
|
||||
use crate::db::{Account, ErrorFromTable, Report};
|
||||
use crate::old_brain;
|
||||
use detee_shared::vm_proto;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@ -214,17 +214,13 @@ impl WrappedMeasurement {
|
||||
UPDATE_VM_REQ => UPDATE_VM_REQ,
|
||||
_ => NEW_VM_REQ,
|
||||
};
|
||||
#[derive(Deserialize)]
|
||||
struct ErrorMessage {
|
||||
error: String,
|
||||
}
|
||||
let mut resp = db
|
||||
.query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};"))
|
||||
.query(format!(
|
||||
"live select * from measurement_args where id = measurement_args:{vm_id};"
|
||||
))
|
||||
.await?;
|
||||
let mut error_stream = resp.stream::<Notification<ErrorMessage>>(0)?;
|
||||
let mut error_stream = resp.stream::<Notification<ErrorFromTable>>(0)?;
|
||||
let mut args_stream = resp.stream::<Notification<vm_proto::MeasurementArgs>>(1)?;
|
||||
|
||||
let args: Option<vm_proto::MeasurementArgs> =
|
||||
|
301
src/grpc/app.rs
301
src/grpc/app.rs
@ -1 +1,302 @@
|
||||
use crate::constants::{ACCOUNT, APP_NODE};
|
||||
use crate::db::app::ActiveApp;
|
||||
use crate::db::prelude as db;
|
||||
use crate::grpc::{check_sig_from_parts, check_sig_from_req};
|
||||
use detee_shared::app_proto::brain_app_cli_server::BrainAppCli;
|
||||
use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemon;
|
||||
use detee_shared::app_proto::*;
|
||||
use detee_shared::common_proto::Empty;
|
||||
use log::info;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use surrealdb::engine::remote::ws::Client;
|
||||
use surrealdb::RecordId;
|
||||
use surrealdb::Surreal;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_stream::{Stream, StreamExt};
|
||||
use tonic::{Response, Status, Streaming};
|
||||
|
||||
pub struct AppDaemonServer {
|
||||
pub db: Arc<Surreal<Client>>,
|
||||
}
|
||||
|
||||
impl AppDaemonServer {
|
||||
pub fn new(db: Arc<Surreal<Client>>) -> Self {
|
||||
Self { db }
|
||||
}
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl BrainAppDaemon for AppDaemonServer {
|
||||
type RegisterAppNodeStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
|
||||
type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainMessageApp, Status>> + Send>>;
|
||||
|
||||
async fn register_app_node(
|
||||
&self,
|
||||
req: tonic::Request<RegisterAppNodeReq>,
|
||||
) -> Result<tonic::Response<<Self as BrainAppDaemon>::RegisterAppNodeStream>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
info!("Starting app_node registration process for {:?}", req);
|
||||
|
||||
let app_node = db::AppNode {
|
||||
id: RecordId::from((APP_NODE, req.node_pubkey.clone())),
|
||||
operator: RecordId::from((ACCOUNT, req.operator_wallet)),
|
||||
country: req.country,
|
||||
region: req.region,
|
||||
city: req.city,
|
||||
ip: req.main_ip,
|
||||
price: req.price,
|
||||
|
||||
avail_mem_mb: 0,
|
||||
avail_vcpus: 0,
|
||||
avail_storage_gbs: 0,
|
||||
avail_ports: 0,
|
||||
max_ports_per_app: 0,
|
||||
offline_minutes: 0,
|
||||
};
|
||||
|
||||
app_node.register(&self.db).await?;
|
||||
info!("Sending existing contracts to {}", req.node_pubkey);
|
||||
|
||||
let contracts = db::ActiveAppWithNode::list_by_node(&self.db, &req.node_pubkey).await?;
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
tokio::spawn(async move {
|
||||
for contract in contracts {
|
||||
let _ = tx.send(Ok(contract.into())).await;
|
||||
}
|
||||
});
|
||||
|
||||
let resp_stream = ReceiverStream::new(rx);
|
||||
Ok(Response::new(Box::pin(resp_stream)))
|
||||
}
|
||||
|
||||
async fn brain_messages(
|
||||
&self,
|
||||
req: tonic::Request<DaemonAuth>,
|
||||
) -> Result<tonic::Response<<Self as BrainAppDaemon>::BrainMessagesStream>, Status> {
|
||||
let auth = req.into_inner();
|
||||
let pubkey = auth.pubkey.clone();
|
||||
check_sig_from_parts(
|
||||
&pubkey,
|
||||
&auth.timestamp,
|
||||
&format!("{:?}", auth.contracts),
|
||||
&auth.signature,
|
||||
)?;
|
||||
|
||||
info!("App Daemon {} connected to receive brain messages", pubkey);
|
||||
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
{
|
||||
let db = self.db.clone();
|
||||
let pubkey = pubkey.clone();
|
||||
let tx = tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = db::live_appnode_msgs::<db::NewAppReq>(&db, &pubkey, tx).await;
|
||||
});
|
||||
}
|
||||
{
|
||||
let db = self.db.clone();
|
||||
let pubkey = pubkey.clone();
|
||||
let tx = tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = db::live_appnode_msgs::<db::DeletedApp>(&db, &pubkey, tx).await;
|
||||
});
|
||||
}
|
||||
|
||||
let resp_stream = ReceiverStream::new(rx).map(|msg| Ok(msg.into()));
|
||||
Ok(Response::new(Box::pin(resp_stream)))
|
||||
}
|
||||
|
||||
async fn daemon_messages(
|
||||
&self,
|
||||
req: tonic::Request<Streaming<DaemonMessageApp>>,
|
||||
) -> Result<tonic::Response<Empty>, Status> {
|
||||
let mut req_stream = req.into_inner();
|
||||
let pubkey: String;
|
||||
if let Some(Ok(msg)) = req_stream.next().await {
|
||||
log::debug!("App daemon_messages received auth message: {:?}", msg);
|
||||
if let Some(daemon_message_app::Msg::Auth(auth)) = msg.msg {
|
||||
pubkey = auth.pubkey.clone();
|
||||
check_sig_from_parts(
|
||||
&pubkey,
|
||||
&auth.timestamp,
|
||||
&format!("{:?}", &auth.contracts),
|
||||
&auth.signature,
|
||||
)?;
|
||||
} else {
|
||||
return Err(Status::unauthenticated(
|
||||
"Could not authenticate the app daemon: could not extract auth signature",
|
||||
));
|
||||
}
|
||||
} else {
|
||||
return Err(Status::unauthenticated("Could not authenticate the app daemon"));
|
||||
}
|
||||
|
||||
while let Some(daemon_message) = req_stream.next().await {
|
||||
match daemon_message {
|
||||
Ok(msg) => match msg.msg {
|
||||
Some(daemon_message_app::Msg::NewAppRes(new_app_resp)) => {
|
||||
if !new_app_resp.error.is_empty() {
|
||||
db::NewAppReq::submit_error(
|
||||
&self.db,
|
||||
&new_app_resp.uuid,
|
||||
new_app_resp.error,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
db::ActiveApp::activate(&self.db, &new_app_resp.uuid).await?;
|
||||
}
|
||||
}
|
||||
Some(daemon_message_app::Msg::AppNodeResources(app_node_resources)) => {
|
||||
let node_resource: db::AppNodeResources = app_node_resources.into();
|
||||
node_resource.merge(&self.db, &pubkey).await?;
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
|
||||
Err(e) => {
|
||||
log::warn!("App Daemon Disconnected: {e:?}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Response::new(Empty {}))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AppCliServer {
|
||||
pub db: Arc<Surreal<Client>>,
|
||||
}
|
||||
|
||||
impl AppCliServer {
|
||||
pub fn new(db: Arc<Surreal<Client>>) -> Self {
|
||||
Self { db }
|
||||
}
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl BrainAppCli for AppCliServer {
|
||||
type ListAppContractsStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
|
||||
type ListAppNodesStream = Pin<Box<dyn Stream<Item = Result<AppNodeListResp, Status>> + Send>>;
|
||||
|
||||
async fn new_app(
|
||||
&self,
|
||||
req: tonic::Request<detee_shared::app_proto::NewAppReq>,
|
||||
) -> Result<tonic::Response<detee_shared::app_proto::NewAppRes>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
info!("deploy_app process starting for {:?}", req);
|
||||
|
||||
if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
|
||||
return Err(Status::permission_denied("This operator banned you. What did you do?"));
|
||||
}
|
||||
let new_app_req: db::NewAppReq = req.into();
|
||||
let id = new_app_req.id.to_string();
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
let db = self.db.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = tx.send(db::ActiveApp::listen(&db, &id).await);
|
||||
});
|
||||
|
||||
new_app_req.submit(&self.db).await?;
|
||||
|
||||
match rx.await {
|
||||
Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
|
||||
"Network timeout. Please try again later or contact the DeTEE devs team.",
|
||||
)),
|
||||
Ok(Err(db::Error::NewAppDaemonResp(err))) => Err(Status::internal(err)),
|
||||
Ok(new_app_resp) => Ok(Response::new(new_app_resp?.into())),
|
||||
Err(e) => {
|
||||
log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}");
|
||||
Err(Status::unknown(
|
||||
"Unknown error. Please try again or contact the DeTEE devs team.",
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete_app(
|
||||
&self,
|
||||
req: tonic::Request<DelAppReq>,
|
||||
) -> Result<tonic::Response<Empty>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
info!("delete_app process starting for {:?}", req);
|
||||
match ActiveApp::delete(&self.db, &req.uuid).await? {
|
||||
true => Ok(Response::new(Empty {})),
|
||||
false => Err(Status::not_found(format!("Could not find App contract {}", &req.uuid))),
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_app_contracts(
|
||||
&self,
|
||||
req: tonic::Request<ListAppContractsReq>,
|
||||
) -> Result<tonic::Response<<Self as BrainAppCli>::ListAppContractsStream>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
info!("list_app_contracts process starting for {:?}", req);
|
||||
|
||||
let mut app_contracts = Vec::new();
|
||||
|
||||
if !req.uuid.is_empty() {
|
||||
if let Some(app_contract) =
|
||||
db::ActiveAppWithNode::get_by_uuid(&self.db, &req.uuid).await?
|
||||
{
|
||||
if app_contract.admin.key().to_string() == req.admin_pubkey
|
||||
|| app_contract.app_node.operator.key().to_string() == req.admin_pubkey
|
||||
{
|
||||
app_contracts.push(app_contract);
|
||||
}
|
||||
}
|
||||
} else if req.as_operator {
|
||||
app_contracts.append(
|
||||
&mut db::ActiveAppWithNode::list_by_operator(&self.db, &req.admin_pubkey).await?,
|
||||
);
|
||||
} else {
|
||||
app_contracts.append(
|
||||
&mut db::ActiveAppWithNode::list_by_admin(&self.db, &req.admin_pubkey).await?,
|
||||
);
|
||||
}
|
||||
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
tokio::spawn(async move {
|
||||
for app_contract in app_contracts {
|
||||
let _ = tx.send(Ok(app_contract.into())).await;
|
||||
}
|
||||
});
|
||||
|
||||
let resp_stream = ReceiverStream::new(rx);
|
||||
Ok(Response::new(Box::pin(resp_stream)))
|
||||
}
|
||||
|
||||
async fn list_app_nodes(
|
||||
&self,
|
||||
req: tonic::Request<AppNodeFilters>,
|
||||
) -> Result<tonic::Response<<Self as BrainAppCli>::ListAppNodesStream>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
info!("list_app_nodes process starting for {:?}", req);
|
||||
let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req, false).await?;
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
tokio::spawn(async move {
|
||||
for app_node in app_nodes {
|
||||
let _ = tx.send(Ok(app_node.into())).await;
|
||||
}
|
||||
});
|
||||
let resp_stream = ReceiverStream::new(rx);
|
||||
Ok(Response::new(Box::pin(resp_stream)))
|
||||
}
|
||||
|
||||
async fn get_one_app_node(
|
||||
&self,
|
||||
req: tonic::Request<AppNodeFilters>,
|
||||
) -> Result<tonic::Response<AppNodeListResp>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
info!("get_one_app_node process starting for {:?}", req);
|
||||
let app_node = db::AppNodeWithReports::find_by_filters(&self.db, req, true)
|
||||
.await?
|
||||
.first()
|
||||
.ok_or(Status::not_found("No app node found"))?
|
||||
.clone();
|
||||
|
||||
Ok(Response::new(app_node.into()))
|
||||
}
|
||||
}
|
||||
|
@ -4,11 +4,12 @@ pub mod types;
|
||||
pub mod vm;
|
||||
|
||||
use crate::constants::ADMIN_ACCOUNTS;
|
||||
use detee_shared::app_proto::*;
|
||||
use detee_shared::common_proto::{Empty, Pubkey};
|
||||
use detee_shared::general_proto::{
|
||||
AirdropReq, BanUserReq, KickReq, RegOperatorReq, ReportNodeReq, SlashReq,
|
||||
};
|
||||
use detee_shared::vm_proto::{ListVmContractsReq, *};
|
||||
use detee_shared::vm_proto::*;
|
||||
use tonic::{Request, Status};
|
||||
|
||||
pub trait PubkeyGetter {
|
||||
@ -49,14 +50,15 @@ impl_pubkey_getter!(Empty);
|
||||
impl_pubkey_getter!(AirdropReq);
|
||||
impl_pubkey_getter!(SlashReq);
|
||||
|
||||
// impl_pubkey_getter!(NewAppReq, admin_pubkey);
|
||||
// impl_pubkey_getter!(DelAppReq, admin_pubkey);
|
||||
// impl_pubkey_getter!(ListAppContractsReq, admin_pubkey);
|
||||
//
|
||||
// impl_pubkey_getter!(RegisterAppNodeReq);
|
||||
// impl_pubkey_getter!(AppNodeFilters);
|
||||
impl_pubkey_getter!(NewAppReq, admin_pubkey);
|
||||
impl_pubkey_getter!(DelAppReq, admin_pubkey);
|
||||
impl_pubkey_getter!(ListAppContractsReq, admin_pubkey);
|
||||
|
||||
impl_pubkey_getter!(RegisterAppNodeReq);
|
||||
impl_pubkey_getter!(AppNodeFilters);
|
||||
|
||||
pub fn check_sig_from_req<T: std::fmt::Debug + PubkeyGetter>(req: Request<T>) -> Result<T, Status> {
|
||||
log::trace!("Checking signature from request: {:?}", req);
|
||||
let time = match req.metadata().get("timestamp") {
|
||||
Some(t) => t.clone(),
|
||||
None => return Err(Status::unauthenticated("Timestamp not found in metadata.")),
|
||||
@ -120,6 +122,9 @@ pub fn check_sig_from_req<T: std::fmt::Debug + PubkeyGetter>(req: Request<T>) ->
|
||||
}
|
||||
|
||||
pub fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Result<(), Status> {
|
||||
log::trace!(
|
||||
"Checking signature from parts: pubkey: {pubkey}, time: {time}, msg: {msg}, sig: {sig}"
|
||||
);
|
||||
let now = chrono::Utc::now();
|
||||
let parsed_time = chrono::DateTime::parse_from_rfc3339(time)
|
||||
.map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?;
|
||||
|
@ -1,8 +1,9 @@
|
||||
use crate::constants::{ACCOUNT, ID_ALPHABET, NEW_VM_REQ, VM_NODE};
|
||||
use crate::constants::{ACCOUNT, APP_NODE, ID_ALPHABET, NEW_APP_REQ, NEW_VM_REQ, VM_NODE};
|
||||
use crate::db::prelude as db;
|
||||
use detee_shared::app_proto::AppNodeListResp;
|
||||
use detee_shared::common_proto::MappedPort;
|
||||
use detee_shared::general_proto::{AccountBalance, ListOperatorsResp};
|
||||
use detee_shared::vm_proto::*;
|
||||
use detee_shared::{app_proto::*, vm_proto::*};
|
||||
use nanoid::nanoid;
|
||||
|
||||
use surrealdb::RecordId;
|
||||
@ -249,3 +250,140 @@ impl From<VmNodeResources> for db::VmNodeResources {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<db::ActiveAppWithNode> for AppContract {
|
||||
fn from(value: db::ActiveAppWithNode) -> Self {
|
||||
let public_package_mr_enclave =
|
||||
Some(hex::decode(value.mr_enclave.clone()).unwrap_or_default());
|
||||
|
||||
AppContract {
|
||||
uuid: value.id.key().to_string(),
|
||||
package_url: value.package_url,
|
||||
admin_pubkey: value.admin.key().to_string(),
|
||||
node_pubkey: value.app_node.id.key().to_string(),
|
||||
public_ipv4: value.host_ipv4,
|
||||
resource: Some(AppResource {
|
||||
memory_mb: value.memory_mb as u32,
|
||||
disk_mb: value.disk_size_gb as u32,
|
||||
vcpu: value.vcpus as u32,
|
||||
ports: value.mapped_ports.iter().map(|(_, g)| *g as u32).collect(),
|
||||
}),
|
||||
mapped_ports: value
|
||||
.mapped_ports
|
||||
.iter()
|
||||
.map(|(h, g)| MappedPort { host_port: *h as u32, guest_port: *g as u32 })
|
||||
.collect(),
|
||||
|
||||
created_at: value.created_at.to_rfc3339(),
|
||||
updated_at: value.created_at.to_rfc3339(),
|
||||
nano_per_minute: value.price_per_unit,
|
||||
locked_nano: value.locked_nano,
|
||||
collected_at: value.collected_at.to_rfc3339(),
|
||||
hratls_pubkey: value.mr_enclave,
|
||||
public_package_mr_enclave,
|
||||
app_name: value.app_name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<NewAppReq> for db::NewAppReq {
|
||||
fn from(val: NewAppReq) -> Self {
|
||||
let resource = val.resource.unwrap_or_default();
|
||||
|
||||
let mr_enclave = val
|
||||
.public_package_mr_enclave
|
||||
.unwrap_or_default()
|
||||
.iter()
|
||||
.fold(String::new(), |acc, x| acc + &format!("{:02x?}", x));
|
||||
|
||||
Self {
|
||||
id: RecordId::from((NEW_APP_REQ, nanoid!(40, &ID_ALPHABET))),
|
||||
admin: RecordId::from((ACCOUNT, val.admin_pubkey)),
|
||||
app_node: RecordId::from((APP_NODE, val.node_pubkey)),
|
||||
app_name: val.app_name,
|
||||
package_url: val.package_url,
|
||||
mr_enclave,
|
||||
hratls_pubkey: val.hratls_pubkey,
|
||||
ports: resource.ports,
|
||||
memory_mb: resource.memory_mb,
|
||||
vcpu: resource.vcpu,
|
||||
disk_mb: resource.disk_mb,
|
||||
locked_nano: val.locked_nano,
|
||||
price_per_unit: val.price_per_unit,
|
||||
error: String::new(),
|
||||
created_at: surrealdb::sql::Datetime::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<db::NewAppReq> for NewAppReq {
|
||||
fn from(value: db::NewAppReq) -> Self {
|
||||
let resource = AppResource {
|
||||
vcpu: value.vcpu,
|
||||
memory_mb: value.memory_mb,
|
||||
disk_mb: value.disk_mb,
|
||||
ports: value.ports,
|
||||
};
|
||||
let mr_enclave = Some(hex::decode(value.mr_enclave).unwrap_or_default());
|
||||
|
||||
Self {
|
||||
package_url: value.package_url,
|
||||
node_pubkey: value.app_node.key().to_string(),
|
||||
resource: Some(resource),
|
||||
uuid: value.id.key().to_string(),
|
||||
admin_pubkey: value.admin.key().to_string(),
|
||||
price_per_unit: value.price_per_unit,
|
||||
locked_nano: value.locked_nano,
|
||||
hratls_pubkey: value.hratls_pubkey,
|
||||
public_package_mr_enclave: mr_enclave,
|
||||
app_name: value.app_name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<db::DeletedApp> for DelAppReq {
|
||||
fn from(value: db::DeletedApp) -> Self {
|
||||
Self { uuid: value.id.key().to_string(), admin_pubkey: value.admin.key().to_string() }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<db::AppDaemonMsg> for BrainMessageApp {
|
||||
fn from(value: db::AppDaemonMsg) -> Self {
|
||||
match value {
|
||||
db::AppDaemonMsg::Create(new_app_req) => {
|
||||
BrainMessageApp { msg: Some(brain_message_app::Msg::NewAppReq(new_app_req.into())) }
|
||||
}
|
||||
db::AppDaemonMsg::Delete(del_app_req) => BrainMessageApp {
|
||||
msg: Some(brain_message_app::Msg::DeleteAppReq(del_app_req.into())),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<AppNodeResources> for db::AppNodeResources {
|
||||
fn from(value: AppNodeResources) -> Self {
|
||||
Self {
|
||||
avail_no_of_port: value.avail_no_of_port,
|
||||
avail_vcpus: value.avail_vcpus,
|
||||
avail_memory_mb: value.avail_memory_mb,
|
||||
avail_storage_mb: value.avail_storage_mb,
|
||||
max_ports_per_app: value.max_ports_per_app,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<db::ActiveApp> for NewAppRes {
|
||||
fn from(val: db::ActiveApp) -> Self {
|
||||
let mapped_ports = val
|
||||
.mapped_ports
|
||||
.iter()
|
||||
.map(|(h, g)| MappedPort { host_port: *h as u32, guest_port: *g as u32 })
|
||||
.collect();
|
||||
Self {
|
||||
uuid: val.id.key().to_string(),
|
||||
ip_address: val.host_ipv4,
|
||||
mapped_ports,
|
||||
error: String::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -343,10 +343,11 @@ impl BrainVmCli for VmCliServer {
|
||||
if let Some(specific_contract) =
|
||||
db::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await?
|
||||
{
|
||||
if specific_contract.admin.key().to_string() == req.wallet {
|
||||
if specific_contract.admin.key().to_string() == req.wallet
|
||||
|| specific_contract.vm_node.operator.key().to_string() == req.wallet
|
||||
{
|
||||
contracts.push(specific_contract);
|
||||
}
|
||||
// TODO: allow operator to inspect contracts
|
||||
}
|
||||
} else if req.as_operator {
|
||||
contracts
|
||||
|
Loading…
Reference in New Issue
Block a user