features: app engine #1

Merged
ghe0 merged 11 commits from app_engine into main 2025-05-15 01:39:06 +00:00
7 changed files with 157 additions and 27 deletions
Showing only changes of commit 8caa55e325 - Show all commits

18
Cargo.lock generated

@ -1000,7 +1000,7 @@ dependencies = [
[[package]]
name = "detee-shared"
version = "0.1.0"
source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#637610196f708475230802fa67f1b3ee4d8d0679"
source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#da0f3269a31e0ebfb7328e2115e212aabe4d984a"
dependencies = [
"bincode 2.0.1",
"prost",
@ -2291,9 +2291,9 @@ dependencies = [
[[package]]
name = "multimap"
version = "0.10.0"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
[[package]]
name = "nanoid"
@ -2756,7 +2756,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf"
dependencies = [
"heck 0.5.0",
"itertools 0.13.0",
"itertools 0.11.0",
"log",
"multimap",
"once_cell",
@ -2776,7 +2776,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [
"anyhow",
"itertools 0.13.0",
"itertools 0.11.0",
"proc-macro2",
"quote",
"syn 2.0.100",
@ -3315,9 +3315,9 @@ dependencies = [
[[package]]
name = "rustix"
version = "1.0.5"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d97817398dd4bb2e6da002002db259209759911da105da92bec29ccb12cf58bf"
checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266"
dependencies = [
"bitflags",
"errno",
@ -4016,9 +4016,9 @@ dependencies = [
[[package]]
name = "tempfile"
version = "3.19.1"
version = "3.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf"
checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1"
dependencies = [
"fastrand",
"getrandom 0.3.2",

@ -1,3 +1,5 @@
use std::time::Duration;
use super::Error;
use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, DELETED_APP, NEW_APP_REQ};
use crate::db;
@ -7,7 +9,8 @@ use detee_shared::app_proto;
use serde::{Deserialize, Serialize};
use surrealdb::engine::remote::ws::Client;
use surrealdb::sql::Datetime;
use surrealdb::{RecordId, Surreal};
use surrealdb::{Notification, RecordId, Surreal};
use tokio_stream::StreamExt;
#[derive(Debug, Serialize, Deserialize)]
pub struct AppNode {
@ -92,6 +95,11 @@ impl NewAppReq {
Ok(record)
}
pub async fn submit(self, db: &Surreal<Client>) -> Result<Vec<Self>, Error> {
let new_app_req: Vec<Self> = db.insert(NEW_APP_REQ).relation(self).await?;
Ok(new_app_req)
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -242,6 +250,47 @@ impl ActiveApp {
Ok(false)
}
}
pub async fn listen(db: &Surreal<Client>, app_id: &str) -> Result<ActiveApp, Error> {
let mut query_response = db
.query(format!(
"live select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};"
))
.query(format!("live select * from {ACTIVE_APP} where id = {ACTIVE_APP}:{app_id};"))
.await?;
let mut error_stream = query_response.stream::<Notification<db::ErrorFromTable>>(0)?;
let mut active_app_stream = query_response.stream::<Notification<ActiveApp>>(1)?;
tokio::time::timeout(Duration::from_secs(30), async {
loop {
tokio::select! {
Some(err_notif) = error_stream.next() =>{
match err_notif{
Ok(err_notif) =>{
if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty(){
return Err(Error::NewAppDaemonResp(err_notif.data.error))
}
}
Err(e) => return Err(e.into())
}
}
Some(active_app_notif) = active_app_stream.next() => {
match active_app_notif {
Ok(active_app_notif) =>{
if active_app_notif.action == surrealdb::Action::Create {
let _: Option<NewAppReq> = db.delete((NEW_APP_REQ, app_id)).await?;
return Ok(active_app_notif.data);
}
}
Err(e) => return Err(e.into())
}
}
}
}
})
.await?
}
}
#[derive(Debug, Serialize, Deserialize)]

@ -57,16 +57,25 @@ impl Account {
user: &str,
node: &str,
) -> Result<bool, Error> {
let ban: Option<Self> = db
let mut query_response = db
.query(format!(
"(select operator->ban[0] as ban
from vm_node:{node}
where operator->ban->account contains account:{user}
).ban;"
))
.await?
.take(0)?;
Ok(ban.is_some())
.query(format!(
"(select operator->ban[0] as ban
from app_node:{node}
where operator->ban->account contains account:{user}
).ban;"
))
.await?;
let vm_node_ban: Option<Self> = query_response.take(0)?;
let app_node_ban: Option<Self> = query_response.take(1)?;
Ok(vm_node_ban.is_some() || app_node_ban.is_some())
}
}

@ -28,6 +28,8 @@ pub enum Error {
UnknownTable(String),
#[error("Daemon channel got closed: {0}")]
AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<AppDaemonMsg>),
#[error("AppDaemon Error {0}")]
NewAppDaemonResp(String),
}
pub mod prelude {
@ -163,3 +165,8 @@ pub async fn live_appnode_msgs<
Ok(())
}
#[derive(Deserialize)]
pub struct ErrorFromTable {
pub error: String,
}

@ -5,7 +5,7 @@ use super::Error;
use crate::constants::{
ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT,
};
use crate::db::{Account, Report};
use crate::db::{Account, ErrorFromTable, Report};
use crate::old_brain;
use detee_shared::vm_proto;
use serde::{Deserialize, Serialize};
@ -214,17 +214,13 @@ impl WrappedMeasurement {
UPDATE_VM_REQ => UPDATE_VM_REQ,
_ => NEW_VM_REQ,
};
#[derive(Deserialize)]
struct ErrorMessage {
error: String,
}
let mut resp = db
.query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};"))
.query(format!(
"live select * from measurement_args where id = measurement_args:{vm_id};"
))
.await?;
let mut error_stream = resp.stream::<Notification<ErrorMessage>>(0)?;
let mut error_stream = resp.stream::<Notification<ErrorFromTable>>(0)?;
let mut args_stream = resp.stream::<Notification<vm_proto::MeasurementArgs>>(1)?;
let args: Option<vm_proto::MeasurementArgs> =

@ -4,10 +4,7 @@ use crate::db::prelude as db;
use crate::grpc::{check_sig_from_parts, check_sig_from_req};
use detee_shared::app_proto::brain_app_cli_server::BrainAppCli;
use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemon;
use detee_shared::app_proto::{
daemon_message_app, AppContract, AppNodeFilters, AppNodeListResp, BrainMessageApp, DaemonAuth,
DaemonMessageApp, DelAppReq, ListAppContractsReq, RegisterAppNodeReq,
};
use detee_shared::app_proto::*;
use detee_shared::common_proto::Empty;
use log::info;
use std::pin::Pin;
@ -183,14 +180,40 @@ impl BrainAppCli for AppCliServer {
type ListAppContractsStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
type ListAppNodesStream = Pin<Box<dyn Stream<Item = Result<AppNodeListResp, Status>> + Send>>;
async fn deploy_app(
async fn new_app(
&self,
req: tonic::Request<detee_shared::app_proto::NewAppReq>,
) -> Result<tonic::Response<detee_shared::app_proto::NewAppRes>, Status> {
let req = check_sig_from_req(req)?;
info!("deploy_app process starting for {:?}", req);
todo!()
if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
return Err(Status::permission_denied("This operator banned you. What did you do?"));
}
let new_app_req: db::NewAppReq = req.into();
let id = new_app_req.id.to_string();
let (tx, rx) = tokio::sync::oneshot::channel();
let db = self.db.clone();
tokio::spawn(async move {
let _ = tx.send(db::ActiveApp::listen(&db, &id).await);
});
new_app_req.submit(&self.db).await?;
match rx.await {
Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
"Network timeout. Please try again later or contact the DeTEE devs team.",
)),
Ok(Err(db::Error::NewAppDaemonResp(err))) => Err(Status::internal(err)),
Ok(new_app_resp) => Ok(Response::new(new_app_resp?.into())),
Err(e) => {
log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}");
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
}
async fn delete_app(

@ -1,4 +1,4 @@
use crate::constants::{ACCOUNT, ID_ALPHABET, NEW_VM_REQ, VM_NODE};
use crate::constants::{ACCOUNT, APP_NODE, ID_ALPHABET, NEW_APP_REQ, NEW_VM_REQ, VM_NODE};
use crate::db::prelude as db;
use detee_shared::app_proto::AppNodeListResp;
use detee_shared::common_proto::MappedPort;
@ -286,6 +286,36 @@ impl From<db::ActiveAppWithNode> for AppContract {
}
}
impl From<NewAppReq> for db::NewAppReq {
fn from(val: NewAppReq) -> Self {
let resource = val.resource.unwrap_or_default();
let mr_enclave = val
.public_package_mr_enclave
.unwrap_or_default()
.iter()
.fold(String::new(), |acc, x| acc + &format!("{:02x?}", x));
Self {
id: RecordId::from((NEW_APP_REQ, nanoid!(40, &ID_ALPHABET))),
admin: RecordId::from((ACCOUNT, val.admin_pubkey)),
app_node: RecordId::from((APP_NODE, val.node_pubkey)),
app_name: val.app_name,
package_url: val.package_url,
mr_enclave,
hratls_pubkey: val.hratls_pubkey,
ports: resource.ports,
memory_mb: resource.memory_mb,
vcpu: resource.vcpu,
disk_mb: resource.disk_mb,
locked_nano: val.locked_nano,
price_per_unit: val.price_per_unit,
error: String::new(),
created_at: surrealdb::sql::Datetime::default(),
}
}
}
impl From<db::NewAppReq> for NewAppReq {
fn from(value: db::NewAppReq) -> Self {
let resource = AppResource {
@ -341,3 +371,19 @@ impl From<AppNodeResources> for db::AppNodeResources {
}
}
}
impl From<db::ActiveApp> for NewAppRes {
fn from(val: db::ActiveApp) -> Self {
let mapped_ports = val
.mapped_ports
.iter()
.map(|(h, g)| MappedPort { host_port: *h as u32, guest_port: *g as u32 })
.collect();
Self {
uuid: val.id.key().to_string(),
ip_address: val.host_ipv4,
mapped_ports,
error: String::new(),
}
}
}