Compare commits

...

3 Commits

Author SHA1 Message Date
479b9b6fa4 Implement refund on delete app and vm
fix authentication and refund on delete vm
refactor app daemon register to get delete app req
db function for delete app
sample environment variable
extensive tests for delete app and vm
refactor test utilities
2025-06-04 16:01:42 +00:00
e59a3c8fd8 Implement new app contract balance locking
minimum locking balance on deployment
locking balance on app deployment
refunding locked nano while error on daemon
returning appropreate error on app deployment
fixed some typos on logging
new timeout constants for daemon respose
minor change in schema and proto
extensive tests on app deployments
fixed some vm tests
2025-06-04 16:01:42 +00:00
c8363b5e07 fix new_vm error
reverted stream query
checking err on db before listening error stream
refactor app grpc code
2025-06-04 16:01:42 +00:00
26 changed files with 847 additions and 155 deletions

1
.gitignore vendored

@ -1,3 +1,4 @@
/target
secrets
tmp
.env

2
Cargo.lock generated

@ -1000,7 +1000,7 @@ dependencies = [
[[package]]
name = "detee-shared"
version = "0.1.0"
source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain#d6ca058d2de78b5257517034bca2b2c7d5929db8"
source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#005677153b3fcd3251b64111a736c806106fdc04"
dependencies = [
"bincode 2.0.1",
"prost",

@ -13,7 +13,7 @@ serde_yaml = "0.9.34"
surrealdb = "2.2.2"
tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] }
tonic = { version = "0.12", features = ["tls"] }
detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain" }
detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain_app" }
ed25519-dalek = "2.1.1"
bs58 = "0.5.1"
tokio-stream = "0.1.17"

@ -17,6 +17,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let db_name = std::env::var("DB_NAME").expect("DB_NAME not set in .env");
let db = db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name).await.unwrap();
env_logger::builder().filter_level(log::LevelFilter::Trace);
db::migration0(&db, &old_brain_data).await?;

