payments on app deployment and refund for vm and app deletion #5

Merged
ghe0 merged 3 commits from app_payments into main 2025-06-04 16:01:43 +00:00
26 changed files with 847 additions and 155 deletions

1
.gitignore vendored

@ -1,3 +1,4 @@
/target
secrets
tmp
.env

2
Cargo.lock generated

@ -1000,7 +1000,7 @@ dependencies = [
[[package]]
name = "detee-shared"
version = "0.1.0"
source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain#d6ca058d2de78b5257517034bca2b2c7d5929db8"
source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#005677153b3fcd3251b64111a736c806106fdc04"
dependencies = [
"bincode 2.0.1",
"prost",

@ -13,7 +13,7 @@ serde_yaml = "0.9.34"
surrealdb = "2.2.2"
tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] }
tonic = { version = "0.12", features = ["tls"] }
detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain" }
detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain_app" }
ed25519-dalek = "2.1.1"
bs58 = "0.5.1"
tokio-stream = "0.1.17"

@ -17,6 +17,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let db_name = std::env::var("DB_NAME").expect("DB_NAME not set in .env");
let db = db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name).await.unwrap();
env_logger::builder().filter_level(log::LevelFilter::Trace);
db::migration0(&db, &old_brain_data).await?;

@ -48,3 +48,6 @@ pub const ID_ALPHABET: [char; 62] = [
pub const TOKEN_DECIMAL: u64 = 1_000_000_000;
pub const MIN_ESCROW: u64 = 5000 * TOKEN_DECIMAL;
pub const APP_DAEMON_TIMEOUT: u64 = 20;
pub const VM_DAEMON_TIMEOUT: u64 = 10;

@ -1,11 +1,13 @@
use std::time::Duration;
use super::Error;
use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, DELETED_APP, NEW_APP_REQ};
use crate::constants::{
ACCOUNT, ACTIVE_APP, APP_DAEMON_TIMEOUT, APP_NODE, DELETED_APP, NEW_APP_REQ,
};
use crate::db;
use crate::db::general::Report;
use crate::old_brain;
use detee_shared::app_proto;
use detee_shared::app_proto::{self, NewAppRes};
use serde::{Deserialize, Serialize};
use surrealdb::engine::remote::ws::Client;
use surrealdb::sql::Datetime;
@ -68,8 +70,8 @@ pub struct NewAppReq {
pub hratls_pubkey: String,
pub ports: Vec<u32>,
pub memory_mb: u32,
pub vcpu: u32,
pub disk_mb: u32,
pub vcpus: u32,
pub disk_size_gb: u32,
pub locked_nano: u64,
pub price_per_unit: u64,
pub error: String,
@ -81,26 +83,117 @@ impl NewAppReq {
let new_app_req: Option<Self> = db.select((NEW_APP_REQ, id)).await?;
Ok(new_app_req)
}
pub async fn submit_error(
db: &Surreal<Client>,
id: &str,
error: String,
) -> Result<Option<Self>, Error> {
#[derive(Serialize)]
struct NewAppError {
error: String,
pub async fn submit_error(db: &Surreal<Client>, id: &str, error: String) -> Result<(), Error> {
let tx_query = String::from(
"
BEGIN TRANSACTION;
LET $new_app_req = $new_app_req_input;
LET $error = $error_input;
LET $record = (select * from $new_app_req)[0];
LET $admin = $record.in;
if $record == None {{
THROW 'app req not exist ' + <string>$new_app_req
}};
UPDATE $new_app_req SET error = $error;
UPDATE $admin SET tmp_locked -= $record.locked_nano;
UPDATE $admin SET balance += $record.locked_nano;
COMMIT TRANSACTION;",
);
log::trace!("submit_new_app_err query: {tx_query}");
let mut query_resp = db
.query(tx_query)
.bind(("new_app_req_input", RecordId::from((NEW_APP_REQ, id))))
.bind(("error_input", error))
.await?;
let query_err = query_resp.take_errors();
if !query_err.is_empty() {
log::trace!("errors in submit_new_app_err: {query_err:?}");
let tx_fail_err_str =
String::from("The query was not executed due to a failed transaction");
if query_err.contains_key(&4) && query_err[&4].to_string() != tx_fail_err_str {
log::error!("app req not exist: {}", query_err[&4]);
return Err(Error::ContractNotFound);
} else {
log::error!("Unknown error in submit_new_app_err: {query_err:?}");
return Err(Error::Unknown("submit_new_app_req".to_string()));
}
}
let record: Option<Self> =
db.update((NEW_APP_REQ, id)).merge(NewAppError { error }).await?;
Ok(record)
Ok(())
}
pub async fn submit(self, db: &Surreal<Client>) -> Result<Vec<Self>, Error> {
// TODO: handle financial transaction
let new_app_req: Vec<Self> = db.insert(NEW_APP_REQ).relation(self).await?;
Ok(new_app_req)
pub async fn submit(self, db: &Surreal<Client>) -> Result<(), Error> {
let locked_nano = self.locked_nano;
let tx_query = format!( "
BEGIN TRANSACTION;
LET $account = $account_input;
LET $new_app_req = $new_app_req_input;
LET $app_node = $app_node_input;
LET $package_url = $package_url_input;
LET $mr_enclave = $mr_enclave_input;
LET $hratls_pubkey = $hratls_pubkey_input;
LET $app_name = $app_name_input;
UPDATE $account SET balance -= {locked_nano};
IF $account.balance < 0 {{
THROW 'Insufficient funds.'
}};
UPDATE $account SET tmp_locked += {locked_nano};
RELATE
$account
->$new_app_req
->$app_node
CONTENT {{
created_at: time::now(), app_name: $app_name, package_url: $package_url,
mr_enclave: $mr_enclave, hratls_pubkey: $hratls_pubkey, ports: {:?}, memory_mb: {},
vcpus: {}, disk_size_gb: {}, locked_nano: {locked_nano}, price_per_unit: {}, error: '',
}};
COMMIT TRANSACTION;
",
self.ports, self.memory_mb, self.vcpus, self.disk_size_gb, self.price_per_unit);
log::trace!("submit_new_app_req query: {tx_query}");
let mut query_resp = db
.query(tx_query)
.bind(("account_input", self.admin))
.bind(("new_app_req_input", self.id))
.bind(("app_node_input", self.app_node))
.bind(("package_url_input", self.package_url))
.bind(("mr_enclave_input", self.mr_enclave))
.bind(("hratls_pubkey_input", self.hratls_pubkey))
.bind(("app_name_input", self.app_name))
.await?;
let query_err = query_resp.take_errors();
if !query_err.is_empty() {
log::trace!("errors in submit_new_app_req: {query_err:?}");
let tx_fail_err_str =
String::from("The query was not executed due to a failed transaction");
if query_err.contains_key(&8) && query_err[&8].to_string() != tx_fail_err_str {
log::error!("Transaction error: {}", query_err[&8]);
return Err(Error::InsufficientFunds);
} else {
log::error!("Unknown error in submit_new_app_req: {query_err:?}");
return Err(Error::Unknown("submit_new_app_req".to_string()));
}
}
Ok(())
}
}
@ -202,8 +295,6 @@ impl From<ActiveApp> for DeletedApp {
disk_size_gb: value.disk_size_gb,
created_at: value.created_at,
price_per_unit: value.price_per_unit,
locked_nano: value.locked_nano,
collected_at: value.collected_at,
mr_enclave: value.mr_enclave,
package_url: value.package_url,
hratls_pubkey: value.hratls_pubkey,
@ -221,22 +312,31 @@ impl ActiveApp {
(self.vcpus as f64 * 5f64) + (self.memory_mb as f64 / 200f64) + (self.disk_size_gb as f64)
}
pub async fn activate(db: &Surreal<Client>, id: &str) -> Result<(), Error> {
let new_app_req = match NewAppReq::get(db, id).await? {
pub async fn activate(
db: &Surreal<Client>,
new_app_res: app_proto::NewAppRes,
) -> Result<(), Error> {
let new_app_req = match NewAppReq::get(db, &new_app_res.uuid).await? {
Some(r) => r,
None => return Ok(()),
};
let mapped_ports = new_app_res
.mapped_ports
.into_iter()
.map(|data| (data.host_port, data.guest_port))
.collect::<Vec<(u32, u32)>>();
let active_app = Self {
id: RecordId::from((ACTIVE_APP, id)),
id: RecordId::from((ACTIVE_APP, &new_app_res.uuid)),
admin: new_app_req.admin,
app_node: new_app_req.app_node,
app_name: new_app_req.app_name,
mapped_ports: vec![],
mapped_ports,
host_ipv4: String::new(),
vcpus: new_app_req.vcpu,
vcpus: new_app_req.vcpus,
memory_mb: new_app_req.memory_mb,
disk_size_gb: new_app_req.disk_mb,
disk_size_gb: new_app_req.disk_size_gb,
created_at: new_app_req.created_at.clone(),
price_per_unit: new_app_req.price_per_unit,
locked_nano: new_app_req.locked_nano,
@ -246,22 +346,63 @@ impl ActiveApp {
hratls_pubkey: new_app_req.hratls_pubkey.clone(),
};
let admin_account = active_app.admin.key().to_string();
let locked_nano = active_app.locked_nano;
let _: Vec<ActiveApp> = db.insert(()).relation(active_app).await?;
db.delete::<Option<NewAppReq>>((NEW_APP_REQ, &new_app_res.uuid)).await?;
db.query(format!("UPDATE {ACCOUNT}:{admin_account} SET tmp_locked -= {locked_nano};"))
.await?;
Ok(())
}
pub async fn delete(db: &Surreal<Client>, id: &str) -> Result<bool, Error> {
let deleted_app: Option<Self> = db.delete((ACTIVE_APP, id)).await?;
if let Some(deleted_app) = deleted_app {
let deleted_app: DeletedApp = deleted_app.into();
let _: Vec<DeletedApp> = db.insert(DELETED_APP).relation(deleted_app).await?;
Ok(true)
} else {
Ok(false)
pub async fn delete(db: &Surreal<Client>, admin: &str, id: &str) -> Result<(), Error> {
let mut app_del_resp = db
.query(
"
BEGIN TRANSACTION;
LET $active_app = $app_id_input;
LET $admin = $admin_input;
IF $active_app.in != $admin {
THROW 'Unauthorized'
};
return fn::delete_app($active_app);
COMMIT TRANSACTION;
",
)
.bind(("app_id_input", RecordId::from((ACTIVE_APP, id))))
.bind(("admin_input", RecordId::from((ACCOUNT, admin))))
.await?;
log::trace!("delete_app query response: {app_del_resp:?}");
let query_err = app_del_resp.take_errors();
if !query_err.is_empty() {
log::trace!("errors in delete_app: {query_err:?}");
let tx_fail_err_str =
String::from("The query was not executed due to a failed transaction");
if query_err.contains_key(&2) && query_err[&2].to_string() != tx_fail_err_str {
log::error!("Unauthorized: {}", query_err[&2]);
return Err(Error::AccessDenied);
}
}
pub async fn listen(db: &Surreal<Client>, app_id: &str) -> Result<ActiveApp, Error> {
Ok(())
}
}
pub enum WrappedAppResp {
NewAppRes(NewAppRes),
Error(NewAppRes),
}
impl WrappedAppResp {
pub async fn listen(db: &Surreal<Client>, app_id: &str) -> Result<WrappedAppResp, Error> {
let mut query_response = db
.query(format!(
"live select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};"
@ -272,14 +413,40 @@ impl ActiveApp {
let mut error_stream = query_response.stream::<Notification<db::ErrorFromTable>>(0)?;
let mut active_app_stream = query_response.stream::<Notification<ActiveApp>>(1)?;
tokio::time::timeout(Duration::from_secs(30), async {
let mut error = db
.query(format!("select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};"))
.await?;
if let Some(error_on_newapp_req) = error.take::<Option<db::ErrorFromTable>>(0)? {
if !error_on_newapp_req.error.is_empty() {
let app_daemon_err = NewAppRes {
uuid: app_id.to_string(),
error: error_on_newapp_req.error,
..Default::default()
};
return Ok(Self::Error(app_daemon_err));
}
}
if let Some(active_app) = db.select::<Option<ActiveApp>>((ACTIVE_APP, app_id)).await? {
return Ok(Self::NewAppRes(active_app.into()));
}
log::trace!("listening for table: {NEW_APP_REQ}");
tokio::time::timeout(Duration::from_secs(APP_DAEMON_TIMEOUT), async {
loop {
tokio::select! {
Some(err_notif) = error_stream.next() =>{
match err_notif{
Ok(err_notif) =>{
if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty(){
return Err(Error::NewAppDaemonResp(err_notif.data.error))
let app_daemon_err = NewAppRes {
uuid: app_id.to_string(),
error: err_notif.data.error,
..Default::default()
};
return Ok(Self::Error(app_daemon_err));
}
}
Err(e) => return Err(e.into())
@ -291,7 +458,7 @@ impl ActiveApp {
Ok(active_app_notif) =>{
if active_app_notif.action == surrealdb::Action::Create {
let _: Option<NewAppReq> = db.delete((NEW_APP_REQ, app_id)).await?;
return Ok(active_app_notif.data);
return Ok(Self::NewAppRes(active_app_notif.data.into()));
}
}
Err(e) => return Err(e.into())
@ -511,9 +678,17 @@ pub struct DeletedApp {
pub disk_size_gb: u32,
pub created_at: Datetime,
pub price_per_unit: u64,
pub locked_nano: u64,
pub collected_at: Datetime,
pub mr_enclave: String,
pub package_url: String,
pub hratls_pubkey: String,
}
impl DeletedApp {
pub async fn list_by_node(db: &Surreal<Client>, node_pubkey: &str) -> Result<Vec<Self>, Error> {
let mut result = db
.query(format!("select * from {DELETED_APP} where out = {APP_NODE}:{node_pubkey};"))
.await?;
let contracts: Vec<Self> = result.take(0)?;
Ok(contracts)
}
}

@ -31,8 +31,6 @@ pub enum Error {
UnknownTable(String),
#[error("Daemon channel got closed: {0}")]
AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<AppDaemonMsg>),
#[error("AppDaemon Error {0}")]
NewAppDaemonResp(String),
#[error("Minimum escrow amount is {MIN_ESCROW}")]
MinimalEscrow,
#[error("Insufficient funds, deposit more tokens")]
@ -186,7 +184,7 @@ pub async fn live_appnode_msgs<
}
Err(e) => {
log::error!(
"live_vmnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}"
"live_appnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}"
);
return Err(Error::from(e));
}
@ -196,7 +194,7 @@ pub async fn live_appnode_msgs<
Ok(())
}
#[derive(Deserialize)]
#[derive(Deserialize, Debug, Clone)]
pub struct ErrorFromTable {
pub error: String,
}

@ -3,7 +3,8 @@ use std::time::Duration;
use super::Error;
use crate::constants::{
ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT,
ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_DAEMON_TIMEOUT, VM_NODE,
VM_UPDATE_EVENT,
};
use crate::db::{Account, ErrorFromTable, Report};
use crate::old_brain;
@ -329,18 +330,22 @@ impl WrappedMeasurement {
UPDATE_VM_REQ => UPDATE_VM_REQ,
_ => NEW_VM_REQ,
};
let mut args_stream = db
let mut resp = db
.query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};"))
.query(format!(
"live select * from measurement_args where id = measurement_args:{vm_id};"
))
.await?
.stream::<Notification<vm_proto::MeasurementArgs>>(0)?;
.await?;
let mut error_stream = resp.stream::<Notification<ErrorFromTable>>(0)?;
let mut args_stream = resp.stream::<Notification<vm_proto::MeasurementArgs>>(1)?;
let mut error_stream = db
.query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};"))
.await?
.stream::<Notification<ErrorFromTable>>(0)?;
let mut error =
db.query(format!("select error from {table} where id = {NEW_VM_REQ}:{vm_id}")).await?;
if let Some(error_on_newvm_req) = error.take::<Option<ErrorFromTable>>(0)? {
if !error_on_newvm_req.error.is_empty() {
return Ok(Self::Error(vm_id.to_string(), error_on_newvm_req.error));
}
}
let args: Option<vm_proto::MeasurementArgs> =
db.delete(("measurement_args", vm_id)).await?;
@ -349,7 +354,7 @@ impl WrappedMeasurement {
}
log::trace!("listening for table: {table}");
tokio::time::timeout(Duration::from_secs(10), async {
tokio::time::timeout(Duration::from_secs(VM_DAEMON_TIMEOUT), async {
loop {
tokio::select! {
error_notification = error_stream.next() => {
@ -358,10 +363,7 @@ impl WrappedMeasurement {
Ok(err_notif) => {
if err_notif.action == surrealdb::Action::Update
&& !err_notif.data.error.is_empty() {
return Ok::<WrappedMeasurement, Error>(
Self::Error(vm_id.to_string(),
err_notif.data.error)
);
return Ok(Self::Error(vm_id.to_string(), err_notif.data.error));
};
},
Err(e) => return Err(e.into()),
@ -544,16 +546,43 @@ impl ActiveVm {
Ok(false)
}
pub async fn delete(db: &Surreal<Client>, id: &str) -> Result<bool, Error> {
let deleted_vm: Option<Self> = db.delete((ACTIVE_VM, id)).await?;
if let Some(deleted_vm) = deleted_vm {
let deleted_vm: DeletedVm = deleted_vm.into();
let _: Vec<DeletedVm> = db.insert(DELETED_VM).relation(deleted_vm).await?;
Ok(true)
} else {
Ok(false)
pub async fn delete(db: &Surreal<Client>, admin: &str, id: &str) -> Result<(), Error> {
let mut vm_del_resp = db
.query(
"
BEGIN TRANSACTION;
LET $active_vm = $vm_id_input;
LET $admin = $admin_input;
IF $active_vm.in != $admin {
THROW 'Unauthorized'
};
return fn::delete_vm($active_vm);
COMMIT TRANSACTION;
",
)
.bind(("vm_id_input", RecordId::from((ACTIVE_VM, id))))
.bind(("admin_input", RecordId::from((ACCOUNT, admin))))
.await?;
log::trace!("delete_vm query response: {vm_del_resp:?}");
let query_err = vm_del_resp.take_errors();
if !query_err.is_empty() {
log::trace!("errors in delete_vm: {query_err:?}");
let tx_fail_err_str =
String::from("The query was not executed due to a failed transaction");
if query_err.contains_key(&2) && query_err[&2].to_string() != tx_fail_err_str {
log::error!("Unauthorized: {}", query_err[&2]);
return Err(Error::AccessDenied);
}
}
Ok(())
}
pub async fn extend_time(
db: &Surreal<Client>,

@ -15,7 +15,7 @@ use surrealdb::Surreal;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{Stream, StreamExt};
use tonic::{Response, Status, Streaming};
use tonic::{Request, Response, Status, Streaming};
pub struct AppDaemonServer {
pub db: Arc<Surreal<Client>>,
@ -29,13 +29,13 @@ impl AppDaemonServer {
#[tonic::async_trait]
impl BrainAppDaemon for AppDaemonServer {
type RegisterAppNodeStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
type RegisterAppNodeStream = Pin<Box<dyn Stream<Item = Result<DelAppReq, Status>> + Send>>;
type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainMessageApp, Status>> + Send>>;
async fn register_app_node(
&self,
req: tonic::Request<RegisterAppNodeReq>,
) -> Result<tonic::Response<<Self as BrainAppDaemon>::RegisterAppNodeStream>, Status> {
req: Request<RegisterAppNodeReq>,
) -> Result<Response<<Self as BrainAppDaemon>::RegisterAppNodeStream>, Status> {
let req = check_sig_from_req(req)?;
info!("Starting app_node registration process for {:?}", req);
@ -59,11 +59,11 @@ impl BrainAppDaemon for AppDaemonServer {
app_node.register(&self.db).await?;
info!("Sending existing contracts to {}", req.node_pubkey);
let contracts = db::ActiveAppWithNode::list_by_node(&self.db, &req.node_pubkey).await?;
let deleted_apps = db::DeletedApp::list_by_node(&self.db, &req.node_pubkey).await?;
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for contract in contracts {
let _ = tx.send(Ok(contract.into())).await;
for deleted_app in deleted_apps {
let _ = tx.send(Ok(deleted_app.into())).await;
}
});
@ -73,8 +73,8 @@ impl BrainAppDaemon for AppDaemonServer {
async fn brain_messages(
&self,
req: tonic::Request<DaemonAuth>,
) -> Result<tonic::Response<<Self as BrainAppDaemon>::BrainMessagesStream>, Status> {
req: Request<DaemonAuth>,
) -> Result<Response<<Self as BrainAppDaemon>::BrainMessagesStream>, Status> {
let auth = req.into_inner();
let pubkey = auth.pubkey.clone();
check_sig_from_parts(
@ -110,8 +110,8 @@ impl BrainAppDaemon for AppDaemonServer {
async fn daemon_messages(
&self,
req: tonic::Request<Streaming<DaemonMessageApp>>,
) -> Result<tonic::Response<Empty>, Status> {
req: Request<Streaming<DaemonMessageApp>>,
) -> Result<Response<Empty>, Status> {
let mut req_stream = req.into_inner();
let pubkey: String;
if let Some(Ok(msg)) = req_stream.next().await {
@ -145,7 +145,7 @@ impl BrainAppDaemon for AppDaemonServer {
)
.await?;
} else {
db::ActiveApp::activate(&self.db, &new_app_resp.uuid).await?;
db::ActiveApp::activate(&self.db, new_app_resp).await?;
}
}
Some(daemon_message_app::Msg::AppNodeResources(app_node_resources)) => {
@ -180,33 +180,40 @@ impl BrainAppCli for AppCliServer {
type ListAppContractsStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
type ListAppNodesStream = Pin<Box<dyn Stream<Item = Result<AppNodeListResp, Status>> + Send>>;
async fn new_app(
&self,
req: tonic::Request<detee_shared::app_proto::NewAppReq>,
) -> Result<tonic::Response<detee_shared::app_proto::NewAppRes>, Status> {
async fn new_app(&self, req: Request<NewAppReq>) -> Result<Response<NewAppRes>, Status> {
let req = check_sig_from_req(req)?;
info!("deploy_app process starting for {:?}", req);
// TODO: make it atleast 1 hour
if req.locked_nano < 100 {
log::error!("locking lessthan 100 nano lps: {}", req.locked_nano);
return Err(Status::unknown("lock atleaset 100 nano lps"));
}
info!("new_app process starting for {:?}", req);
if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
return Err(Status::permission_denied("This operator banned you. What did you do?"));
}
let new_app_req: db::NewAppReq = req.into();
let id = new_app_req.id.to_string();
let db_req: db::NewAppReq = req.into();
let id = db_req.id.key().to_string();
let (tx, rx) = tokio::sync::oneshot::channel();
let db = self.db.clone();
tokio::spawn(async move {
let _ = tx.send(db::ActiveApp::listen(&db, &id).await);
let _ = tx.send(db::WrappedAppResp::listen(&db, &id).await);
});
new_app_req.submit(&self.db).await?;
db_req.submit(&self.db).await?;
match rx.await {
Ok(Ok(db::WrappedAppResp::NewAppRes(new_app_resp))) => Ok(Response::new(new_app_resp)),
Ok(Ok(db::WrappedAppResp::Error(err))) => Ok(Response::new(err)),
Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
"Network timeout. Please try again later or contact the DeTEE devs team.",
)),
Ok(Err(db::Error::NewAppDaemonResp(err))) => Err(Status::internal(err)),
Ok(new_app_resp) => Ok(Response::new(new_app_resp?.into())),
Ok(Err(e)) => {
log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}");
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
Err(e) => {
log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}");
Err(Status::unknown(
@ -216,22 +223,25 @@ impl BrainAppCli for AppCliServer {
}
}
async fn delete_app(
&self,
req: tonic::Request<DelAppReq>,
) -> Result<tonic::Response<Empty>, Status> {
async fn delete_app(&self, req: Request<DelAppReq>) -> Result<Response<Empty>, Status> {
let req = check_sig_from_req(req)?;
info!("delete_app process starting for {:?}", req);
match ActiveApp::delete(&self.db, &req.uuid).await? {
true => Ok(Response::new(Empty {})),
false => Err(Status::not_found(format!("Could not find App contract {}", &req.uuid))),
match ActiveApp::delete(&self.db, &req.admin_pubkey, &req.uuid).await {
Ok(()) => Ok(Response::new(Empty {})),
Err(db::Error::AccessDenied) => Err(Status::permission_denied("Unauthorized")),
Err(e) => {
log::error!("Error deleting app contract {}: {e}", &req.uuid);
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
}
async fn list_app_contracts(
&self,
req: tonic::Request<ListAppContractsReq>,
) -> Result<tonic::Response<<Self as BrainAppCli>::ListAppContractsStream>, Status> {
req: Request<ListAppContractsReq>,
) -> Result<Response<<Self as BrainAppCli>::ListAppContractsStream>, Status> {
let req = check_sig_from_req(req)?;
info!("list_app_contracts process starting for {:?}", req);
@ -270,8 +280,8 @@ impl BrainAppCli for AppCliServer {
async fn list_app_nodes(
&self,
req: tonic::Request<AppNodeFilters>,
) -> Result<tonic::Response<<Self as BrainAppCli>::ListAppNodesStream>, Status> {
req: Request<AppNodeFilters>,
) -> Result<Response<<Self as BrainAppCli>::ListAppNodesStream>, Status> {
let req = check_sig_from_req(req)?;
info!("list_app_nodes process starting for {:?}", req);
let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req, false).await?;
@ -287,8 +297,8 @@ impl BrainAppCli for AppCliServer {
async fn get_one_app_node(
&self,
req: tonic::Request<AppNodeFilters>,
) -> Result<tonic::Response<AppNodeListResp>, Status> {
req: Request<AppNodeFilters>,
) -> Result<Response<AppNodeListResp>, Status> {
let req = check_sig_from_req(req)?;
info!("get_one_app_node process starting for {:?}", req);
let app_node = db::AppNodeWithReports::find_by_filters(&self.db, req, true)

@ -274,8 +274,8 @@ impl From<db::ActiveAppWithNode> for AppContract {
public_ipv4: value.host_ipv4,
resource: Some(AppResource {
memory_mb: value.memory_mb,
disk_mb: value.disk_size_gb,
vcpu: value.vcpus,
disk_size_gb: value.disk_size_gb,
vcpus: value.vcpus,
ports: value.mapped_ports.iter().map(|(_, g)| *g).collect(),
}),
mapped_ports: value
@ -316,8 +316,8 @@ impl From<NewAppReq> for db::NewAppReq {
hratls_pubkey: val.hratls_pubkey,
ports: resource.ports,
memory_mb: resource.memory_mb,
vcpu: resource.vcpu,
disk_mb: resource.disk_mb,
vcpus: resource.vcpus,
disk_size_gb: resource.disk_size_gb,
locked_nano: val.locked_nano,
price_per_unit: val.price_per_unit,
error: String::new(),
@ -329,9 +329,9 @@ impl From<NewAppReq> for db::NewAppReq {
impl From<db::NewAppReq> for NewAppReq {
fn from(value: db::NewAppReq) -> Self {
let resource = AppResource {
vcpu: value.vcpu,
vcpus: value.vcpus,
memory_mb: value.memory_mb,
disk_mb: value.disk_mb,
disk_size_gb: value.disk_size_gb,
ports: value.ports,
};
let mr_enclave = Some(hex::decode(value.mr_enclave).unwrap_or_default());

@ -222,6 +222,11 @@ impl BrainVmCli for VmCliServer {
async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> {
let req = check_sig_from_req(req)?;
// TODO: make it atleast 1 hour
if req.locked_nano < 100 {
log::error!("locking lessthan 100 nano lps: {}", req.locked_nano);
return Err(Status::unknown("lock atleaset 100 nano lps"));
}
info!("New VM requested via CLI: {req:?}");
if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
return Err(Status::permission_denied("This operator banned you. What did you do?"));
@ -337,9 +342,15 @@ impl BrainVmCli for VmCliServer {
async fn delete_vm(&self, req: Request<DeleteVmReq>) -> Result<Response<Empty>, Status> {
let req = check_sig_from_req(req)?;
match db::ActiveVm::delete(&self.db, &req.uuid).await? {
true => Ok(Response::new(Empty {})),
false => Err(Status::not_found(format!("Could not find VM contract {}", &req.uuid))),
match db::ActiveVm::delete(&self.db, &req.admin_pubkey, &req.uuid).await {
Ok(()) => Ok(Response::new(Empty {})),
Err(db::Error::AccessDenied) => Err(Status::permission_denied("Unauthorized")),
Err(e) => {
log::error!("Error deleting VM contract {}: {e}", &req.uuid);
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
}

@ -22,7 +22,7 @@ DEFINE FUNCTION OVERWRITE fn::delete_vm(
UPDATE $account SET balance += $vm.locked_nano;
};
INSERT RELATION INTO deleted_vm ( $deleted_vm );
DELETE $vm.id;
RETURN DELETE $vm.id RETURN BEFORE;
};
DEFINE FUNCTION OVERWRITE fn::app_price_per_minute(
@ -35,3 +35,20 @@ DEFINE FUNCTION OVERWRITE fn::app_price_per_minute(
($app.disk_size_gb / 10))
* $app.price_per_unit;
};
DEFINE FUNCTION OVERWRITE fn::delete_app(
$app_id: record
) {
LET $app = (select * from $app_id)[0];
LET $account = $app.in;
LET $deleted_app = $app.patch([{
'op': 'replace',
'path': 'id',
'value': type::record("deleted_app:" + record::id($app.id))
}]);
IF $app.locked_nano >= 0 {
UPDATE $account SET balance += $app.locked_nano;
};
INSERT RELATION INTO deleted_app ( $deleted_app );
RETURN DELETE $app.id RETURN BEFORE;
};

@ -99,8 +99,8 @@ DEFINE FIELD mr_enclave ON TABLE new_app_req TYPE string;
DEFINE FIELD hratls_pubkey ON TABLE new_app_req TYPE string;
DEFINE FIELD ports ON TABLE new_app_req TYPE array<int>;
DEFINE FIELD memory_mb ON TABLE new_app_req TYPE int;
DEFINE FIELD vcpu ON TABLE new_app_req TYPE int;
DEFINE FIELD disk_mb ON TABLE new_app_req TYPE int;
DEFINE FIELD vcpus ON TABLE new_app_req TYPE int;
DEFINE FIELD disk_size_gb ON TABLE new_app_req TYPE int;
DEFINE FIELD locked_nano ON TABLE new_app_req TYPE int;
DEFINE FIELD price_per_unit ON TABLE new_app_req TYPE int;
DEFINE FIELD error ON TABLE new_app_req TYPE string;

@ -27,3 +27,5 @@ FOR $contract IN (select * from active_vm fetch out) {
fn::delete_vm($contract.id);
};
};
-- TODO: implement for active_app

@ -0,0 +1,31 @@
use anyhow::Result;
use detee_shared::app_proto::{
brain_app_cli_client::BrainAppCliClient, AppResource, NewAppReq, NewAppRes,
};
use tonic::transport::Channel;
use crate::common::test_utils::Key;
pub async fn create_new_app(
key: &Key,
node_pubkey: &str,
brain_channel: &Channel,
) -> Result<NewAppRes> {
let new_app_req = NewAppReq {
admin_pubkey: key.pubkey.clone(),
node_pubkey: node_pubkey.to_string(),
price_per_unit: 1200,
resource: Some(AppResource { ports: vec![8080, 8081], ..Default::default() }),
locked_nano: 100,
..Default::default()
};
let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone());
let new_app_resp =
client_app_cli.new_app(key.sign_request(new_app_req.clone())?).await?.into_inner();
assert!(new_app_resp.error.is_empty());
assert!(new_app_resp.uuid.len() == 40);
Ok(new_app_resp)
}

@ -0,0 +1,144 @@
use anyhow::Result;
use detee_shared::app_proto::brain_app_daemon_client::BrainAppDaemonClient;
use detee_shared::app_proto::{self, NewAppRes, RegisterAppNodeReq};
use detee_shared::common_proto::MappedPort;
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Channel;
use super::test_utils::Key;
pub async fn mock_app_daemon(
brain_channel: &Channel,
daemon_error: Option<String>,
) -> Result<String> {
let mut daemon_client = BrainAppDaemonClient::new(brain_channel.clone());
let daemon_key = Key::new();
register_app_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?;
let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx));
let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(daemon_msg_sender(
daemon_client.clone(),
daemon_key.clone(),
daemon_msg_tx.clone(),
rx,
));
tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx, daemon_error));
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
Ok(daemon_key.pubkey)
}
pub async fn register_app_node(
client: &mut BrainAppDaemonClient<Channel>,
key: &Key,
operator_wallet: &str,
) -> Result<Vec<app_proto::DelAppReq>> {
log::info!("Registering app_node: {}", key.pubkey);
let node_pubkey = key.pubkey.clone();
let req = RegisterAppNodeReq {
node_pubkey,
operator_wallet: operator_wallet.to_string(),
main_ip: String::from("185.243.218.213"),
city: String::from("Oslo"),
country: String::from("Norway"),
region: String::from("EU"),
price: 1200,
};
let mut grpc_stream = client.register_app_node(key.sign_request(req)?).await?.into_inner();
let mut deleted_app_reqs = Vec::new();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(del_app_req) => {
deleted_app_reqs.push(del_app_req);
}
Err(e) => {
panic!("Received error instead of deleted_app_reqs: {e:?}");
}
}
}
Ok(deleted_app_reqs)
}
pub async fn daemon_listener(
mut client: BrainAppDaemonClient<Channel>,
key: Key,
tx: mpsc::Sender<app_proto::BrainMessageApp>,
) -> Result<()> {
log::info!("listening app_daemon");
let mut grpc_stream =
client.brain_messages(key.sign_stream_auth_app(vec![])?).await?.into_inner();
while let Some(Ok(stream_update)) = grpc_stream.next().await {
log::info!("app deamon got notified: {:?}", &stream_update);
let _ = tx.send(stream_update).await;
}
Ok(())
}
pub async fn daemon_msg_sender(
mut client: BrainAppDaemonClient<Channel>,
key: Key,
tx: mpsc::Sender<app_proto::DaemonMessageApp>,
rx: mpsc::Receiver<app_proto::DaemonMessageApp>,
) -> Result<()> {
log::info!("sender app_daemon");
let rx_stream = ReceiverStream::new(rx);
tx.send(app_proto::DaemonMessageApp {
msg: Some(app_proto::daemon_message_app::Msg::Auth(key.sign_stream_auth_app(vec![])?)),
})
.await?;
client.daemon_messages(rx_stream).await?;
Ok(())
}
pub async fn daemon_engine(
tx: mpsc::Sender<app_proto::DaemonMessageApp>,
mut rx: mpsc::Receiver<app_proto::BrainMessageApp>,
new_app_err: Option<String>,
) -> Result<()> {
log::info!("daemon engine app_daemon");
while let Some(brain_msg) = rx.recv().await {
match brain_msg.msg {
Some(app_proto::brain_message_app::Msg::NewAppReq(new_app_req)) => {
let exposed_ports =
[vec![34500], new_app_req.resource.unwrap_or_default().ports].concat();
let mapped_ports = exposed_ports
.into_iter()
.map(|port| MappedPort { host_port: port, guest_port: port })
.collect::<Vec<MappedPort>>();
let res_data = NewAppRes {
uuid: new_app_req.uuid,
mapped_ports,
ip_address: "127.0.0.1".to_string(),
error: new_app_err.clone().unwrap_or_default(),
};
let res = app_proto::DaemonMessageApp {
msg: Some(app_proto::daemon_message_app::Msg::NewAppRes(res_data)),
};
tx.send(res).await?;
}
Some(app_proto::brain_message_app::Msg::DeleteAppReq(del_app_req)) => {
println!("MOCK_APP_DAEMON:: delete app request for {}", del_app_req.uuid);
}
None => todo!(),
}
}
Ok(())
}

@ -6,3 +6,9 @@ pub mod test_utils;
pub mod vm_cli_utils;
#[allow(dead_code)]
pub mod vm_daemon_utils;
#[allow(dead_code)]
pub mod app_daemon_utils;
#[allow(dead_code)]
pub mod app_cli_utils;

@ -1,4 +1,6 @@
use anyhow::Result;
use detee_shared::app_proto::brain_app_cli_server::BrainAppCliServer;
use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemonServer;
use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer;
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer;
use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer;
@ -7,6 +9,7 @@ use hyper_util::rt::TokioIo;
use std::net::SocketAddr;
use std::sync::Arc;
use surreal_brain::constants::DB_SCHEMA_FILES;
use surreal_brain::grpc::app::{AppCliServer, AppDaemonServer};
use surreal_brain::grpc::general::GeneralCliServer;
use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer};
use surrealdb::engine::remote::ws::Client;
@ -96,6 +99,8 @@ pub async fn run_service_for_stream_server() -> DuplexStream {
.add_service(BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone())))
.add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone())))
.add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone())))
.add_service(BrainAppCliServer::new(AppCliServer::new(db_arc.clone())))
.add_service(BrainAppDaemonServer::new(AppDaemonServer::new(db_arc.clone())))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await?;

@ -1,9 +1,14 @@
use anyhow::Result;
use detee_shared::app_proto as sgx_proto;
use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient;
use detee_shared::general_proto::AirdropReq;
use detee_shared::vm_proto as snp_proto;
use ed25519_dalek::{Signer, SigningKey};
use itertools::Itertools;
use std::sync::OnceLock;
use surreal_brain::constants::TOKEN_DECIMAL;
use tonic::metadata::AsciiMetadataValue;
use tonic::transport::Channel;
use tonic::Request;
pub static ADMIN_KEYS: OnceLock<Vec<Key>> = OnceLock::new();
@ -63,11 +68,33 @@ impl Key {
Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string())
}
pub fn sign_stream_auth(&self, contracts: Vec<String>) -> Result<snp_proto::DaemonStreamAuth> {
pub fn sign_stream_auth_vm(
&self,
contracts: Vec<String>,
) -> Result<snp_proto::DaemonStreamAuth> {
let pubkey = self.pubkey.clone();
let timestamp = chrono::Utc::now().to_rfc3339();
let signature =
self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?;
Ok(snp_proto::DaemonStreamAuth { timestamp, pubkey, contracts, signature })
}
pub fn sign_stream_auth_app(&self, contracts: Vec<String>) -> Result<sgx_proto::DaemonAuth> {
let pubkey = self.pubkey.clone();
let timestamp = chrono::Utc::now().to_rfc3339();
let signature =
self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?;
Ok(sgx_proto::DaemonAuth { timestamp, pubkey, contracts, signature })
}
}
pub async fn airdrop(brain_channel: &Channel, wallet: &str, amount: u64) -> Result<()> {
let mut client = BrainGeneralCliClient::new(brain_channel.clone());
let airdrop_req = AirdropReq { pubkey: wallet.to_string(), tokens: amount * TOKEN_DECIMAL };
let admin_key = admin_keys()[0].clone();
client.airdrop(admin_key.sign_request(airdrop_req.clone())?).await?;
Ok(())
}

@ -1,29 +1,18 @@
use super::test_utils::{admin_keys, Key};
use super::test_utils::Key;
use anyhow::{anyhow, Result};
use detee_shared::app_proto;
use detee_shared::common_proto::Empty;
use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient;
use detee_shared::general_proto::{Account, AirdropReq, RegOperatorReq, ReportNodeReq};
use detee_shared::general_proto::{Account, RegOperatorReq, ReportNodeReq};
use detee_shared::vm_proto;
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
use futures::StreamExt;
use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ, TOKEN_DECIMAL};
use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ};
use surreal_brain::db::prelude as db;
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;
use tonic::transport::Channel;
pub async fn airdrop(brain_channel: &Channel, wallet: &str, amount: u64) -> Result<()> {
let mut client = BrainGeneralCliClient::new(brain_channel.clone());
let airdrop_req = AirdropReq { pubkey: wallet.to_string(), tokens: amount * TOKEN_DECIMAL };
let admin_key = admin_keys()[0].clone();
client.airdrop(admin_key.sign_request(airdrop_req.clone())?).await?;
Ok(())
}
pub async fn create_new_vm(
db: &Surreal<Client>,
key: &Key,
@ -35,7 +24,7 @@ pub async fn create_new_vm(
node_pubkey: node_pubkey.to_string(),
price_per_unit: 1200,
extra_ports: vec![8080, 8081],
locked_nano: 0,
locked_nano: 100,
..Default::default()
};

@ -59,8 +59,8 @@ pub async fn register_vm_node(
let mut deleted_vm_reqs = Vec::new();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(del_vm_rq) => {
deleted_vm_reqs.push(del_vm_rq);
Ok(del_vm_req) => {
deleted_vm_reqs.push(del_vm_req);
}
Err(e) => {
panic!("Received error instead of deleted_vm_reqs: {e:?}");
@ -76,7 +76,8 @@ pub async fn daemon_listener(
tx: mpsc::Sender<vm_proto::BrainVmMessage>,
) -> Result<()> {
log::info!("listening vm_daemon");
let mut grpc_stream = client.brain_messages(key.sign_stream_auth(vec![])?).await?.into_inner();
let mut grpc_stream =
client.brain_messages(key.sign_stream_auth_vm(vec![])?).await?.into_inner();
while let Some(Ok(stream_update)) = grpc_stream.next().await {
log::info!("vm deamon got notified: {:?}", &stream_update);
@ -95,7 +96,7 @@ pub async fn daemon_msg_sender(
log::info!("sender vm_daemon");
let rx_stream = ReceiverStream::new(rx);
tx.send(vm_proto::VmDaemonMessage {
msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![])?)),
msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth_vm(vec![])?)),
})
.await?;
client.daemon_messages(rx_stream).await?;
@ -135,8 +136,8 @@ pub async fn daemon_engine(
Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => {
todo!()
}
Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => {
todo!()
Some(vm_proto::brain_vm_message::Msg::DeleteVm(del_vm_req)) => {
println!("MOCK_VM_DAEMON:: delete vm request for {}", del_vm_req.uuid);
}
None => todo!(),
}

182
tests/grpc_app_cli_test.rs Normal file

@ -0,0 +1,182 @@
use common::app_daemon_utils::mock_app_daemon;
use common::prepare_test_env::{prepare_test_db, run_service_for_stream};
use common::test_utils::{airdrop, Key};
use detee_shared::app_proto::brain_app_cli_client::BrainAppCliClient;
use detee_shared::app_proto::{self, DelAppReq};
use std::vec;
use surreal_brain::constants::{ACCOUNT, ACTIVE_APP, DELETED_APP, NEW_APP_REQ, TOKEN_DECIMAL};
use surreal_brain::db::prelude as db;
use crate::common::app_cli_utils::create_new_app;
mod common;
#[tokio::test]
async fn test_app_creation() {
/*
env_logger::builder()
.filter_level(log::LevelFilter::Trace)
.filter_module("tungstenite", log::LevelFilter::Debug)
.filter_module("tokio_tungstenite", log::LevelFilter::Debug)
.init();
*/
let db = prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await.unwrap();
let daemon_key = mock_app_daemon(&brain_channel, None).await.unwrap();
let key = Key::new();
let mut new_app_req = app_proto::NewAppReq {
admin_pubkey: key.pubkey.clone(),
node_pubkey: daemon_key.clone(),
price_per_unit: 1200,
resource: Some(app_proto::AppResource { ports: vec![8080, 8081], ..Default::default() }),
locked_nano: 100,
..Default::default()
};
let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone());
let new_app_resp = client_app_cli.new_app(key.sign_request(new_app_req.clone()).unwrap()).await;
assert!(new_app_resp.is_err());
let new_app_err = new_app_resp.err().unwrap();
assert!(new_app_err.to_string().contains("Insufficient funds"));
let airdrop_amount = 10;
airdrop(&brain_channel, &key.pubkey, airdrop_amount).await.unwrap();
let new_app_resp = client_app_cli
.new_app(key.sign_request(new_app_req.clone()).unwrap())
.await
.unwrap()
.into_inner();
let active_app =
db.select::<Option<db::ActiveApp>>((ACTIVE_APP, new_app_resp.uuid)).await.unwrap();
assert!(active_app.is_some());
let daemon_key_02 =
mock_app_daemon(&brain_channel, Some("something went wrong 01".to_string())).await.unwrap();
new_app_req.node_pubkey = daemon_key_02.clone();
let new_app_resp = client_app_cli
.new_app(key.sign_request(new_app_req.clone()).unwrap())
.await
.unwrap()
.into_inner();
assert!(!new_app_resp.error.is_empty());
let app_req_db =
db.select::<Option<db::NewAppReq>>((NEW_APP_REQ, new_app_resp.uuid)).await.unwrap();
assert!(!app_req_db.unwrap().error.is_empty());
let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
assert_eq!(acc_db.balance, (airdrop_amount * TOKEN_DECIMAL - 100));
assert_eq!(acc_db.tmp_locked, 0);
let locking_nano = 1288;
new_app_req.node_pubkey = daemon_key;
new_app_req.locked_nano = locking_nano;
let new_app_resp =
client_app_cli.new_app(key.sign_request(new_app_req).unwrap()).await.unwrap().into_inner();
assert!(new_app_resp.error.is_empty());
let active_app =
db.select::<Option<db::ActiveApp>>((ACTIVE_APP, new_app_resp.uuid)).await.unwrap();
assert!(active_app.is_some());
let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - (locking_nano + 100));
assert_eq!(acc_db.tmp_locked, 0);
}
#[tokio::test]
async fn test_timeout_app_creation() {
let _ = prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await.unwrap();
let daemon_key = Key::new().pubkey.clone();
let key = Key::new();
let new_app_req = app_proto::NewAppReq {
admin_pubkey: key.pubkey.clone(),
node_pubkey: daemon_key.clone(),
price_per_unit: 1200,
resource: Some(app_proto::AppResource { ports: vec![8080, 8081], ..Default::default() }),
locked_nano: 100,
..Default::default()
};
airdrop(&brain_channel, &key.pubkey, 10).await.unwrap();
let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone());
let timeout_resp = client_app_cli.new_app(key.sign_request(new_app_req.clone()).unwrap()).await;
assert!(timeout_resp.is_err());
let timeout_err = timeout_resp.err().unwrap();
assert_eq!(
timeout_err.message(),
"Network timeout. Please try again later or contact the DeTEE devs team."
);
}
#[tokio::test]
async fn test_app_deletion() {
let db = prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await.unwrap();
let daemon_key = mock_app_daemon(&brain_channel, None).await.unwrap();
let key = Key::new();
airdrop(&brain_channel, &key.pubkey, 10).await.unwrap();
let new_app_res = create_new_app(&key, &daemon_key, &brain_channel).await.unwrap();
let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone());
let del_app_req = DelAppReq { admin_pubkey: key.pubkey.clone(), uuid: new_app_res.uuid };
let _ = client_app_cli.delete_app(key.sign_request(del_app_req).unwrap()).await.unwrap();
let key_02 = Key::new();
// delete random app
let mut del_app_req = DelAppReq {
admin_pubkey: key_02.pubkey.clone(),
uuid: "9ae3VH8nJg2i8pqTQ6mJtvYuS2kd9n1XLLco8GUPfT95".to_string(),
};
let del_err = client_app_cli
.delete_app(key_02.sign_request(del_app_req.clone()).unwrap())
.await
.err()
.unwrap();
assert_eq!(del_err.message(), "Unauthorized");
let new_app_res_02 = create_new_app(&key, &daemon_key, &brain_channel).await.unwrap();
del_app_req.uuid = new_app_res_02.uuid;
let del_err = client_app_cli
.delete_app(key_02.sign_request(del_app_req.clone()).unwrap())
.await
.err()
.unwrap();
assert_eq!(del_err.message(), "Unauthorized");
// test refund
let key_03 = Key::new();
airdrop(&brain_channel, &key_03.pubkey, 10).await.unwrap();
let new_app_res_03 = create_new_app(&key_03, &daemon_key, &brain_channel).await.unwrap();
let del_app_req =
DelAppReq { admin_pubkey: key_03.pubkey.clone(), uuid: new_app_res_03.uuid.clone() };
let _ = client_app_cli.delete_app(key_03.sign_request(del_app_req).unwrap()).await.unwrap();
let acc: db::Account = db.select((ACCOUNT, key_03.pubkey.clone())).await.unwrap().unwrap();
assert_eq!(acc.balance, 10 * TOKEN_DECIMAL);
let deleted_app =
db.select::<Option<db::DeletedApp>>((DELETED_APP, new_app_res_03.uuid)).await.unwrap();
assert!(deleted_app.is_some());
}
// TODO: test register app node, delete app contract while node offline, kick, etc..

@ -1,10 +1,10 @@
use common::prepare_test_env::{
prepare_test_db, run_service_for_stream, run_service_in_background,
};
use common::test_utils::{admin_keys, Key};
use common::test_utils::{admin_keys, airdrop, Key};
use common::vm_cli_utils::{
airdrop, create_new_vm, list_accounts, list_all_app_contracts, list_all_vm_contracts,
register_operator, report_node,
create_new_vm, list_accounts, list_all_app_contracts, list_all_vm_contracts, register_operator,
report_node,
};
use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node};
use detee_shared::common_proto::{Empty, Pubkey};

@ -1,14 +1,14 @@
use common::prepare_test_env::{prepare_test_db, run_service_for_stream};
use common::test_utils::Key;
use common::vm_cli_utils::{airdrop, create_new_vm, user_list_vm_contracts};
use common::test_utils::{airdrop, Key};
use common::vm_cli_utils::{create_new_vm, user_list_vm_contracts};
use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node};
use detee_shared::vm_proto;
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
use detee_shared::vm_proto::{self, DeleteVmReq};
use detee_shared::vm_proto::{ExtendVmReq, ListVmContractsReq, NewVmReq};
use futures::StreamExt;
use std::vec;
use surreal_brain::constants::{ACCOUNT, ACTIVE_VM, NEW_VM_REQ, TOKEN_DECIMAL};
use surreal_brain::constants::{ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, TOKEN_DECIMAL};
use surreal_brain::db::prelude as db;
mod common;
@ -73,7 +73,7 @@ async fn test_vm_creation() {
}
let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL);
assert_eq!(acc_db.balance, (airdrop_amount * TOKEN_DECIMAL - 100));
assert_eq!(acc_db.tmp_locked, 0);
new_vm_req.node_pubkey = daemon_key;
@ -87,7 +87,7 @@ async fn test_vm_creation() {
assert!(active_vm.is_some());
let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - locking_nano);
assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - (locking_nano + 100));
assert_eq!(acc_db.tmp_locked, 0);
}
@ -109,7 +109,7 @@ async fn test_timeout_vm_creation() {
node_pubkey: daemon_key.pubkey,
price_per_unit: 1200,
extra_ports: vec![8080, 8081],
locked_nano: 0,
locked_nano: 100,
..Default::default()
};
@ -125,6 +125,65 @@ async fn test_timeout_vm_creation() {
)
}
#[tokio::test]
async fn test_vm_deletion() {
let db = prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await.unwrap();
let daemon_key = mock_vm_daemon(&brain_channel, None).await.unwrap();
let key = Key::new();
airdrop(&brain_channel, &key.pubkey, 10).await.unwrap();
let new_vm_uuid = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap();
let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone());
let del_app_req = DeleteVmReq { admin_pubkey: key.pubkey.clone(), uuid: new_vm_uuid };
let _ = client_vm_cli.delete_vm(key.sign_request(del_app_req).unwrap()).await.unwrap();
let key_02 = Key::new();
// delete random vm
let mut del_vm_req = DeleteVmReq {
admin_pubkey: key_02.pubkey.clone(),
uuid: "9ae3VH8nJg2i8pqTQ6mJtvYuS2kd9n1XLLco8GUPfT95".to_string(),
};
let del_err = client_vm_cli
.delete_vm(key_02.sign_request(del_vm_req.clone()).unwrap())
.await
.err()
.unwrap();
assert_eq!(del_err.message(), "Unauthorized");
let new_vm_uuid_02 = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap();
del_vm_req.uuid = new_vm_uuid_02;
let del_err = client_vm_cli
.delete_vm(key_02.sign_request(del_vm_req.clone()).unwrap())
.await
.err()
.unwrap();
assert_eq!(del_err.message(), "Unauthorized");
// test refund
let key_03 = Key::new();
airdrop(&brain_channel, &key_03.pubkey, 10).await.unwrap();
let new_vm_uuid_03 = create_new_vm(&db, &key_03, &daemon_key, &brain_channel).await.unwrap();
let del_vm_req =
DeleteVmReq { admin_pubkey: key_03.pubkey.clone(), uuid: new_vm_uuid_03.clone() };
let _ = client_vm_cli.delete_vm(key_03.sign_request(del_vm_req).unwrap()).await.unwrap();
let acc: db::Account = db.select((ACCOUNT, key_03.pubkey.clone())).await.unwrap().unwrap();
assert_eq!(acc.balance, 10 * TOKEN_DECIMAL);
let deleted_vm =
db.select::<Option<db::DeletedVm>>((DELETED_VM, new_vm_uuid_03)).await.unwrap();
assert!(deleted_vm.is_some());
}
#[tokio::test]
// TODO: create vm for this user before testing this
async fn test_list_vm_contracts() {
@ -240,3 +299,5 @@ async fn test_extend_vm() {
let acc: db::Account = db_conn.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
assert_eq!(acc.balance, expected_bal_02);
}
// TODO: test register vm node, delete vm contract while node offline, kick, etc..

@ -1,8 +1,7 @@
use common::prepare_test_env::{
prepare_test_db, run_service_for_stream, run_service_in_background,
};
use common::test_utils::Key;
use common::vm_cli_utils::airdrop;
use common::test_utils::{airdrop, Key};
use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node};
use detee_shared::vm_proto;
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
@ -39,7 +38,7 @@ async fn test_brain_message() {
node_pubkey: daemon_key,
price_per_unit: 1200,
extra_ports: vec![8080, 8081],
locked_nano: 0,
locked_nano: 100,
..Default::default()
};
airdrop(&brain_channel, &cli_key.pubkey, 10).await.unwrap();