Compare commits

...

2 Commits

Author SHA1 Message Date
8caa55e325
App deployment
rename rpc method app deploy to new_app
improved is_banned_by_node checking app nodes also
moved common ErrorFromTable type to db module
2025-05-14 20:55:13 +05:30
92f4e15412
App contract delete 2025-05-14 13:31:12 +05:30
7 changed files with 221 additions and 55 deletions

18
Cargo.lock generated

@ -1000,7 +1000,7 @@ dependencies = [
[[package]] [[package]]
name = "detee-shared" name = "detee-shared"
version = "0.1.0" version = "0.1.0"
source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#637610196f708475230802fa67f1b3ee4d8d0679" source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#da0f3269a31e0ebfb7328e2115e212aabe4d984a"
dependencies = [ dependencies = [
"bincode 2.0.1", "bincode 2.0.1",
"prost", "prost",
@ -2291,9 +2291,9 @@ dependencies = [
[[package]] [[package]]
name = "multimap" name = "multimap"
version = "0.10.0" version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
[[package]] [[package]]
name = "nanoid" name = "nanoid"
@ -2756,7 +2756,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf"
dependencies = [ dependencies = [
"heck 0.5.0", "heck 0.5.0",
"itertools 0.13.0", "itertools 0.11.0",
"log", "log",
"multimap", "multimap",
"once_cell", "once_cell",
@ -2776,7 +2776,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"itertools 0.13.0", "itertools 0.11.0",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.100", "syn 2.0.100",
@ -3315,9 +3315,9 @@ dependencies = [
[[package]] [[package]]
name = "rustix" name = "rustix"
version = "1.0.5" version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d97817398dd4bb2e6da002002db259209759911da105da92bec29ccb12cf58bf" checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"errno", "errno",
@ -4016,9 +4016,9 @@ dependencies = [
[[package]] [[package]]
name = "tempfile" name = "tempfile"
version = "3.19.1" version = "3.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf" checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1"
dependencies = [ dependencies = [
"fastrand", "fastrand",
"getrandom 0.3.2", "getrandom 0.3.2",

@ -1,5 +1,7 @@
use std::time::Duration;
use super::Error; use super::Error;
use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, NEW_APP_REQ}; use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, DELETED_APP, NEW_APP_REQ};
use crate::db; use crate::db;
use crate::db::general::Report; use crate::db::general::Report;
use crate::old_brain; use crate::old_brain;
@ -7,7 +9,8 @@ use detee_shared::app_proto;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use surrealdb::engine::remote::ws::Client; use surrealdb::engine::remote::ws::Client;
use surrealdb::sql::Datetime; use surrealdb::sql::Datetime;
use surrealdb::{RecordId, Surreal}; use surrealdb::{Notification, RecordId, Surreal};
use tokio_stream::StreamExt;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct AppNode { pub struct AppNode {
@ -92,6 +95,11 @@ impl NewAppReq {
Ok(record) Ok(record)
} }
pub async fn submit(self, db: &Surreal<Client>) -> Result<Vec<Self>, Error> {
let new_app_req: Vec<Self> = db.insert(NEW_APP_REQ).relation(self).await?;
Ok(new_app_req)
}
} }
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
@ -158,24 +166,47 @@ impl AppNodeWithReports {
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct ActiveApp { pub struct ActiveApp {
id: RecordId, pub id: RecordId,
#[serde(rename = "in")] #[serde(rename = "in")]
admin: RecordId, pub admin: RecordId,
#[serde(rename = "out")] #[serde(rename = "out")]
app_node: RecordId, pub app_node: RecordId,
app_name: String, pub app_name: String,
mapped_ports: Vec<(u64, u64)>, pub mapped_ports: Vec<(u64, u64)>,
host_ipv4: String, pub host_ipv4: String,
vcpus: u64, pub vcpus: u64,
memory_mb: u64, pub memory_mb: u64,
disk_size_gb: u64, pub disk_size_gb: u64,
created_at: Datetime, pub created_at: Datetime,
price_per_unit: u64, pub price_per_unit: u64,
locked_nano: u64, pub locked_nano: u64,
collected_at: Datetime, pub collected_at: Datetime,
mr_enclave: String, pub mr_enclave: String,
package_url: String, pub package_url: String,
hratls_pubkey: String, pub hratls_pubkey: String,
}
impl From<ActiveApp> for DeletedApp {
fn from(value: ActiveApp) -> Self {
Self {
id: value.id,
admin: value.admin,
app_node: value.app_node,
app_name: value.app_name,
mapped_ports: value.mapped_ports,
host_ipv4: value.host_ipv4,
vcpus: value.vcpus,
memory_mb: value.memory_mb,
disk_size_gb: value.disk_size_gb,
created_at: value.created_at,
price_per_unit: value.price_per_unit,
locked_nano: value.locked_nano,
collected_at: value.collected_at,
mr_enclave: value.mr_enclave,
package_url: value.package_url,
hratls_pubkey: value.hratls_pubkey,
}
}
} }
impl ActiveApp { impl ActiveApp {
@ -208,6 +239,58 @@ impl ActiveApp {
Ok(()) Ok(())
} }
pub async fn delete(db: &Surreal<Client>, id: &str) -> Result<bool, Error> {
let deleted_app: Option<Self> = db.delete((ACTIVE_APP, id)).await?;
if let Some(deleted_app) = deleted_app {
let deleted_app: DeletedApp = deleted_app.into();
let _: Vec<DeletedApp> = db.insert(DELETED_APP).relation(deleted_app).await?;
Ok(true)
} else {
Ok(false)
}
}
pub async fn listen(db: &Surreal<Client>, app_id: &str) -> Result<ActiveApp, Error> {
let mut query_response = db
.query(format!(
"live select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};"
))
.query(format!("live select * from {ACTIVE_APP} where id = {ACTIVE_APP}:{app_id};"))
.await?;
let mut error_stream = query_response.stream::<Notification<db::ErrorFromTable>>(0)?;
let mut active_app_stream = query_response.stream::<Notification<ActiveApp>>(1)?;
tokio::time::timeout(Duration::from_secs(30), async {
loop {
tokio::select! {
Some(err_notif) = error_stream.next() =>{
match err_notif{
Ok(err_notif) =>{
if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty(){
return Err(Error::NewAppDaemonResp(err_notif.data.error))
}
}
Err(e) => return Err(e.into())
}
}
Some(active_app_notif) = active_app_stream.next() => {
match active_app_notif {
Ok(active_app_notif) =>{
if active_app_notif.action == surrealdb::Action::Create {
let _: Option<NewAppReq> = db.delete((NEW_APP_REQ, app_id)).await?;
return Ok(active_app_notif.data);
}
}
Err(e) => return Err(e.into())
}
}
}
}
})
.await?
}
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]

@ -57,16 +57,25 @@ impl Account {
user: &str, user: &str,
node: &str, node: &str,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
let ban: Option<Self> = db let mut query_response = db
.query(format!( .query(format!(
"(select operator->ban[0] as ban "(select operator->ban[0] as ban
from vm_node:{node} from vm_node:{node}
where operator->ban->account contains account:{user} where operator->ban->account contains account:{user}
).ban;" ).ban;"
)) ))
.await? .query(format!(
.take(0)?; "(select operator->ban[0] as ban
Ok(ban.is_some()) from app_node:{node}
where operator->ban->account contains account:{user}
).ban;"
))
.await?;
let vm_node_ban: Option<Self> = query_response.take(0)?;
let app_node_ban: Option<Self> = query_response.take(1)?;
Ok(vm_node_ban.is_some() || app_node_ban.is_some())
} }
} }

@ -28,6 +28,8 @@ pub enum Error {
UnknownTable(String), UnknownTable(String),
#[error("Daemon channel got closed: {0}")] #[error("Daemon channel got closed: {0}")]
AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<AppDaemonMsg>), AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<AppDaemonMsg>),
#[error("AppDaemon Error {0}")]
NewAppDaemonResp(String),
} }
pub mod prelude { pub mod prelude {
@ -163,3 +165,8 @@ pub async fn live_appnode_msgs<
Ok(()) Ok(())
} }
#[derive(Deserialize)]
pub struct ErrorFromTable {
pub error: String,
}

@ -5,7 +5,7 @@ use super::Error;
use crate::constants::{ use crate::constants::{
ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT, ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT,
}; };
use crate::db::{Account, Report}; use crate::db::{Account, ErrorFromTable, Report};
use crate::old_brain; use crate::old_brain;
use detee_shared::vm_proto; use detee_shared::vm_proto;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -214,17 +214,13 @@ impl WrappedMeasurement {
UPDATE_VM_REQ => UPDATE_VM_REQ, UPDATE_VM_REQ => UPDATE_VM_REQ,
_ => NEW_VM_REQ, _ => NEW_VM_REQ,
}; };
#[derive(Deserialize)]
struct ErrorMessage {
error: String,
}
let mut resp = db let mut resp = db
.query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};")) .query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};"))
.query(format!( .query(format!(
"live select * from measurement_args where id = measurement_args:{vm_id};" "live select * from measurement_args where id = measurement_args:{vm_id};"
)) ))
.await?; .await?;
let mut error_stream = resp.stream::<Notification<ErrorMessage>>(0)?; let mut error_stream = resp.stream::<Notification<ErrorFromTable>>(0)?;
let mut args_stream = resp.stream::<Notification<vm_proto::MeasurementArgs>>(1)?; let mut args_stream = resp.stream::<Notification<vm_proto::MeasurementArgs>>(1)?;
let args: Option<vm_proto::MeasurementArgs> = let args: Option<vm_proto::MeasurementArgs> =