@ -48,3 +48,6 @@ pub const ID_ALPHABET: [char; 62] = [
pub const TOKEN_DECIMAL: u64 = 1_000_000_000;
pub const MIN_ESCROW: u64 = 5000 * TOKEN_DECIMAL;
pub const APP_DAEMON_TIMEOUT: u64 = 20;
pub const VM_DAEMON_TIMEOUT: u64 = 10;

@ -1,11 +1,13 @@
use std::time::Duration;
use super::Error;
use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, DELETED_APP, NEW_APP_REQ};
use crate::constants::{
ACCOUNT, ACTIVE_APP, APP_DAEMON_TIMEOUT, APP_NODE, DELETED_APP, NEW_APP_REQ,
};
use crate::db;
use crate::db::general::Report;
use crate::old_brain;
use detee_shared::app_proto;
use detee_shared::app_proto::{self, NewAppRes};
use serde::{Deserialize, Serialize};
use surrealdb::engine::remote::ws::Client;
use surrealdb::sql::Datetime;
@ -68,8 +70,8 @@ pub struct NewAppReq {
pub hratls_pubkey: String,
pub ports: Vec<u32>,
pub memory_mb: u32,
pub vcpu: u32,
pub disk_mb: u32,
pub vcpus: u32,
pub disk_size_gb: u32,
pub locked_nano: u64,
pub price_per_unit: u64,
pub error: String,
@ -81,26 +83,117 @@ impl NewAppReq {
let new_app_req: Option<Self> = db.select((NEW_APP_REQ, id)).await?;
Ok(new_app_req)
}
pub async fn submit_error(
db: &Surreal<Client>,
id: &str,
error: String,
) -> Result<Option<Self>, Error> {
#[derive(Serialize)]
struct NewAppError {
error: String,
pub async fn submit_error(db: &Surreal<Client>, id: &str, error: String) -> Result<(), Error> {
let tx_query = String::from(
"
BEGIN TRANSACTION;
LET $new_app_req = $new_app_req_input;
LET $error = $error_input;
LET $record = (select * from $new_app_req)[0];
LET $admin = $record.in;
if $record == None {{
THROW 'app req not exist ' + <string>$new_app_req
}};
UPDATE $new_app_req SET error = $error;
UPDATE $admin SET tmp_locked -= $record.locked_nano;
UPDATE $admin SET balance += $record.locked_nano;
COMMIT TRANSACTION;",
);
log::trace!("submit_new_app_err query: {tx_query}");
let mut query_resp = db
.query(tx_query)
.bind(("new_app_req_input", RecordId::from((NEW_APP_REQ, id))))
.bind(("error_input", error))
.await?;
let query_err = query_resp.take_errors();
if !query_err.is_empty() {
log::trace!("errors in submit_new_app_err: {query_err:?}");
let tx_fail_err_str =
String::from("The query was not executed due to a failed transaction");
if query_err.contains_key(&4) && query_err[&4].to_string() != tx_fail_err_str {
log::error!("app req not exist: {}", query_err[&4]);
return Err(Error::ContractNotFound);
} else {
log::error!("Unknown error in submit_new_app_err: {query_err:?}");
return Err(Error::Unknown("submit_new_app_req".to_string()));
}
}
let record: Option<Self> =
db.update((NEW_APP_REQ, id)).merge(NewAppError { error }).await?;
Ok(record)
Ok(())
}
pub async fn submit(self, db: &Surreal<Client>) -> Result<Vec<Self>, Error> {
// TODO: handle financial transaction
let new_app_req: Vec<Self> = db.insert(NEW_APP_REQ).relation(self).await?;
Ok(new_app_req)
pub async fn submit(self, db: &Surreal<Client>) -> Result<(), Error> {
let locked_nano = self.locked_nano;
let tx_query = format!( "
BEGIN TRANSACTION;
LET $account = $account_input;
LET $new_app_req = $new_app_req_input;
LET $app_node = $app_node_input;
LET $package_url = $package_url_input;
LET $mr_enclave = $mr_enclave_input;
LET $hratls_pubkey = $hratls_pubkey_input;
LET $app_name = $app_name_input;
UPDATE $account SET balance -= {locked_nano};
IF $account.balance < 0 {{
THROW 'Insufficient funds.'
}};
UPDATE $account SET tmp_locked += {locked_nano};
RELATE
$account
->$new_app_req
->$app_node
CONTENT {{
created_at: time::now(), app_name: $app_name, package_url: $package_url,
mr_enclave: $mr_enclave, hratls_pubkey: $hratls_pubkey, ports: {:?}, memory_mb: {},
vcpus: {}, disk_size_gb: {}, locked_nano: {locked_nano}, price_per_unit: {}, error: '',
}};
COMMIT TRANSACTION;
",
self.ports, self.memory_mb, self.vcpus, self.disk_size_gb, self.price_per_unit);
log::trace!("submit_new_app_req query: {tx_query}");
let mut query_resp = db
.query(tx_query)
.bind(("account_input", self.admin))
.bind(("new_app_req_input", self.id))
.bind(("app_node_input", self.app_node))
.bind(("package_url_input", self.package_url))
.bind(("mr_enclave_input", self.mr_enclave))
.bind(("hratls_pubkey_input", self.hratls_pubkey))
.bind(("app_name_input", self.app_name))
.await?;
let query_err = query_resp.take_errors();
if !query_err.is_empty() {
log::trace!("errors in submit_new_app_req: {query_err:?}");
let tx_fail_err_str =
String::from("The query was not executed due to a failed transaction");
if query_err.contains_key(&8) && query_err[&8].to_string() != tx_fail_err_str {
log::error!("Transaction error: {}", query_err[&8]);
return Err(Error::InsufficientFunds);
} else {
log::error!("Unknown error in submit_new_app_req: {query_err:?}");
return Err(Error::Unknown("submit_new_app_req".to_string()));
}
}
Ok(())
}
}
@ -202,8 +295,6 @@ impl From<ActiveApp> for DeletedApp {
disk_size_gb: value.disk_size_gb,
created_at: value.created_at,
price_per_unit: value.price_per_unit,
locked_nano: value.locked_nano,
collected_at: value.collected_at,
mr_enclave: value.mr_enclave,
package_url: value.package_url,
hratls_pubkey: value.hratls_pubkey,
@ -221,22 +312,31 @@ impl ActiveApp {
(self.vcpus as f64 * 5f64) + (self.memory_mb as f64 / 200f64) + (self.disk_size_gb as f64)
}
pub async fn activate(db: &Surreal<Client>, id: &str) -> Result<(), Error> {
let new_app_req = match NewAppReq::get(db, id).await? {
pub async fn activate(
db: &Surreal<Client>,
new_app_res: app_proto::NewAppRes,
) -> Result<(), Error> {
let new_app_req = match NewAppReq::get(db, &new_app_res.uuid).await? {
Some(r) => r,
None => return Ok(()),
};
let mapped_ports = new_app_res
.mapped_ports
.into_iter()
.map(|data| (data.host_port, data.guest_port))
.collect::<Vec<(u32, u32)>>();
let active_app = Self {
id: RecordId::from((ACTIVE_APP, id)),
id: RecordId::from((ACTIVE_APP, &new_app_res.uuid)),
admin: new_app_req.admin,
app_node: new_app_req.app_node,
app_name: new_app_req.app_name,
mapped_ports: vec![],
mapped_ports,
host_ipv4: String::new(),
vcpus: new_app_req.vcpu,
vcpus: new_app_req.vcpus,
memory_mb: new_app_req.memory_mb,
disk_size_gb: new_app_req.disk_mb,
disk_size_gb: new_app_req.disk_size_gb,
created_at: new_app_req.created_at.clone(),
price_per_unit: new_app_req.price_per_unit,
locked_nano: new_app_req.locked_nano,
@ -246,22 +346,63 @@ impl ActiveApp {
hratls_pubkey: new_app_req.hratls_pubkey.clone(),
};
let admin_account = active_app.admin.key().to_string();
let locked_nano = active_app.locked_nano;
let _: Vec<ActiveApp> = db.insert(()).relation(active_app).await?;
db.delete::<Option<NewAppReq>>((NEW_APP_REQ, &new_app_res.uuid)).await?;
db.query(format!("UPDATE {ACCOUNT}:{admin_account} SET tmp_locked -= {locked_nano};"))
.await?;
Ok(())
}
pub async fn delete(db: &Surreal<Client>, id: &str) -> Result<bool, Error> {
let deleted_app: Option<Self> = db.delete((ACTIVE_APP, id)).await?;
if let Some(deleted_app) = deleted_app {
let deleted_app: DeletedApp = deleted_app.into();
let _: Vec<DeletedApp> = db.insert(DELETED_APP).relation(deleted_app).await?;
Ok(true)
} else {
Ok(false)
pub async fn delete(db: &Surreal<Client>, admin: &str, id: &str) -> Result<(), Error> {
let mut app_del_resp = db
.query(
"
BEGIN TRANSACTION;
LET $active_app = $app_id_input;
LET $admin = $admin_input;
IF $active_app.in != $admin {
THROW 'Unauthorized'
};
return fn::delete_app($active_app);
COMMIT TRANSACTION;
",
)
.bind(("app_id_input", RecordId::from((ACTIVE_APP, id))))
.bind(("admin_input", RecordId::from((ACCOUNT, admin))))
.await?;
log::trace!("delete_app query response: {app_del_resp:?}");
let query_err = app_del_resp.take_errors();
if !query_err.is_empty() {
log::trace!("errors in delete_app: {query_err:?}");
let tx_fail_err_str =
String::from("The query was not executed due to a failed transaction");
if query_err.contains_key(&2) && query_err[&2].to_string() != tx_fail_err_str {
log::error!("Unauthorized: {}", query_err[&2]);
return Err(Error::AccessDenied);
}
}
pub async fn listen(db: &Surreal<Client>, app_id: &str) -> Result<ActiveApp, Error> {
Ok(())
}
}
pub enum WrappedAppResp {
NewAppRes(NewAppRes),
Error(NewAppRes),
}
impl WrappedAppResp {
pub async fn listen(db: &Surreal<Client>, app_id: &str) -> Result<WrappedAppResp, Error> {
let mut query_response = db
.query(format!(
"live select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};"
@ -272,14 +413,40 @@ impl ActiveApp {
let mut error_stream = query_response.stream::<Notification<db::ErrorFromTable>>(0)?;
let mut active_app_stream = query_response.stream::<Notification<ActiveApp>>(1)?;
tokio::time::timeout(Duration::from_secs(30), async {
let mut error = db
.query(format!("select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};"))
.await?;
if let Some(error_on_newapp_req) = error.take::<Option<db::ErrorFromTable>>(0)? {
if !error_on_newapp_req.error.is_empty() {
let app_daemon_err = NewAppRes {
uuid: app_id.to_string(),
error: error_on_newapp_req.error,
..Default::default()
};
return Ok(Self::Error(app_daemon_err));
}
}
if let Some(active_app) = db.select::<Option<ActiveApp>>((ACTIVE_APP, app_id)).await? {
return Ok(Self::NewAppRes(active_app.into()));
}
log::trace!("listening for table: {NEW_APP_REQ}");
tokio::time::timeout(Duration::from_secs(APP_DAEMON_TIMEOUT), async {
loop {
tokio::select! {
Some(err_notif) = error_stream.next() =>{
match err_notif{
Ok(err_notif) =>{
if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty(){
return Err(Error::NewAppDaemonResp(err_notif.data.error))
let app_daemon_err = NewAppRes {
uuid: app_id.to_string(),
error: err_notif.data.error,
..Default::default()
};
return Ok(Self::Error(app_daemon_err));
}
}
Err(e) => return Err(e.into())
@ -291,7 +458,7 @@ impl ActiveApp {
Ok(active_app_notif) =>{
if active_app_notif.action == surrealdb::Action::Create {
let _: Option<NewAppReq> = db.delete((NEW_APP_REQ, app_id)).await?;
return Ok(active_app_notif.data);
return Ok(Self::NewAppRes(active_app_notif.data.into()));
}
}
Err(e) => return Err(e.into())
@ -511,9 +678,17 @@ pub struct DeletedApp {
pub disk_size_gb: u32,
pub created_at: Datetime,
pub price_per_unit: u64,
pub locked_nano: u64,
pub collected_at: Datetime,
pub mr_enclave: String,
pub package_url: String,
pub hratls_pubkey: String,
}
impl DeletedApp {
pub async fn list_by_node(db: &Surreal<Client>, node_pubkey: &str) -> Result<Vec<Self>, Error> {
let mut result = db
.query(format!("select * from {DELETED_APP} where out = {APP_NODE}:{node_pubkey};"))
.await?;
let contracts: Vec<Self> = result.take(0)?;
Ok(contracts)
}
}

@ -31,8 +31,6 @@ pub enum Error {
UnknownTable(String),
#[error("Daemon channel got closed: {0}")]
AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<AppDaemonMsg>),
#[error("AppDaemon Error {0}")]
NewAppDaemonResp(String),
#[error("Minimum escrow amount is {MIN_ESCROW}")]
MinimalEscrow,
#[error("Insufficient funds, deposit more tokens")]
@ -186,7 +184,7 @@ pub async fn live_appnode_msgs<
}
Err(e) => {
log::error!(
"live_vmnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}"
"live_appnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}"
);
return Err(Error::from(e));
}
@ -196,7 +194,7 @@ pub async fn live_appnode_msgs<
Ok(())
}
#[derive(Deserialize)]
#[derive(Deserialize, Debug, Clone)]
pub struct ErrorFromTable {
pub error: String,
}

@ -3,7 +3,8 @@ use std::time::Duration;
use super::Error;
use crate::constants::{
ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT,
ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_DAEMON_TIMEOUT, VM_NODE,
VM_UPDATE_EVENT,
};
use crate::db::{Account, ErrorFromTable, Report};
use crate::old_brain;
@ -329,18 +330,22 @@ impl WrappedMeasurement {
UPDATE_VM_REQ => UPDATE_VM_REQ,
_ => NEW_VM_REQ,
};
let mut args_stream = db
let mut resp = db
.query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};"))
.query(format!(
"live select * from measurement_args where id = measurement_args:{vm_id};"
))
.await?
.stream::<Notification<vm_proto::MeasurementArgs>>(0)?;
.await?;
let mut error_stream = resp.stream::<Notification<ErrorFromTable>>(0)?;
let mut args_stream = resp.stream::<Notification<vm_proto::MeasurementArgs>>(1)?;
let mut error_stream = db
.query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};"))
.await?
.stream::<Notification<ErrorFromTable>>(0)?;
let mut error =
db.query(format!("select error from {table} where id = {NEW_VM_REQ}:{vm_id}")).await?;
if let Some(error_on_newvm_req) = error.take::<Option<ErrorFromTable>>(0)? {
if !error_on_newvm_req.error.is_empty() {
return Ok(Self::Error(vm_id.to_string(), error_on_newvm_req.error));
}
}
let args: Option<vm_proto::MeasurementArgs> =
db.delete(("measurement_args", vm_id)).await?;
@ -349,7 +354,7 @@ impl WrappedMeasurement {
}
log::trace!("listening for table: {table}");
tokio::time::timeout(Duration::from_secs(10), async {
tokio::time::timeout(Duration::from_secs(VM_DAEMON_TIMEOUT), async {
loop {
tokio::select! {
error_notification = error_stream.next() => {
@ -358,10 +363,7 @@ impl WrappedMeasurement {
Ok(err_notif) => {
if err_notif.action == surrealdb::Action::Update
&& !err_notif.data.error.is_empty() {
return Ok::<WrappedMeasurement, Error>(
Self::Error(vm_id.to_string(),
err_notif.data.error)
);
return Ok(Self::Error(vm_id.to_string(), err_notif.data.error));
};
},
Err(e) => return Err(e.into()),
@ -544,16 +546,43 @@ impl ActiveVm {
Ok(false)
}
pub async fn delete(db: &Surreal<Client>, id: &str) -> Result<bool, Error> {
let deleted_vm: Option<Self> = db.delete((ACTIVE_VM, id)).await?;
if let Some(deleted_vm) = deleted_vm {
let deleted_vm: DeletedVm = deleted_vm.into();
let _: Vec<DeletedVm> = db.insert(DELETED_VM).relation(deleted_vm).await?;
Ok(true)
} else {
Ok(false)
pub async fn delete(db: &Surreal<Client>, admin: &str, id: &str) -> Result<(), Error> {
let mut vm_del_resp = db
.query(
"
BEGIN TRANSACTION;
LET $active_vm = $vm_id_input;
LET $admin = $admin_input;
IF $active_vm.in != $admin {
THROW 'Unauthorized'
};
return fn::delete_vm($active_vm);
COMMIT TRANSACTION;
",
)
.bind(("vm_id_input", RecordId::from((ACTIVE_VM, id))))
.bind(("admin_input", RecordId::from((ACCOUNT, admin))))
.await?;
log::trace!("delete_vm query response: {vm_del_resp:?}");
let query_err = vm_del_resp.take_errors();
if !query_err.is_empty() {
log::trace!("errors in delete_vm: {query_err:?}");
let tx_fail_err_str =
String::from("The query was not executed due to a failed transaction");
if query_err.contains_key(&2) && query_err[&2].to_string() != tx_fail_err_str {
log::error!("Unauthorized: {}", query_err[&2]);
return Err(Error::AccessDenied);
}
}
Ok(())
}
pub async fn extend_time(
db: &Surreal<Client>,

@ -15,7 +15,7 @@ use surrealdb::Surreal;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{Stream, StreamExt};
use tonic::{Response, Status, Streaming};
use tonic::{Request, Response, Status, Streaming};
pub struct AppDaemonServer {
pub db: Arc<Surreal<Client>>,
@ -29,13 +29,13 @@ impl AppDaemonServer {
#[tonic::async_trait]
impl BrainAppDaemon for AppDaemonServer {
type RegisterAppNodeStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
type RegisterAppNodeStream = Pin<Box<dyn Stream<Item = Result<DelAppReq, Status>> + Send>>;
type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainMessageApp, Status>> + Send>>;
async fn register_app_node(
&self,
req: tonic::Request<RegisterAppNodeReq>,
) -> Result<tonic::Response<<Self as BrainAppDaemon>::RegisterAppNodeStream>, Status> {
req: Request<RegisterAppNodeReq>,
) -> Result<Response<<Self as BrainAppDaemon>::RegisterAppNodeStream>, Status> {
let req = check_sig_from_req(req)?;
info!("Starting app_node registration process for {:?}", req);
@ -59,11 +59,11 @@ impl BrainAppDaemon for AppDaemonServer {
app_node.register(&self.db).await?;
info!("Sending existing contracts to {}", req.node_pubkey);
let contracts = db::ActiveAppWithNode::list_by_node(&self.db, &req.node_pubkey).await?;
let deleted_apps = db::DeletedApp::list_by_node(&self.db, &req.node_pubkey).await?;
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for contract in contracts {
let _ = tx.send(Ok(contract.into())).await;
for deleted_app in deleted_apps {
let _ = tx.send(Ok(deleted_app.into())).await;
}
});
@ -73,8 +73,8 @@ impl BrainAppDaemon for AppDaemonServer {
async fn brain_messages(
&self,
req: tonic::Request<DaemonAuth>,
) -> Result<tonic::Response<<Self as BrainAppDaemon>::BrainMessagesStream>, Status> {
req: Request<DaemonAuth>,
) -> Result<Response<<Self as BrainAppDaemon>::BrainMessagesStream>, Status> {
let auth = req.into_inner();
let pubkey = auth.pubkey.clone();
check_sig_from_parts(
@ -110,8 +110,8 @@ impl BrainAppDaemon for AppDaemonServer {
async fn daemon_messages(
&self,
req: tonic::Request<Streaming<DaemonMessageApp>>,
) -> Result<tonic::Response<Empty>, Status> {
req: Request<Streaming<DaemonMessageApp>>,
) -> Result<Response<Empty>, Status> {
let mut req_stream = req.into_inner();
let pubkey: String;
if let Some(Ok(msg)) = req_stream.next().await {
@ -145,7 +145,7 @@ impl BrainAppDaemon for AppDaemonServer {
)
.await?;
} else {
db::ActiveApp::activate(&self.db, &new_app_resp.uuid).await?;
db::ActiveApp::activate(&self.db, new_app_resp).await?;
}
}
Some(daemon_message_app::Msg::AppNodeResources(app_node_resources)) => {
@ -180,33 +180,40 @@ impl BrainAppCli for AppCliServer {
type ListAppContractsStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
type ListAppNodesStream = Pin<Box<dyn Stream<Item = Result<AppNodeListResp, Status>> + Send>>;
async fn new_app(
&self,
req: tonic::Request<detee_shared::app_proto::NewAppReq>,
) -> Result<tonic::Response<detee_shared::app_proto::NewAppRes>, Status> {
async fn new_app(&self, req: Request<NewAppReq>) -> Result<Response<NewAppRes>, Status> {
let req = check_sig_from_req(req)?;
info!("deploy_app process starting for {:?}", req);
// TODO: make it atleast 1 hour
if req.locked_nano < 100 {
log::error!("locking lessthan 100 nano lps: {}", req.locked_nano);
return Err(Status::unknown("lock atleaset 100 nano lps"));
}
info!("new_app process starting for {:?}", req);
if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
return Err(Status::permission_denied("This operator banned you. What did you do?"));
}
let new_app_req: db::NewAppReq = req.into();
let id = new_app_req.id.to_string();
let db_req: db::NewAppReq = req.into();
let id = db_req.id.key().to_string();
let (tx, rx) = tokio::sync::oneshot::channel();
let db = self.db.clone();
tokio::spawn(async move {
let _ = tx.send(db::ActiveApp::listen(&db, &id).await);
let _ = tx.send(db::WrappedAppResp::listen(&db, &id).await);
});
new_app_req.submit(&self.db).await?;
db_req.submit(&self.db).await?;
match rx.await {
Ok(Ok(db::WrappedAppResp::NewAppRes(new_app_resp))) => Ok(Response::new(new_app_resp)),
Ok(Ok(db::WrappedAppResp::Error(err))) => Ok(Response::new(err)),
Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
"Network timeout. Please try again later or contact the DeTEE devs team.",
)),
Ok(Err(db::Error::NewAppDaemonResp(err))) => Err(Status::internal(err)),
Ok(new_app_resp) => Ok(Response::new(new_app_resp?.into())),
Ok(Err(e)) => {
log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}");
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
Err(e) => {
log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}");
Err(Status::unknown(
@ -216,22 +223,25 @@ impl BrainAppCli for AppCliServer {
}
}
async fn delete_app(
&self,
req: tonic::Request<DelAppReq>,
) -> Result<tonic::Response<Empty>, Status> {
async fn delete_app(&self, req: Request<DelAppReq>) -> Result<Response<Empty>, Status> {
let req = check_sig_from_req(req)?;
info!("delete_app process starting for {:?}", req);
match ActiveApp::delete(&self.db, &req.uuid).await? {
true => Ok(Response::new(Empty {})),
false => Err(Status::not_found(format!("Could not find App contract {}", &req.uuid))),
match ActiveApp::delete(&self.db, &req.admin_pubkey, &req.uuid).await {
Ok(()) => Ok(Response::new(Empty {})),
Err(db::Error::AccessDenied) => Err(Status::permission_denied("Unauthorized")),
Err(e) => {
log::error!("Error deleting app contract {}: {e}", &req.uuid);
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
}
async fn list_app_contracts(
&self,
req: tonic::Request<ListAppContractsReq>,
) -> Result<tonic::Response<<Self as BrainAppCli>::ListAppContractsStream>, Status> {
req: Request<ListAppContractsReq>,
) -> Result<Response<<Self as BrainAppCli>::ListAppContractsStream>, Status> {
let req = check_sig_from_req(req)?;
info!("list_app_contracts process starting for {:?}", req);
@ -270,8 +280,8 @@ impl BrainAppCli for AppCliServer {
async fn list_app_nodes(
&self,
req: tonic::Request<AppNodeFilters>,
) -> Result<tonic::Response<<Self as BrainAppCli>::ListAppNodesStream>, Status> {
req: Request<AppNodeFilters>,
) -> Result<Response<<Self as BrainAppCli>::ListAppNodesStream>, Status> {
let req = check_sig_from_req(req)?;
info!("list_app_nodes process starting for {:?}", req);
let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req, false).await?;
@ -287,8 +297,8 @@ impl BrainAppCli for AppCliServer {
async fn get_one_app_node(
&self,
req: tonic::Request<AppNodeFilters>,
) -> Result<tonic::Response<AppNodeListResp>, Status> {
req: Request<AppNodeFilters>,
) -> Result<Response<AppNodeListResp>, Status> {
let req = check_sig_from_req(req)?;
info!("get_one_app_node process starting for {:?}", req);
let app_node = db::AppNodeWithReports::find_by_filters(&self.db, req, true)

@ -274,8 +274,8 @@ impl From<db::ActiveAppWithNode> for AppContract {
public_ipv4: value.host_ipv4,
resource: Some(AppResource {
memory_mb: value.memory_mb,
disk_mb: value.disk_size_gb,
vcpu: value.vcpus,
disk_size_gb: value.disk_size_gb,
vcpus: value.vcpus,
ports: value.mapped_ports.iter().map(|(_, g)| *g).collect(),
}),
mapped_ports: value
@ -316,8 +316,8 @@ impl From<NewAppReq> for db::NewAppReq {
hratls_pubkey: val.hratls_pubkey,
ports: resource.ports,
memory_mb: resource.memory_mb,
vcpu: resource.vcpu,
disk_mb: resource.disk_mb,
vcpus: resource.vcpus,
disk_size_gb: resource.disk_size_gb,
locked_nano: val.locked_nano,
price_per_unit: val.price_per_unit,
error: String::new(),
@ -329,9 +329,9 @@ impl From<NewAppReq> for db::NewAppReq {
impl From<db::NewAppReq> for NewAppReq {
fn from(value: db::NewAppReq) -> Self {
let resource = AppResource {
vcpu: value.vcpu,
vcpus: value.vcpus,
memory_mb: value.memory_mb,
disk_mb: value.disk_mb,
disk_size_gb: value.disk_size_gb,
ports: value.ports,
};
let mr_enclave = Some(hex::decode(value.mr_enclave).unwrap_or_default());

@ -222,6 +222,11 @@ impl BrainVmCli for VmCliServer {
async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> {
let req = check_sig_from_req(req)?;
// TODO: make it atleast 1 hour
if req.locked_nano < 100 {
log::error!("locking lessthan 100 nano lps: {}", req.locked_nano);
return Err(Status::unknown("lock atleaset 100 nano lps"));
}
info!("New VM requested via CLI: {req:?}");
if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
return Err(Status::permission_denied("This operator banned you. What did you do?"));
@ -337,9 +342,15 @@ impl BrainVmCli for VmCliServer {
async fn delete_vm(&self, req: Request<DeleteVmReq>) -> Result<Response<Empty>, Status> {
let req = check_sig_from_req(req)?;
match db::ActiveVm::delete(&self.db, &req.uuid).await? {
true => Ok(Response::new(Empty {})),
false => Err(Status::not_found(format!("Could not find VM contract {}", &req.uuid))),
match db::ActiveVm::delete(&self.db, &req.admin_pubkey, &req.uuid).await {
Ok(()) => Ok(Response::new(Empty {})),
Err(db::Error::AccessDenied) => Err(Status::permission_denied("Unauthorized")),
Err(e) => {
log::error!("Error deleting VM contract {}: {e}", &req.uuid);
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
}

@ -22,7 +22,7 @@ DEFINE FUNCTION OVERWRITE fn::delete_vm(
UPDATE $account SET balance += $vm.locked_nano;
};
INSERT RELATION INTO deleted_vm ( $deleted_vm );
DELETE $vm.id;
RETURN DELETE $vm.id RETURN BEFORE;
};
DEFINE FUNCTION OVERWRITE fn::app_price_per_minute(
@ -35,3 +35,20 @@ DEFINE FUNCTION OVERWRITE fn::app_price_per_minute(
($app.disk_size_gb / 10))
* $app.price_per_unit;
};
DEFINE FUNCTION OVERWRITE fn::delete_app(
$app_id: record
) {
LET $app = (select * from $app_id)[0];
LET $account = $app.in;
LET $deleted_app = $app.patch([{
'op': 'replace',
'path': 'id',
'value': type::record("deleted_app:" + record::id($app.id))
}]);
IF $app.locked_nano >= 0 {
UPDATE $account SET balance += $app.locked_nano;
};
INSERT RELATION INTO deleted_app ( $deleted_app );
RETURN DELETE $app.id RETURN BEFORE;
};

@ -99,8 +99,8 @@ DEFINE FIELD mr_enclave ON TABLE new_app_req TYPE string;
DEFINE FIELD hratls_pubkey ON TABLE new_app_req TYPE string;
DEFINE FIELD ports ON TABLE new_app_req TYPE array<int>;
DEFINE FIELD memory_mb ON TABLE new_app_req TYPE int;
DEFINE FIELD vcpu ON TABLE new_app_req TYPE int;
DEFINE FIELD disk_mb ON TABLE new_app_req TYPE int;
DEFINE FIELD vcpus ON TABLE new_app_req TYPE int;
DEFINE FIELD disk_size_gb ON TABLE new_app_req TYPE int;
DEFINE FIELD locked_nano ON TABLE new_app_req TYPE int;
DEFINE FIELD price_per_unit ON TABLE new_app_req TYPE int;
DEFINE FIELD error ON TABLE new_app_req TYPE string;

@ -27,3 +27,5 @@ FOR $contract IN (select * from active_vm fetch out) {
fn::delete_vm($contract.id);
};
};
-- TODO: implement for active_app

@ -0,0 +1,31 @@
use anyhow::Result;
use detee_shared::app_proto::{
brain_app_cli_client::BrainAppCliClient, AppResource, NewAppReq, NewAppRes,
};
use tonic::transport::Channel;
use crate::common::test_utils::Key;
pub async fn create_new_app(
key: &Key,
node_pubkey: &str,
brain_channel: &Channel,
) -> Result<NewAppRes> {
let new_app_req = NewAppReq {
admin_pubkey: key.pubkey.clone(),
node_pubkey: node_pubkey.to_string(),
price_per_unit: 1200,
resource: Some(AppResource { ports: vec![8080, 8081], ..Default::default() }),
locked_nano: 100,
..Default::default()
};
let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone());
let new_app_resp =
client_app_cli.new_app(key.sign_request(new_app_req.clone())?).await?.into_inner();
assert!(new_app_resp.error.is_empty());
assert!(new_app_resp.uuid.len() == 40);
Ok(new_app_resp)
}

@ -0,0 +1,144 @@
use anyhow::Result;
use detee_shared::app_proto::brain_app_daemon_client::BrainAppDaemonClient;
use detee_shared::app_proto::{self, NewAppRes, RegisterAppNodeReq};
use detee_shared::common_proto::MappedPort;
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Channel;
use super::test_utils::Key;
pub async fn mock_app_daemon(
brain_channel: &Channel,
daemon_error: Option<String>,
) -> Result<String> {
let mut daemon_client = BrainAppDaemonClient::new(brain_channel.clone());
let daemon_key = Key::new();
register_app_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?;
let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx));
let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(daemon_msg_sender(
daemon_client.clone(),
daemon_key.clone(),
daemon_msg_tx.clone(),
rx,
));
tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx, daemon_error));
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
Ok(daemon_key.pubkey)
}
pub async fn register_app_node(
client: &mut BrainAppDaemonClient<Channel>,
key: &Key,
operator_wallet: &str,
) -> Result<Vec<app_proto::DelAppReq>> {
log::info!("Registering app_node: {}", key.pubkey);
let node_pubkey = key.pubkey.clone();
let req = RegisterAppNodeReq {
node_pubkey,
operator_wallet: operator_wallet.to_string(),
main_ip: String::from("185.243.218.213"),
city: String::from("Oslo"),
country: String::from("Norway"),
region: String::from("EU"),
price: 1200,
};
let mut grpc_stream = client.register_app_node(key.sign_request(req)?).await?.into_inner();
let mut deleted_app_reqs = Vec::new();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(del_app_req) => {
deleted_app_reqs.push(del_app_req);
}
Err(e) => {
panic!("Received error instead of deleted_app_reqs: {e:?}");
}
}
}
Ok(deleted_app_reqs)
}
pub async fn daemon_listener(
mut client: BrainAppDaemonClient<Channel>,
key: Key,
tx: mpsc::Sender<app_proto::BrainMessageApp>,
) -> Result<()> {
log::info!("listening app_daemon");
let mut grpc_stream =
client.brain_messages(key.sign_stream_auth_app(vec![])?).await?.into_inner();
while let Some(Ok(stream_update)) = grpc_stream.next().await {
log::info!("app deamon got notified: {:?}", &stream_update);
let _ = tx.send(stream_update).await;
}
Ok(())
}
pub async fn daemon_msg_sender(
mut client: BrainAppDaemonClient<Channel>,
key: Key,
tx: mpsc::Sender<app_proto::DaemonMessageApp>,
rx: mpsc::Receiver<app_proto::DaemonMessageApp>,
) -> Result<()> {
log::info!("sender app_daemon");
let rx_stream = ReceiverStream::new(rx);
tx.send(app_proto::DaemonMessageApp {
msg: Some(app_proto::daemon_message_app::Msg::Auth(key.sign_stream_auth_app(vec![])?)),
})
.await?;
client.daemon_messages(rx_stream).await?;
Ok(())
}
pub async fn daemon_engine(
tx: mpsc::Sender<app_proto::DaemonMessageApp>,
mut rx: mpsc::Receiver<app_proto::BrainMessageApp>,
new_app_err: Option<String>,
) -> Result<()> {
log::info!("daemon engine app_daemon");
while let Some(brain_msg) = rx.recv().await {
match brain_msg.msg {
Some(app_proto::brain_message_app::Msg::NewAppReq(new_app_req)) => {
let exposed_ports =
[vec![34500], new_app_req.resource.unwrap_or_default().ports].concat();
let mapped_ports = exposed_ports
.into_iter()
.map(|port| MappedPort { host_port: port, guest_port: port })
.collect::<Vec<MappedPort>>();
let res_data = NewAppRes {
uuid: new_app_req.uuid,
mapped_ports,
ip_address: "127.0.0.1".to_string(),
error: new_app_err.clone().unwrap_or_default(),
};
let res = app_proto::DaemonMessageApp {
msg: Some(app_proto::daemon_message_app::Msg::NewAppRes(res_data)),
};
tx.send(res).await?;
}
Some(app_proto::brain_message_app::Msg::DeleteAppReq(del_app_req)) => {
println!("MOCK_APP_DAEMON:: delete app request for {}", del_app_req.uuid);
}
None => todo!(),
}
}
Ok(())
}

@ -6,3 +6,9 @@ pub mod test_utils;
pub mod vm_cli_utils;
#[allow(dead_code)]
pub mod vm_daemon_utils;
#[allow(dead_code)]
pub mod app_daemon_utils;
#[allow(dead_code)]
pub mod app_cli_utils;

@ -1,4 +1,6 @@
use anyhow::Result;
use detee_shared::app_proto::brain_app_cli_server::BrainAppCliServer;
use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemonServer;
use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer;
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer;
use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer;
@ -7,6 +9,7 @@ use hyper_util::rt::TokioIo;
use std::net::SocketAddr;
use std::sync::Arc;
use surreal_brain::constants::DB_SCHEMA_FILES;
use surreal_brain::grpc::app::{AppCliServer, AppDaemonServer};
use surreal_brain::grpc::general::GeneralCliServer;
use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer};
use surrealdb::engine::remote::ws::Client;
@ -96,6 +99,8 @@ pub async fn run_service_for_stream_server() -> DuplexStream {
.add_service(BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone())))
.add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone())))
.add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone())))
.add_service(BrainAppCliServer::new(AppCliServer::new(db_arc.clone())))
.add_service(BrainAppDaemonServer::new(AppDaemonServer::new(db_arc.clone())))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await?;

@ -1,9 +1,14 @@
use anyhow::Result;
use detee_shared::app_proto as sgx_proto;
use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient;
use detee_shared::general_proto::AirdropReq;
use detee_shared::vm_proto as snp_proto;
use ed25519_dalek::{Signer, SigningKey};
use itertools::Itertools;
use std::sync::OnceLock;
use surreal_brain::constants::TOKEN_DECIMAL;
use tonic::metadata::AsciiMetadataValue;
use tonic::transport::Channel;
use tonic::Request;
pub static ADMIN_KEYS: OnceLock<Vec<Key>> = OnceLock::new();
@ -63,11 +68,33 @@ impl Key {
Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string())
}
pub fn sign_stream_auth(&self, contracts: Vec<String>) -> Result<snp_proto::DaemonStreamAuth> {
pub fn sign_stream_auth_vm(
&self,
contracts: Vec<String>,
) -> Result<snp_proto::DaemonStreamAuth> {
let pubkey = self.pubkey.clone();
let timestamp = chrono::Utc::now().to_rfc3339();
let signature =
self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?;
Ok(snp_proto::DaemonStreamAuth { timestamp, pubkey, contracts, signature })
}
pub fn sign_stream_auth_app(&self, contracts: Vec<String>) -> Result<sgx_proto::DaemonAuth> {
let pubkey = self.pubkey.clone();
let timestamp = chrono::Utc::now().to_rfc3339();
let signature =
self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?;
Ok(sgx_proto::DaemonAuth { timestamp, pubkey, contracts, signature })
}
}
pub async fn airdrop(brain_channel: &Channel, wallet: &str, amount: u64) -> Result<()> {
let mut client = BrainGeneralCliClient::new(brain_channel.clone());
let airdrop_req = AirdropReq { pubkey: wallet.to_string(), tokens: amount * TOKEN_DECIMAL };
let admin_key = admin_keys()[0].clone();
client.airdrop(admin_key.sign_request(airdrop_req.clone())?).await?;
Ok(())
}

@ -1,29 +1,18 @@
use super::test_utils::{admin_keys, Key};
use super::test_utils::Key;
use anyhow::{anyhow, Result};
use detee_shared::app_proto;
use detee_shared::common_proto::Empty;
use detee_shared::general_proto::brain_general_cli_client::BrainGeneralCliClient;
use detee_shared::general_proto::{Account, AirdropReq, RegOperatorReq, ReportNodeReq};
use detee_shared::general_proto::{Account, RegOperatorReq, ReportNodeReq};
use detee_shared::vm_proto;
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
use futures::StreamExt;
use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ, TOKEN_DECIMAL};
use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ};
use surreal_brain::db::prelude as db;
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;
use tonic::transport::Channel;
pub async fn airdrop(brain_channel: &Channel, wallet: &str, amount: u64) -> Result<()> {
let mut client = BrainGeneralCliClient::new(brain_channel.clone());
let airdrop_req = AirdropReq { pubkey: wallet.to_string(), tokens: amount * TOKEN_DECIMAL };
let admin_key = admin_keys()[0].clone();
client.airdrop(admin_key.sign_request(airdrop_req.clone())?).await?;
Ok(())
}
pub async fn create_new_vm(
db: &Surreal<Client>,
key: &Key,
@ -35,7 +24,7 @@ pub async fn create_new_vm(
node_pubkey: node_pubkey.to_string(),
price_per_unit: 1200,
extra_ports: vec![8080, 8081],
locked_nano: 0,
locked_nano: 100,
..Default::default()
};

@ -59,8 +59,8 @@ pub async fn register_vm_node(
let mut deleted_vm_reqs = Vec::new();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(del_vm_rq) => {
deleted_vm_reqs.push(del_vm_rq);
Ok(del_vm_req) => {
deleted_vm_reqs.push(del_vm_req);
}
Err(e) => {
panic!("Received error instead of deleted_vm_reqs: {e:?}");
@ -76,7 +76,8 @@ pub async fn daemon_listener(
tx: mpsc::Sender<vm_proto::BrainVmMessage>,
) -> Result<()> {
log::info!("listening vm_daemon");
let mut grpc_stream = client.brain_messages(key.sign_stream_auth(vec![])?).await?.into_inner();
let mut grpc_stream =
client.brain_messages(key.sign_stream_auth_vm(vec![])?).await?.into_inner();
while let Some(Ok(stream_update)) = grpc_stream.next().await {
log::info!("vm deamon got notified: {:?}", &stream_update);
@ -95,7 +96,7 @@ pub async fn daemon_msg_sender(
log::info!("sender vm_daemon");
let rx_stream = ReceiverStream::new(rx);
tx.send(vm_proto::VmDaemonMessage {
msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![])?)),
msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth_vm(vec![])?)),
})
.await?;
client.daemon_messages(rx_stream).await?;
@ -135,8 +136,8 @@ pub async fn daemon_engine(
Some(vm_proto::brain_vm_message::Msg::UpdateVmReq(_update_vm_req)) => {
todo!()
}
Some(vm_proto::brain_vm_message::Msg::DeleteVm(_del_vm_req)) => {
todo!()
Some(vm_proto::brain_vm_message::Msg::DeleteVm(del_vm_req)) => {
println!("MOCK_VM_DAEMON:: delete vm request for {}", del_vm_req.uuid);
}
None => todo!(),
}

182
tests/grpc_app_cli_test.rs Normal file

@ -0,0 +1,182 @@
use common::app_daemon_utils::mock_app_daemon;
use common::prepare_test_env::{prepare_test_db, run_service_for_stream};
use common::test_utils::{airdrop, Key};
use detee_shared::app_proto::brain_app_cli_client::BrainAppCliClient;
use detee_shared::app_proto::{self, DelAppReq};
use std::vec;
use surreal_brain::constants::{ACCOUNT, ACTIVE_APP, DELETED_APP, NEW_APP_REQ, TOKEN_DECIMAL};
use surreal_brain::db::prelude as db;
use crate::common::app_cli_utils::create_new_app;
mod common;
#[tokio::test]
async fn test_app_creation() {
/*
env_logger::builder()
.filter_level(log::LevelFilter::Trace)
.filter_module("tungstenite", log::LevelFilter::Debug)
.filter_module("tokio_tungstenite", log::LevelFilter::Debug)
.init();
*/
let db = prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await.unwrap();
let daemon_key = mock_app_daemon(&brain_channel, None).await.unwrap();
let key = Key::new();
let mut new_app_req = app_proto::NewAppReq {
admin_pubkey: key.pubkey.clone(),
node_pubkey: daemon_key.clone(),
price_per_unit: 1200,
resource: Some(app_proto::AppResource { ports: vec![8080, 8081], ..Default::default() }),
locked_nano: 100,
..Default::default()
};
let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone());
let new_app_resp = client_app_cli.new_app(key.sign_request(new_app_req.clone()).unwrap()).await;
assert!(new_app_resp.is_err());
let new_app_err = new_app_resp.err().unwrap();
assert!(new_app_err.to_string().contains("Insufficient funds"));
let airdrop_amount = 10;
airdrop(&brain_channel, &key.pubkey, airdrop_amount).await.unwrap();
let new_app_resp = client_app_cli
.new_app(key.sign_request(new_app_req.clone()).unwrap())
.await
.unwrap()
.into_inner();
let active_app =
db.select::<Option<db::ActiveApp>>((ACTIVE_APP, new_app_resp.uuid)).await.unwrap();
assert!(active_app.is_some());
let daemon_key_02 =
mock_app_daemon(&brain_channel, Some("something went wrong 01".to_string())).await.unwrap();
new_app_req.node_pubkey = daemon_key_02.clone();
let new_app_resp = client_app_cli
.new_app(key.sign_request(new_app_req.clone()).unwrap())
.await
.unwrap()
.into_inner();
assert!(!new_app_resp.error.is_empty());
let app_req_db =
db.select::<Option<db::NewAppReq>>((NEW_APP_REQ, new_app_resp.uuid)).await.unwrap();
assert!(!app_req_db.unwrap().error.is_empty());
let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
assert_eq!(acc_db.balance, (airdrop_amount * TOKEN_DECIMAL - 100));
assert_eq!(acc_db.tmp_locked, 0);
let locking_nano = 1288;
new_app_req.node_pubkey = daemon_key;
new_app_req.locked_nano = locking_nano;
let new_app_resp =
client_app_cli.new_app(key.sign_request(new_app_req).unwrap()).await.unwrap().into_inner();
assert!(new_app_resp.error.is_empty());
let active_app =
db.select::<Option<db::ActiveApp>>((ACTIVE_APP, new_app_resp.uuid)).await.unwrap();
assert!(active_app.is_some());
let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - (locking_nano + 100));
assert_eq!(acc_db.tmp_locked, 0);
}
#[tokio::test]
async fn test_timeout_app_creation() {
let _ = prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await.unwrap();
let daemon_key = Key::new().pubkey.clone();
let key = Key::new();
let new_app_req = app_proto::NewAppReq {
admin_pubkey: key.pubkey.clone(),
node_pubkey: daemon_key.clone(),
price_per_unit: 1200,
resource: Some(app_proto::AppResource { ports: vec![8080, 8081], ..Default::default() }),
locked_nano: 100,
..Default::default()
};
airdrop(&brain_channel, &key.pubkey, 10).await.unwrap();
let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone());
let timeout_resp = client_app_cli.new_app(key.sign_request(new_app_req.clone()).unwrap()).await;
assert!(timeout_resp.is_err());
let timeout_err = timeout_resp.err().unwrap();
assert_eq!(
timeout_err.message(),
"Network timeout. Please try again later or contact the DeTEE devs team."
);
}
#[tokio::test]
async fn test_app_deletion() {
let db = prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await.unwrap();
let daemon_key = mock_app_daemon(&brain_channel, None).await.unwrap();
let key = Key::new();
airdrop(&brain_channel, &key.pubkey, 10).await.unwrap();
let new_app_res = create_new_app(&key, &daemon_key, &brain_channel).await.unwrap();
let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone());
let del_app_req = DelAppReq { admin_pubkey: key.pubkey.clone(), uuid: new_app_res.uuid };
let _ = client_app_cli.delete_app(key.sign_request(del_app_req).unwrap()).await.unwrap();
let key_02 = Key::new();
// delete random app
let mut del_app_req = DelAppReq {
admin_pubkey: key_02.pubkey.clone(),
uuid: "9ae3VH8nJg2i8pqTQ6mJtvYuS2kd9n1XLLco8GUPfT95".to_string(),
};
let del_err = client_app_cli
.delete_app(key_02.sign_request(del_app_req.clone()).unwrap())
.await
.err()
.unwrap();
assert_eq!(del_err.message(), "Unauthorized");
let new_app_res_02 = create_new_app(&key, &daemon_key, &brain_channel).await.unwrap();
del_app_req.uuid = new_app_res_02.uuid;
let del_err = client_app_cli
.delete_app(key_02.sign_request(del_app_req.clone()).unwrap())
.await
.err()
.unwrap();
assert_eq!(del_err.message(), "Unauthorized");
// test refund
let key_03 = Key::new();
airdrop(&brain_channel, &key_03.pubkey, 10).await.unwrap();
let new_app_res_03 = create_new_app(&key_03, &daemon_key, &brain_channel).await.unwrap();
let del_app_req =
DelAppReq { admin_pubkey: key_03.pubkey.clone(), uuid: new_app_res_03.uuid.clone() };
let _ = client_app_cli.delete_app(key_03.sign_request(del_app_req).unwrap()).await.unwrap();
let acc: db::Account = db.select((ACCOUNT, key_03.pubkey.clone())).await.unwrap().unwrap();
assert_eq!(acc.balance, 10 * TOKEN_DECIMAL);
let deleted_app =
db.select::<Option<db::DeletedApp>>((DELETED_APP, new_app_res_03.uuid)).await.unwrap();
assert!(deleted_app.is_some());
}
// TODO: test register app node, delete app contract while node offline, kick, etc..

@ -1,10 +1,10 @@
use common::prepare_test_env::{
prepare_test_db, run_service_for_stream, run_service_in_background,
};
use common::test_utils::{admin_keys, Key};
use common::test_utils::{admin_keys, airdrop, Key};
use common::vm_cli_utils::{
airdrop, create_new_vm, list_accounts, list_all_app_contracts, list_all_vm_contracts,
register_operator, report_node,
create_new_vm, list_accounts, list_all_app_contracts, list_all_vm_contracts, register_operator,
report_node,
};
use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node};
use detee_shared::common_proto::{Empty, Pubkey};

@ -1,14 +1,14 @@
use common::prepare_test_env::{prepare_test_db, run_service_for_stream};
use common::test_utils::Key;
use common::vm_cli_utils::{airdrop, create_new_vm, user_list_vm_contracts};
use common::test_utils::{airdrop, Key};
use common::vm_cli_utils::{create_new_vm, user_list_vm_contracts};
use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node};
use detee_shared::vm_proto;
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
use detee_shared::vm_proto::{self, DeleteVmReq};
use detee_shared::vm_proto::{ExtendVmReq, ListVmContractsReq, NewVmReq};
use futures::StreamExt;
use std::vec;
use surreal_brain::constants::{ACCOUNT, ACTIVE_VM, NEW_VM_REQ, TOKEN_DECIMAL};
use surreal_brain::constants::{ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, TOKEN_DECIMAL};
use surreal_brain::db::prelude as db;
mod common;
@ -73,7 +73,7 @@ async fn test_vm_creation() {
}
let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL);
assert_eq!(acc_db.balance, (airdrop_amount * TOKEN_DECIMAL - 100));
assert_eq!(acc_db.tmp_locked, 0);
new_vm_req.node_pubkey = daemon_key;
@ -87,7 +87,7 @@ async fn test_vm_creation() {
assert!(active_vm.is_some());
let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - locking_nano);
assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - (locking_nano + 100));
assert_eq!(acc_db.tmp_locked, 0);
}
@ -109,7 +109,7 @@ async fn test_timeout_vm_creation() {
node_pubkey: daemon_key.pubkey,
price_per_unit: 1200,
extra_ports: vec![8080, 8081],
locked_nano: 0,
locked_nano: 100,
..Default::default()
};
@ -125,6 +125,65 @@ async fn test_timeout_vm_creation() {
)
}
#[tokio::test]
async fn test_vm_deletion() {
let db = prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await.unwrap();
let daemon_key = mock_vm_daemon(&brain_channel, None).await.unwrap();
let key = Key::new();
airdrop(&brain_channel, &key.pubkey, 10).await.unwrap();
let new_vm_uuid = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap();
let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone());
let del_app_req = DeleteVmReq { admin_pubkey: key.pubkey.clone(), uuid: new_vm_uuid };
let _ = client_vm_cli.delete_vm(key.sign_request(del_app_req).unwrap()).await.unwrap();
let key_02 = Key::new();
// delete random vm
let mut del_vm_req = DeleteVmReq {
admin_pubkey: key_02.pubkey.clone(),
uuid: "9ae3VH8nJg2i8pqTQ6mJtvYuS2kd9n1XLLco8GUPfT95".to_string(),
};
let del_err = client_vm_cli
.delete_vm(key_02.sign_request(del_vm_req.clone()).unwrap())
.await
.err()
.unwrap();
assert_eq!(del_err.message(), "Unauthorized");
let new_vm_uuid_02 = create_new_vm(&db, &key, &daemon_key, &brain_channel).await.unwrap();
del_vm_req.uuid = new_vm_uuid_02;
let del_err = client_vm_cli
.delete_vm(key_02.sign_request(del_vm_req.clone()).unwrap())
.await
.err()
.unwrap();
assert_eq!(del_err.message(), "Unauthorized");
// test refund
let key_03 = Key::new();
airdrop(&brain_channel, &key_03.pubkey, 10).await.unwrap();
let new_vm_uuid_03 = create_new_vm(&db, &key_03, &daemon_key, &brain_channel).await.unwrap();
let del_vm_req =
DeleteVmReq { admin_pubkey: key_03.pubkey.clone(), uuid: new_vm_uuid_03.clone() };
let _ = client_vm_cli.delete_vm(key_03.sign_request(del_vm_req).unwrap()).await.unwrap();
let acc: db::Account = db.select((ACCOUNT, key_03.pubkey.clone())).await.unwrap().unwrap();
assert_eq!(acc.balance, 10 * TOKEN_DECIMAL);
let deleted_vm =
db.select::<Option<db::DeletedVm>>((DELETED_VM, new_vm_uuid_03)).await.unwrap();
assert!(deleted_vm.is_some());
}
#[tokio::test]
// TODO: create vm for this user before testing this
async fn test_list_vm_contracts() {
@ -240,3 +299,5 @@ async fn test_extend_vm() {
let acc: db::Account = db_conn.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
assert_eq!(acc.balance, expected_bal_02);
}
// TODO: test register vm node, delete vm contract while node offline, kick, etc..

@ -1,8 +1,7 @@
use common::prepare_test_env::{
prepare_test_db, run_service_for_stream, run_service_in_background,
};
use common::test_utils::Key;
use common::vm_cli_utils::airdrop;
use common::test_utils::{airdrop, Key};
use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node};
use detee_shared::vm_proto;
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
@ -39,7 +38,7 @@ async fn test_brain_message() {
node_pubkey: daemon_key,
price_per_unit: 1200,
extra_ports: vec![8080, 8081],
locked_nano: 0,
locked_nano: 100,
..Default::default()
};
airdrop(&brain_channel, &cli_key.pubkey, 10).await.unwrap();