@ -1,12 +1,10 @@
use crate::constants::{ACCOUNT, APP_NODE}; use crate::constants::{ACCOUNT, APP_NODE};
use crate::db::app::ActiveApp;
use crate::db::prelude as db; use crate::db::prelude as db;
use crate::grpc::{check_sig_from_parts, check_sig_from_req}; use crate::grpc::{check_sig_from_parts, check_sig_from_req};
use detee_shared::app_proto::brain_app_cli_server::BrainAppCli; use detee_shared::app_proto::brain_app_cli_server::BrainAppCli;
use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemon; use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemon;
use detee_shared::app_proto::{ use detee_shared::app_proto::*;
daemon_message_app, AppContract, AppNodeFilters, AppNodeListResp, BrainMessageApp, DaemonAuth,
DaemonMessageApp, DelAppReq, ListAppContractsReq, RegisterAppNodeReq,
};
use detee_shared::common_proto::Empty; use detee_shared::common_proto::Empty;
use log::info; use log::info;
use std::pin::Pin; use std::pin::Pin;
@ -37,8 +35,7 @@ impl BrainAppDaemon for AppDaemonServer {
async fn register_app_node( async fn register_app_node(
&self, &self,
req: tonic::Request<RegisterAppNodeReq>, req: tonic::Request<RegisterAppNodeReq>,
) -> Result<tonic::Response<<Self as BrainAppDaemon>::RegisterAppNodeStream>, tonic::Status> ) -> Result<tonic::Response<<Self as BrainAppDaemon>::RegisterAppNodeStream>, Status> {
{
let req = check_sig_from_req(req)?; let req = check_sig_from_req(req)?;
info!("Starting app_node registration process for {:?}", req); info!("Starting app_node registration process for {:?}", req);
@ -77,7 +74,7 @@ impl BrainAppDaemon for AppDaemonServer {
async fn brain_messages( async fn brain_messages(
&self, &self,
req: tonic::Request<DaemonAuth>, req: tonic::Request<DaemonAuth>,
) -> Result<tonic::Response<<Self as BrainAppDaemon>::BrainMessagesStream>, tonic::Status> { ) -> Result<tonic::Response<<Self as BrainAppDaemon>::BrainMessagesStream>, Status> {
let auth = req.into_inner(); let auth = req.into_inner();
let pubkey = auth.pubkey.clone(); let pubkey = auth.pubkey.clone();
check_sig_from_parts( check_sig_from_parts(
@ -114,7 +111,7 @@ impl BrainAppDaemon for AppDaemonServer {
async fn daemon_messages( async fn daemon_messages(
&self, &self,
req: tonic::Request<Streaming<DaemonMessageApp>>, req: tonic::Request<Streaming<DaemonMessageApp>>,
) -> Result<tonic::Response<Empty>, tonic::Status> { ) -> Result<tonic::Response<Empty>, Status> {
let mut req_stream = req.into_inner(); let mut req_stream = req.into_inner();
let pubkey: String; let pubkey: String;
if let Some(Ok(msg)) = req_stream.next().await { if let Some(Ok(msg)) = req_stream.next().await {
@ -183,30 +180,58 @@ impl BrainAppCli for AppCliServer {
type ListAppContractsStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>; type ListAppContractsStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
type ListAppNodesStream = Pin<Box<dyn Stream<Item = Result<AppNodeListResp, Status>> + Send>>; type ListAppNodesStream = Pin<Box<dyn Stream<Item = Result<AppNodeListResp, Status>> + Send>>;
async fn deploy_app( async fn new_app(
&self, &self,
req: tonic::Request<detee_shared::app_proto::NewAppReq>, req: tonic::Request<detee_shared::app_proto::NewAppReq>,
) -> Result<tonic::Response<detee_shared::app_proto::NewAppRes>, tonic::Status> { ) -> Result<tonic::Response<detee_shared::app_proto::NewAppRes>, Status> {
let req = check_sig_from_req(req)?; let req = check_sig_from_req(req)?;
info!("deploy_app process starting for {:?}", req); info!("deploy_app process starting for {:?}", req);
todo!() if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
return Err(Status::permission_denied("This operator banned you. What did you do?"));
}
let new_app_req: db::NewAppReq = req.into();
let id = new_app_req.id.to_string();
let (tx, rx) = tokio::sync::oneshot::channel();
let db = self.db.clone();
tokio::spawn(async move {
let _ = tx.send(db::ActiveApp::listen(&db, &id).await);
});
new_app_req.submit(&self.db).await?;
match rx.await {
Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
"Network timeout. Please try again later or contact the DeTEE devs team.",
)),
Ok(Err(db::Error::NewAppDaemonResp(err))) => Err(Status::internal(err)),
Ok(new_app_resp) => Ok(Response::new(new_app_resp?.into())),
Err(e) => {
log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}");
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
} }
async fn delete_app( async fn delete_app(
&self, &self,
req: tonic::Request<DelAppReq>, req: tonic::Request<DelAppReq>,
) -> Result<tonic::Response<detee_shared::common_proto::Empty>, tonic::Status> { ) -> Result<tonic::Response<Empty>, Status> {
let req = check_sig_from_req(req)?; let req = check_sig_from_req(req)?;
info!("delete_app process starting for {:?}", req); info!("delete_app process starting for {:?}", req);
match ActiveApp::delete(&self.db, &req.uuid).await? {
todo!() true => Ok(Response::new(Empty {})),
false => Err(Status::not_found(format!("Could not find App contract {}", &req.uuid))),
}
} }
async fn list_app_contracts( async fn list_app_contracts(
&self, &self,
req: tonic::Request<ListAppContractsReq>, req: tonic::Request<ListAppContractsReq>,
) -> Result<tonic::Response<<Self as BrainAppCli>::ListAppContractsStream>, tonic::Status> { ) -> Result<tonic::Response<<Self as BrainAppCli>::ListAppContractsStream>, Status> {
let req = check_sig_from_req(req)?; let req = check_sig_from_req(req)?;
info!("list_app_contracts process starting for {:?}", req); info!("list_app_contracts process starting for {:?}", req);
@ -245,7 +270,7 @@ impl BrainAppCli for AppCliServer {
async fn list_app_nodes( async fn list_app_nodes(
&self, &self,
req: tonic::Request<AppNodeFilters>, req: tonic::Request<AppNodeFilters>,
) -> Result<tonic::Response<<Self as BrainAppCli>::ListAppNodesStream>, tonic::Status> { ) -> Result<tonic::Response<<Self as BrainAppCli>::ListAppNodesStream>, Status> {
let req = check_sig_from_req(req)?; let req = check_sig_from_req(req)?;
info!("list_app_nodes process starting for {:?}", req); info!("list_app_nodes process starting for {:?}", req);
let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req, false).await?; let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req, false).await?;
@ -262,7 +287,7 @@ impl BrainAppCli for AppCliServer {
async fn get_one_app_node( async fn get_one_app_node(
&self, &self,
req: tonic::Request<AppNodeFilters>, req: tonic::Request<AppNodeFilters>,
) -> Result<tonic::Response<AppNodeListResp>, tonic::Status> { ) -> Result<tonic::Response<AppNodeListResp>, Status> {
let req = check_sig_from_req(req)?; let req = check_sig_from_req(req)?;
info!("get_one_app_node process starting for {:?}", req); info!("get_one_app_node process starting for {:?}", req);
let app_node = db::AppNodeWithReports::find_by_filters(&self.db, req, true) let app_node = db::AppNodeWithReports::find_by_filters(&self.db, req, true)

@ -1,4 +1,4 @@
use crate::constants::{ACCOUNT, ID_ALPHABET, NEW_VM_REQ, VM_NODE}; use crate::constants::{ACCOUNT, APP_NODE, ID_ALPHABET, NEW_APP_REQ, NEW_VM_REQ, VM_NODE};
use crate::db::prelude as db; use crate::db::prelude as db;
use detee_shared::app_proto::AppNodeListResp; use detee_shared::app_proto::AppNodeListResp;
use detee_shared::common_proto::MappedPort; use detee_shared::common_proto::MappedPort;
@ -286,6 +286,36 @@ impl From<db::ActiveAppWithNode> for AppContract {
} }
} }
impl From<NewAppReq> for db::NewAppReq {
fn from(val: NewAppReq) -> Self {
let resource = val.resource.unwrap_or_default();
let mr_enclave = val
.public_package_mr_enclave
.unwrap_or_default()
.iter()
.fold(String::new(), |acc, x| acc + &format!("{:02x?}", x));
Self {
id: RecordId::from((NEW_APP_REQ, nanoid!(40, &ID_ALPHABET))),
admin: RecordId::from((ACCOUNT, val.admin_pubkey)),
app_node: RecordId::from((APP_NODE, val.node_pubkey)),
app_name: val.app_name,
package_url: val.package_url,
mr_enclave,
hratls_pubkey: val.hratls_pubkey,
ports: resource.ports,
memory_mb: resource.memory_mb,
vcpu: resource.vcpu,
disk_mb: resource.disk_mb,
locked_nano: val.locked_nano,
price_per_unit: val.price_per_unit,
error: String::new(),
created_at: surrealdb::sql::Datetime::default(),
}
}
}
impl From<db::NewAppReq> for NewAppReq { impl From<db::NewAppReq> for NewAppReq {
fn from(value: db::NewAppReq) -> Self { fn from(value: db::NewAppReq) -> Self {
let resource = AppResource { let resource = AppResource {
@ -341,3 +371,19 @@ impl From<AppNodeResources> for db::AppNodeResources {
} }
} }
} }
impl From<db::ActiveApp> for NewAppRes {
fn from(val: db::ActiveApp) -> Self {
let mapped_ports = val
.mapped_ports
.iter()
.map(|(h, g)| MappedPort { host_port: *h as u32, guest_port: *g as u32 })
.collect();
Self {
uuid: val.id.key().to_string(),
ip_address: val.host_ipv4,
mapped_ports,
error: String::new(),
}
}
}