payments on app deployment and refund for vm and app deletion #5
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,3 +1,4 @@
|
||||
/target
|
||||
secrets
|
||||
tmp
|
||||
.env
|
||||
|
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -1000,7 +1000,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "detee-shared"
|
||||
version = "0.1.0"
|
||||
source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain#d6ca058d2de78b5257517034bca2b2c7d5929db8"
|
||||
source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain_app#694d5811aa0edc9b2b9bdb17c6e972b04a21b96f"
|
||||
dependencies = [
|
||||
"bincode 2.0.1",
|
||||
"prost",
|
||||
|
@ -13,7 +13,7 @@ serde_yaml = "0.9.34"
|
||||
surrealdb = "2.2.2"
|
||||
tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] }
|
||||
tonic = { version = "0.12", features = ["tls"] }
|
||||
detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain" }
|
||||
detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "surreal_brain_app" }
|
||||
ed25519-dalek = "2.1.1"
|
||||
bs58 = "0.5.1"
|
||||
tokio-stream = "0.1.17"
|
||||
|
@ -17,6 +17,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||
let db_name = std::env::var("DB_NAME").expect("DB_NAME not set in .env");
|
||||
|
||||
let db = db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name).await.unwrap();
|
||||
env_logger::builder().filter_level(log::LevelFilter::Trace);
|
||||
|
||||
db::migration0(&db, &old_brain_data).await?;
|
||||
|
||||
|
@ -48,3 +48,6 @@ pub const ID_ALPHABET: [char; 62] = [
|
||||
|
||||
pub const TOKEN_DECIMAL: u64 = 1_000_000_000;
|
||||
pub const MIN_ESCROW: u64 = 5000 * TOKEN_DECIMAL;
|
||||
|
||||
pub const APP_DAEMON_TIMEOUT: u64 = 20;
|
||||
pub const VM_DAEMON_TIMEOUT: u64 = 10;
|
||||
|
202
src/db/app.rs
202
src/db/app.rs
@ -1,11 +1,13 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use super::Error;
|
||||
use crate::constants::{ACCOUNT, ACTIVE_APP, APP_NODE, DELETED_APP, NEW_APP_REQ};
|
||||
use crate::constants::{
|
||||
ACCOUNT, ACTIVE_APP, APP_DAEMON_TIMEOUT, APP_NODE, DELETED_APP, NEW_APP_REQ,
|
||||
};
|
||||
use crate::db;
|
||||
use crate::db::general::Report;
|
||||
use crate::old_brain;
|
||||
use detee_shared::app_proto;
|
||||
use detee_shared::app_proto::{self, NewAppRes};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use surrealdb::engine::remote::ws::Client;
|
||||
use surrealdb::sql::Datetime;
|
||||
@ -68,8 +70,8 @@ pub struct NewAppReq {
|
||||
pub hratls_pubkey: String,
|
||||
pub ports: Vec<u32>,
|
||||
pub memory_mb: u32,
|
||||
pub vcpu: u32,
|
||||
pub disk_mb: u32,
|
||||
pub vcpus: u32,
|
||||
pub disk_size_gb: u32,
|
||||
pub locked_nano: u64,
|
||||
pub price_per_unit: u64,
|
||||
pub error: String,
|
||||
@ -81,26 +83,117 @@ impl NewAppReq {
|
||||
let new_app_req: Option<Self> = db.select((NEW_APP_REQ, id)).await?;
|
||||
Ok(new_app_req)
|
||||
}
|
||||
pub async fn submit_error(
|
||||
db: &Surreal<Client>,
|
||||
id: &str,
|
||||
error: String,
|
||||
) -> Result<Option<Self>, Error> {
|
||||
#[derive(Serialize)]
|
||||
struct NewAppError {
|
||||
error: String,
|
||||
pub async fn submit_error(db: &Surreal<Client>, id: &str, error: String) -> Result<(), Error> {
|
||||
let tx_query = String::from(
|
||||
"
|
||||
BEGIN TRANSACTION;
|
||||
|
||||
LET $new_app_req = $new_app_req_input;
|
||||
LET $error = $error_input;
|
||||
LET $record = (select * from $new_app_req)[0];
|
||||
LET $admin = $record.in;
|
||||
|
||||
if $record == None {{
|
||||
THROW 'app req not exist ' + <string>$new_app_req
|
||||
}};
|
||||
|
||||
UPDATE $new_app_req SET error = $error;
|
||||
|
||||
UPDATE $admin SET tmp_locked -= $record.locked_nano;
|
||||
UPDATE $admin SET balance += $record.locked_nano;
|
||||
|
||||
COMMIT TRANSACTION;",
|
||||
);
|
||||
|
||||
log::trace!("submit_new_app_err query: {tx_query}");
|
||||
|
||||
let mut query_resp = db
|
||||
.query(tx_query)
|
||||
.bind(("new_app_req_input", RecordId::from((NEW_APP_REQ, id))))
|
||||
.bind(("error_input", error))
|
||||
.await?;
|
||||
|
||||
let query_err = query_resp.take_errors();
|
||||
if !query_err.is_empty() {
|
||||
log::trace!("errors in submit_new_app_err: {query_err:?}");
|
||||
let tx_fail_err_str =
|
||||
String::from("The query was not executed due to a failed transaction");
|
||||
|
||||
if query_err.contains_key(&4) && query_err[&4].to_string() != tx_fail_err_str {
|
||||
log::error!("app req not exist: {}", query_err[&4]);
|
||||
return Err(Error::ContractNotFound);
|
||||
} else {
|
||||
log::error!("Unknown error in submit_new_app_err: {query_err:?}");
|
||||
return Err(Error::Unknown("submit_new_app_req".to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
let record: Option<Self> =
|
||||
db.update((NEW_APP_REQ, id)).merge(NewAppError { error }).await?;
|
||||
|
||||
Ok(record)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn submit(self, db: &Surreal<Client>) -> Result<Vec<Self>, Error> {
|
||||
// TODO: handle financial transaction
|
||||
let new_app_req: Vec<Self> = db.insert(NEW_APP_REQ).relation(self).await?;
|
||||
Ok(new_app_req)
|
||||
pub async fn submit(self, db: &Surreal<Client>) -> Result<(), Error> {
|
||||
let locked_nano = self.locked_nano;
|
||||
let tx_query = format!( "
|
||||
BEGIN TRANSACTION;
|
||||
|
||||
LET $account = $account_input;
|
||||
LET $new_app_req = $new_app_req_input;
|
||||
LET $app_node = $app_node_input;
|
||||
LET $package_url = $package_url_input;
|
||||
LET $mr_enclave = $mr_enclave_input;
|
||||
LET $hratls_pubkey = $hratls_pubkey_input;
|
||||
LET $app_name = $app_name_input;
|
||||
|
||||
UPDATE $account SET balance -= {locked_nano};
|
||||
IF $account.balance < 0 {{
|
||||
THROW 'Insufficient funds.'
|
||||
}};
|
||||
UPDATE $account SET tmp_locked += {locked_nano};
|
||||
|
||||
|
||||
RELATE
|
||||
$account
|
||||
->$new_app_req
|
||||
->$app_node
|
||||
CONTENT {{
|
||||
created_at: time::now(), app_name: $app_name, package_url: $package_url,
|
||||
mr_enclave: $mr_enclave, hratls_pubkey: $hratls_pubkey, ports: {:?}, memory_mb: {},
|
||||
vcpus: {}, disk_size_gb: {}, locked_nano: {locked_nano}, price_per_unit: {}, error: '',
|
||||
}};
|
||||
|
||||
COMMIT TRANSACTION;
|
||||
",
|
||||
self.ports, self.memory_mb, self.vcpus, self.disk_size_gb, self.price_per_unit);
|
||||
|
||||
log::trace!("submit_new_app_req query: {tx_query}");
|
||||
|
||||
let mut query_resp = db
|
||||
.query(tx_query)
|
||||
.bind(("account_input", self.admin))
|
||||
.bind(("new_app_req_input", self.id))
|
||||
.bind(("app_node_input", self.app_node))
|
||||
.bind(("package_url_input", self.package_url))
|
||||
.bind(("mr_enclave_input", self.mr_enclave))
|
||||
.bind(("hratls_pubkey_input", self.hratls_pubkey))
|
||||
.bind(("app_name_input", self.app_name))
|
||||
.await?;
|
||||
|
||||
let query_err = query_resp.take_errors();
|
||||
if !query_err.is_empty() {
|
||||
log::trace!("errors in submit_new_app_req: {query_err:?}");
|
||||
let tx_fail_err_str =
|
||||
String::from("The query was not executed due to a failed transaction");
|
||||
|
||||
if query_err.contains_key(&8) && query_err[&8].to_string() != tx_fail_err_str {
|
||||
log::error!("Transaction error: {}", query_err[&8]);
|
||||
return Err(Error::InsufficientFunds);
|
||||
} else {
|
||||
log::error!("Unknown error in submit_new_app_req: {query_err:?}");
|
||||
return Err(Error::Unknown("submit_new_app_req".to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@ -221,22 +314,31 @@ impl ActiveApp {
|
||||
(self.vcpus as f64 * 5f64) + (self.memory_mb as f64 / 200f64) + (self.disk_size_gb as f64)
|
||||
}
|
||||
|
||||
pub async fn activate(db: &Surreal<Client>, id: &str) -> Result<(), Error> {
|
||||
let new_app_req = match NewAppReq::get(db, id).await? {
|
||||
pub async fn activate(
|
||||
db: &Surreal<Client>,
|
||||
new_app_res: app_proto::NewAppRes,
|
||||
) -> Result<(), Error> {
|
||||
let new_app_req = match NewAppReq::get(db, &new_app_res.uuid).await? {
|
||||
Some(r) => r,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
let mapped_ports = new_app_res
|
||||
.mapped_ports
|
||||
.into_iter()
|
||||
.map(|data| (data.host_port, data.guest_port))
|
||||
.collect::<Vec<(u32, u32)>>();
|
||||
|
||||
let active_app = Self {
|
||||
id: RecordId::from((ACTIVE_APP, id)),
|
||||
id: RecordId::from((ACTIVE_APP, &new_app_res.uuid)),
|
||||
admin: new_app_req.admin,
|
||||
app_node: new_app_req.app_node,
|
||||
app_name: new_app_req.app_name,
|
||||
mapped_ports: vec![],
|
||||
mapped_ports,
|
||||
host_ipv4: String::new(),
|
||||
vcpus: new_app_req.vcpu,
|
||||
vcpus: new_app_req.vcpus,
|
||||
memory_mb: new_app_req.memory_mb,
|
||||
disk_size_gb: new_app_req.disk_mb,
|
||||
disk_size_gb: new_app_req.disk_size_gb,
|
||||
created_at: new_app_req.created_at.clone(),
|
||||
price_per_unit: new_app_req.price_per_unit,
|
||||
locked_nano: new_app_req.locked_nano,
|
||||
@ -246,7 +348,13 @@ impl ActiveApp {
|
||||
hratls_pubkey: new_app_req.hratls_pubkey.clone(),
|
||||
};
|
||||
|
||||
let admin_account = active_app.admin.key().to_string();
|
||||
let locked_nano = active_app.locked_nano;
|
||||
|
||||
let _: Vec<ActiveApp> = db.insert(()).relation(active_app).await?;
|
||||
db.delete::<Option<NewAppReq>>((NEW_APP_REQ, &new_app_res.uuid)).await?;
|
||||
db.query(format!("UPDATE {ACCOUNT}:{admin_account} SET tmp_locked -= {locked_nano};"))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -261,7 +369,15 @@ impl ActiveApp {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
pub async fn listen(db: &Surreal<Client>, app_id: &str) -> Result<ActiveApp, Error> {
|
||||
}
|
||||
|
||||
pub enum WrappedAppResp {
|
||||
NewAppRes(NewAppRes),
|
||||
Error(NewAppRes),
|
||||
}
|
||||
|
||||
impl WrappedAppResp {
|
||||
pub async fn listen(db: &Surreal<Client>, app_id: &str) -> Result<WrappedAppResp, Error> {
|
||||
let mut query_response = db
|
||||
.query(format!(
|
||||
"live select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};"
|
||||
@ -272,14 +388,40 @@ impl ActiveApp {
|
||||
let mut error_stream = query_response.stream::<Notification<db::ErrorFromTable>>(0)?;
|
||||
let mut active_app_stream = query_response.stream::<Notification<ActiveApp>>(1)?;
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(30), async {
|
||||
let mut error = db
|
||||
.query(format!("select error from {NEW_APP_REQ} where id = {NEW_APP_REQ}:{app_id};"))
|
||||
.await?;
|
||||
if let Some(error_on_newapp_req) = error.take::<Option<db::ErrorFromTable>>(0)? {
|
||||
if !error_on_newapp_req.error.is_empty() {
|
||||
let app_daemon_err = NewAppRes {
|
||||
uuid: app_id.to_string(),
|
||||
error: error_on_newapp_req.error,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
return Ok(Self::Error(app_daemon_err));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(active_app) = db.select::<Option<ActiveApp>>((ACTIVE_APP, app_id)).await? {
|
||||
return Ok(Self::NewAppRes(active_app.into()));
|
||||
}
|
||||
log::trace!("listening for table: {NEW_APP_REQ}");
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(APP_DAEMON_TIMEOUT), async {
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(err_notif) = error_stream.next() =>{
|
||||
match err_notif{
|
||||
Ok(err_notif) =>{
|
||||
if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty(){
|
||||
return Err(Error::NewAppDaemonResp(err_notif.data.error))
|
||||
let app_daemon_err = NewAppRes {
|
||||
uuid: app_id.to_string(),
|
||||
error: err_notif.data.error,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
return Ok(Self::Error(app_daemon_err));
|
||||
}
|
||||
}
|
||||
Err(e) => return Err(e.into())
|
||||
@ -291,7 +433,7 @@ impl ActiveApp {
|
||||
Ok(active_app_notif) =>{
|
||||
if active_app_notif.action == surrealdb::Action::Create {
|
||||
let _: Option<NewAppReq> = db.delete((NEW_APP_REQ, app_id)).await?;
|
||||
return Ok(active_app_notif.data);
|
||||
return Ok(Self::NewAppRes(active_app_notif.data.into()));
|
||||
}
|
||||
}
|
||||
Err(e) => return Err(e.into())
|
||||
|
@ -31,8 +31,6 @@ pub enum Error {
|
||||
UnknownTable(String),
|
||||
#[error("Daemon channel got closed: {0}")]
|
||||
AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<AppDaemonMsg>),
|
||||
#[error("AppDaemon Error {0}")]
|
||||
NewAppDaemonResp(String),
|
||||
#[error("Minimum escrow amount is {MIN_ESCROW}")]
|
||||
MinimalEscrow,
|
||||
#[error("Insufficient funds, deposit more tokens")]
|
||||
@ -186,7 +184,7 @@ pub async fn live_appnode_msgs<
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!(
|
||||
"live_vmnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}"
|
||||
"live_appnode_msgs for {table_name} DB stream failed for {node_pubkey}: {e}"
|
||||
);
|
||||
return Err(Error::from(e));
|
||||
}
|
||||
|
@ -3,7 +3,8 @@ use std::time::Duration;
|
||||
|
||||
use super::Error;
|
||||
use crate::constants::{
|
||||
ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE, VM_UPDATE_EVENT,
|
||||
ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_DAEMON_TIMEOUT, VM_NODE,
|
||||
VM_UPDATE_EVENT,
|
||||
};
|
||||
use crate::db::{Account, ErrorFromTable, Report};
|
||||
use crate::old_brain;
|
||||
@ -353,7 +354,7 @@ impl WrappedMeasurement {
|
||||
}
|
||||
log::trace!("listening for table: {table}");
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(10), async {
|
||||
tokio::time::timeout(Duration::from_secs(VM_DAEMON_TIMEOUT), async {
|
||||
loop {
|
||||
tokio::select! {
|
||||
error_notification = error_stream.next() => {
|
||||
|
@ -145,7 +145,7 @@ impl BrainAppDaemon for AppDaemonServer {
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
db::ActiveApp::activate(&self.db, &new_app_resp.uuid).await?;
|
||||
db::ActiveApp::activate(&self.db, new_app_resp).await?;
|
||||
}
|
||||
}
|
||||
Some(daemon_message_app::Msg::AppNodeResources(app_node_resources)) => {
|
||||
@ -182,28 +182,38 @@ impl BrainAppCli for AppCliServer {
|
||||
|
||||
async fn new_app(&self, req: Request<NewAppReq>) -> Result<Response<NewAppRes>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
// TODO: make it atleast 1 hour
|
||||
if req.locked_nano < 100 {
|
||||
log::error!("locking lessthan 100 nano lps: {}", req.locked_nano);
|
||||
return Err(Status::unknown("lock atleaset 100 nano lps"));
|
||||
}
|
||||
info!("new_app process starting for {:?}", req);
|
||||
if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
|
||||
return Err(Status::permission_denied("This operator banned you. What did you do?"));
|
||||
}
|
||||
|
||||
let db_req: db::NewAppReq = req.into();
|
||||
let id = db_req.id.to_string();
|
||||
let id = db_req.id.key().to_string();
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
let db = self.db.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = tx.send(db::ActiveApp::listen(&db, &id).await);
|
||||
let _ = tx.send(db::WrappedAppResp::listen(&db, &id).await);
|
||||
});
|
||||
|
||||
db_req.submit(&self.db).await?;
|
||||
|
||||
match rx.await {
|
||||
Ok(Ok(db::WrappedAppResp::NewAppRes(new_app_resp))) => Ok(Response::new(new_app_resp)),
|
||||
Ok(Ok(db::WrappedAppResp::Error(err))) => Ok(Response::new(err)),
|
||||
Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
|
||||
"Network timeout. Please try again later or contact the DeTEE devs team.",
|
||||
)),
|
||||
Ok(Err(db::Error::NewAppDaemonResp(err))) => Err(Status::internal(err)),
|
||||
Ok(new_app_resp) => Ok(Response::new(new_app_resp?.into())),
|
||||
Ok(Err(e)) => {
|
||||
log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}");
|
||||
Err(Status::unknown(
|
||||
"Unknown error. Please try again or contact the DeTEE devs team.",
|
||||
))
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}");
|
||||
Err(Status::unknown(
|
||||
|
@ -274,8 +274,8 @@ impl From<db::ActiveAppWithNode> for AppContract {
|
||||
public_ipv4: value.host_ipv4,
|
||||
resource: Some(AppResource {
|
||||
memory_mb: value.memory_mb,
|
||||
disk_mb: value.disk_size_gb,
|
||||
vcpu: value.vcpus,
|
||||
disk_size_gb: value.disk_size_gb,
|
||||
vcpus: value.vcpus,
|
||||
ports: value.mapped_ports.iter().map(|(_, g)| *g).collect(),
|
||||
}),
|
||||
mapped_ports: value
|
||||
@ -316,8 +316,8 @@ impl From<NewAppReq> for db::NewAppReq {
|
||||
hratls_pubkey: val.hratls_pubkey,
|
||||
ports: resource.ports,
|
||||
memory_mb: resource.memory_mb,
|
||||
vcpu: resource.vcpu,
|
||||
disk_mb: resource.disk_mb,
|
||||
vcpus: resource.vcpus,
|
||||
disk_size_gb: resource.disk_size_gb,
|
||||
locked_nano: val.locked_nano,
|
||||
price_per_unit: val.price_per_unit,
|
||||
error: String::new(),
|
||||
@ -329,9 +329,9 @@ impl From<NewAppReq> for db::NewAppReq {
|
||||
impl From<db::NewAppReq> for NewAppReq {
|
||||
fn from(value: db::NewAppReq) -> Self {
|
||||
let resource = AppResource {
|
||||
vcpu: value.vcpu,
|
||||
vcpus: value.vcpus,
|
||||
memory_mb: value.memory_mb,
|
||||
disk_mb: value.disk_mb,
|
||||
disk_size_gb: value.disk_size_gb,
|
||||
ports: value.ports,
|
||||
};
|
||||
let mr_enclave = Some(hex::decode(value.mr_enclave).unwrap_or_default());
|
||||
|
@ -222,6 +222,11 @@ impl BrainVmCli for VmCliServer {
|
||||
|
||||
async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
// TODO: make it atleast 1 hour
|
||||
if req.locked_nano < 100 {
|
||||
log::error!("locking lessthan 100 nano lps: {}", req.locked_nano);
|
||||
return Err(Status::unknown("lock atleaset 100 nano lps"));
|
||||
}
|
||||
info!("New VM requested via CLI: {req:?}");
|
||||
if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
|
||||
return Err(Status::permission_denied("This operator banned you. What did you do?"));
|
||||
|
@ -99,8 +99,8 @@ DEFINE FIELD mr_enclave ON TABLE new_app_req TYPE string;
|
||||
DEFINE FIELD hratls_pubkey ON TABLE new_app_req TYPE string;
|
||||
DEFINE FIELD ports ON TABLE new_app_req TYPE array<int>;
|
||||
DEFINE FIELD memory_mb ON TABLE new_app_req TYPE int;
|
||||
DEFINE FIELD vcpu ON TABLE new_app_req TYPE int;
|
||||
DEFINE FIELD disk_mb ON TABLE new_app_req TYPE int;
|
||||
DEFINE FIELD vcpus ON TABLE new_app_req TYPE int;
|
||||
DEFINE FIELD disk_size_gb ON TABLE new_app_req TYPE int;
|
||||
DEFINE FIELD locked_nano ON TABLE new_app_req TYPE int;
|
||||
DEFINE FIELD price_per_unit ON TABLE new_app_req TYPE int;
|
||||
DEFINE FIELD error ON TABLE new_app_req TYPE string;
|
||||
|
144
tests/common/app_daemon_utils.rs
Normal file
144
tests/common/app_daemon_utils.rs
Normal file
@ -0,0 +1,144 @@
|
||||
use anyhow::Result;
|
||||
use detee_shared::app_proto::brain_app_daemon_client::BrainAppDaemonClient;
|
||||
use detee_shared::app_proto::{self, NewAppRes, RegisterAppNodeReq};
|
||||
use detee_shared::common_proto::MappedPort;
|
||||
use futures::StreamExt;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tonic::transport::Channel;
|
||||
|
||||
use super::test_utils::Key;
|
||||
|
||||
pub async fn mock_app_daemon(
|
||||
brain_channel: &Channel,
|
||||
daemon_error: Option<String>,
|
||||
) -> Result<String> {
|
||||
let mut daemon_client = BrainAppDaemonClient::new(brain_channel.clone());
|
||||
let daemon_key = Key::new();
|
||||
|
||||
register_app_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?;
|
||||
|
||||
let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx));
|
||||
|
||||
let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
tokio::spawn(daemon_msg_sender(
|
||||
daemon_client.clone(),
|
||||
daemon_key.clone(),
|
||||
daemon_msg_tx.clone(),
|
||||
rx,
|
||||
));
|
||||
|
||||
tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx, daemon_error));
|
||||
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||
|
||||
Ok(daemon_key.pubkey)
|
||||
}
|
||||
|
||||
pub async fn register_app_node(
|
||||
client: &mut BrainAppDaemonClient<Channel>,
|
||||
key: &Key,
|
||||
operator_wallet: &str,
|
||||
) -> Result<Vec<app_proto::AppContract>> {
|
||||
log::info!("Registering app_node: {}", key.pubkey);
|
||||
let node_pubkey = key.pubkey.clone();
|
||||
|
||||
let req = RegisterAppNodeReq {
|
||||
node_pubkey,
|
||||
operator_wallet: operator_wallet.to_string(),
|
||||
main_ip: String::from("185.243.218.213"),
|
||||
city: String::from("Oslo"),
|
||||
country: String::from("Norway"),
|
||||
region: String::from("EU"),
|
||||
price: 1200,
|
||||
};
|
||||
|
||||
let mut grpc_stream = client.register_app_node(key.sign_request(req)?).await?.into_inner();
|
||||
|
||||
let mut deleted_app_reqs = Vec::new();
|
||||
while let Some(stream_update) = grpc_stream.next().await {
|
||||
match stream_update {
|
||||
Ok(del_app_req) => {
|
||||
deleted_app_reqs.push(del_app_req);
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("Received error instead of deleted_app_reqs: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(deleted_app_reqs)
|
||||
}
|
||||
|
||||
pub async fn daemon_listener(
|
||||
mut client: BrainAppDaemonClient<Channel>,
|
||||
key: Key,
|
||||
tx: mpsc::Sender<app_proto::BrainMessageApp>,
|
||||
) -> Result<()> {
|
||||
log::info!("listening app_daemon");
|
||||
let mut grpc_stream =
|
||||
client.brain_messages(key.sign_stream_auth_app(vec![])?).await?.into_inner();
|
||||
|
||||
while let Some(Ok(stream_update)) = grpc_stream.next().await {
|
||||
log::info!("app deamon got notified: {:?}", &stream_update);
|
||||
let _ = tx.send(stream_update).await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn daemon_msg_sender(
|
||||
mut client: BrainAppDaemonClient<Channel>,
|
||||
key: Key,
|
||||
tx: mpsc::Sender<app_proto::DaemonMessageApp>,
|
||||
rx: mpsc::Receiver<app_proto::DaemonMessageApp>,
|
||||
) -> Result<()> {
|
||||
log::info!("sender app_daemon");
|
||||
let rx_stream = ReceiverStream::new(rx);
|
||||
tx.send(app_proto::DaemonMessageApp {
|
||||
msg: Some(app_proto::daemon_message_app::Msg::Auth(key.sign_stream_auth_app(vec![])?)),
|
||||
})
|
||||
.await?;
|
||||
client.daemon_messages(rx_stream).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn daemon_engine(
|
||||
tx: mpsc::Sender<app_proto::DaemonMessageApp>,
|
||||
mut rx: mpsc::Receiver<app_proto::BrainMessageApp>,
|
||||
new_app_err: Option<String>,
|
||||
) -> Result<()> {
|
||||
log::info!("daemon engine app_daemon");
|
||||
while let Some(brain_msg) = rx.recv().await {
|
||||
match brain_msg.msg {
|
||||
Some(app_proto::brain_message_app::Msg::NewAppReq(new_app_req)) => {
|
||||
let exposed_ports =
|
||||
[vec![34500], new_app_req.resource.unwrap_or_default().ports].concat();
|
||||
|
||||
let mapped_ports = exposed_ports
|
||||
.into_iter()
|
||||
.map(|port| MappedPort { host_port: port, guest_port: port })
|
||||
.collect::<Vec<MappedPort>>();
|
||||
|
||||
let res_data = NewAppRes {
|
||||
uuid: new_app_req.uuid,
|
||||
mapped_ports,
|
||||
ip_address: "127.0.0.1".to_string(),
|
||||
error: new_app_err.clone().unwrap_or_default(),
|
||||
};
|
||||
|
||||
let res = app_proto::DaemonMessageApp {
|
||||
msg: Some(app_proto::daemon_message_app::Msg::NewAppRes(res_data)),
|
||||
};
|
||||
tx.send(res).await?;
|
||||
}
|
||||
Some(app_proto::brain_message_app::Msg::DeleteAppReq(_del_app_req)) => {
|
||||
todo!()
|
||||
}
|
||||
None => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
@ -6,3 +6,6 @@ pub mod test_utils;
|
||||
pub mod vm_cli_utils;
|
||||
#[allow(dead_code)]
|
||||
pub mod vm_daemon_utils;
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub mod app_daemon_utils;
|
||||
|
@ -1,4 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use detee_shared::app_proto::brain_app_cli_server::BrainAppCliServer;
|
||||
use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemonServer;
|
||||
use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer;
|
||||
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer;
|
||||
use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer;
|
||||
@ -7,6 +9,7 @@ use hyper_util::rt::TokioIo;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use surreal_brain::constants::DB_SCHEMA_FILES;
|
||||
use surreal_brain::grpc::app::{AppCliServer, AppDaemonServer};
|
||||
use surreal_brain::grpc::general::GeneralCliServer;
|
||||
use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer};
|
||||
use surrealdb::engine::remote::ws::Client;
|
||||
@ -96,6 +99,8 @@ pub async fn run_service_for_stream_server() -> DuplexStream {
|
||||
.add_service(BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone())))
|
||||
.add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone())))
|
||||
.add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone())))
|
||||
.add_service(BrainAppCliServer::new(AppCliServer::new(db_arc.clone())))
|
||||
.add_service(BrainAppDaemonServer::new(AppDaemonServer::new(db_arc.clone())))
|
||||
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
|
||||
.await?;
|
||||
|
||||
|
@ -1,4 +1,5 @@
|
||||
use anyhow::Result;
|
||||
use detee_shared::app_proto as sgx_proto;
|
||||
use detee_shared::vm_proto as snp_proto;
|
||||
use ed25519_dalek::{Signer, SigningKey};
|
||||
use itertools::Itertools;
|
||||
@ -63,11 +64,22 @@ impl Key {
|
||||
Ok(bs58::encode(key.sign(message.as_bytes()).to_bytes()).into_string())
|
||||
}
|
||||
|
||||
pub fn sign_stream_auth(&self, contracts: Vec<String>) -> Result<snp_proto::DaemonStreamAuth> {
|
||||
pub fn sign_stream_auth_vm(
|
||||
&self,
|
||||
contracts: Vec<String>,
|
||||
) -> Result<snp_proto::DaemonStreamAuth> {
|
||||
let pubkey = self.pubkey.clone();
|
||||
let timestamp = chrono::Utc::now().to_rfc3339();
|
||||
let signature =
|
||||
self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?;
|
||||
Ok(snp_proto::DaemonStreamAuth { timestamp, pubkey, contracts, signature })
|
||||
}
|
||||
|
||||
pub fn sign_stream_auth_app(&self, contracts: Vec<String>) -> Result<sgx_proto::DaemonAuth> {
|
||||
let pubkey = self.pubkey.clone();
|
||||
let timestamp = chrono::Utc::now().to_rfc3339();
|
||||
let signature =
|
||||
self.try_sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?;
|
||||
Ok(sgx_proto::DaemonAuth { timestamp, pubkey, contracts, signature })
|
||||
}
|
||||
}
|
||||
|
@ -35,7 +35,7 @@ pub async fn create_new_vm(
|
||||
node_pubkey: node_pubkey.to_string(),
|
||||
price_per_unit: 1200,
|
||||
extra_ports: vec![8080, 8081],
|
||||
locked_nano: 0,
|
||||
locked_nano: 100,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
@ -59,8 +59,8 @@ pub async fn register_vm_node(
|
||||
let mut deleted_vm_reqs = Vec::new();
|
||||
while let Some(stream_update) = grpc_stream.next().await {
|
||||
match stream_update {
|
||||
Ok(del_vm_rq) => {
|
||||
deleted_vm_reqs.push(del_vm_rq);
|
||||
Ok(del_vm_req) => {
|
||||
deleted_vm_reqs.push(del_vm_req);
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("Received error instead of deleted_vm_reqs: {e:?}");
|
||||
@ -76,7 +76,8 @@ pub async fn daemon_listener(
|
||||
tx: mpsc::Sender<vm_proto::BrainVmMessage>,
|
||||
) -> Result<()> {
|
||||
log::info!("listening vm_daemon");
|
||||
let mut grpc_stream = client.brain_messages(key.sign_stream_auth(vec![])?).await?.into_inner();
|
||||
let mut grpc_stream =
|
||||
client.brain_messages(key.sign_stream_auth_vm(vec![])?).await?.into_inner();
|
||||
|
||||
while let Some(Ok(stream_update)) = grpc_stream.next().await {
|
||||
log::info!("vm deamon got notified: {:?}", &stream_update);
|
||||
@ -95,7 +96,7 @@ pub async fn daemon_msg_sender(
|
||||
log::info!("sender vm_daemon");
|
||||
let rx_stream = ReceiverStream::new(rx);
|
||||
tx.send(vm_proto::VmDaemonMessage {
|
||||
msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![])?)),
|
||||
msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth_vm(vec![])?)),
|
||||
})
|
||||
.await?;
|
||||
client.daemon_messages(rx_stream).await?;
|
||||
|
91
tests/grpc_app_cli_test.rs
Normal file
91
tests/grpc_app_cli_test.rs
Normal file
@ -0,0 +1,91 @@
|
||||
use common::app_daemon_utils::mock_app_daemon;
|
||||
use common::prepare_test_env::{prepare_test_db, run_service_for_stream};
|
||||
use common::test_utils::Key;
|
||||
use common::vm_cli_utils::airdrop;
|
||||
use detee_shared::app_proto;
|
||||
use detee_shared::app_proto::brain_app_cli_client::BrainAppCliClient;
|
||||
use std::vec;
|
||||
use surreal_brain::constants::{ACCOUNT, ACTIVE_APP, NEW_APP_REQ, TOKEN_DECIMAL};
|
||||
use surreal_brain::db::prelude as db;
|
||||
|
||||
mod common;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_app_creation() {
|
||||
/*
|
||||
env_logger::builder()
|
||||
.filter_level(log::LevelFilter::Trace)
|
||||
.filter_module("tungstenite", log::LevelFilter::Debug)
|
||||
.filter_module("tokio_tungstenite", log::LevelFilter::Debug)
|
||||
.init();
|
||||
*/
|
||||
let db = prepare_test_db().await.unwrap();
|
||||
let brain_channel = run_service_for_stream().await.unwrap();
|
||||
let daemon_key = mock_app_daemon(&brain_channel, None).await.unwrap();
|
||||
|
||||
let key = Key::new();
|
||||
|
||||
let mut new_app_req = app_proto::NewAppReq {
|
||||
admin_pubkey: key.pubkey.clone(),
|
||||
node_pubkey: daemon_key.clone(),
|
||||
price_per_unit: 1200,
|
||||
resource: Some(app_proto::AppResource { ports: vec![8080, 8081], ..Default::default() }),
|
||||
locked_nano: 100,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone());
|
||||
let new_app_resp = client_app_cli.new_app(key.sign_request(new_app_req.clone()).unwrap()).await;
|
||||
assert!(new_app_resp.is_err());
|
||||
let new_app_err = new_app_resp.err().unwrap();
|
||||
assert!(new_app_err.to_string().contains("Insufficient funds"));
|
||||
|
||||
let airdrop_amount = 10;
|
||||
airdrop(&brain_channel, &key.pubkey, airdrop_amount).await.unwrap();
|
||||
|
||||
let new_app_resp = client_app_cli
|
||||
.new_app(key.sign_request(new_app_req.clone()).unwrap())
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner();
|
||||
let active_app =
|
||||
db.select::<Option<db::ActiveApp>>((ACTIVE_APP, new_app_resp.uuid)).await.unwrap();
|
||||
assert!(active_app.is_some());
|
||||
|
||||
let daemon_key_02 =
|
||||
mock_app_daemon(&brain_channel, Some("something went wrong 01".to_string())).await.unwrap();
|
||||
|
||||
new_app_req.node_pubkey = daemon_key_02.clone();
|
||||
|
||||
let new_app_resp = client_app_cli
|
||||
.new_app(key.sign_request(new_app_req.clone()).unwrap())
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner();
|
||||
assert!(!new_app_resp.error.is_empty());
|
||||
|
||||
let app_req_db =
|
||||
db.select::<Option<db::NewAppReq>>((NEW_APP_REQ, new_app_resp.uuid)).await.unwrap();
|
||||
|
||||
assert!(!app_req_db.unwrap().error.is_empty());
|
||||
|
||||
let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
|
||||
assert_eq!(acc_db.balance, (airdrop_amount * TOKEN_DECIMAL - 100));
|
||||
assert_eq!(acc_db.tmp_locked, 0);
|
||||
|
||||
let locking_nano = 1288;
|
||||
new_app_req.node_pubkey = daemon_key;
|
||||
new_app_req.locked_nano = locking_nano;
|
||||
|
||||
let new_app_resp =
|
||||
client_app_cli.new_app(key.sign_request(new_app_req).unwrap()).await.unwrap().into_inner();
|
||||
assert!(new_app_resp.error.is_empty());
|
||||
|
||||
let active_app =
|
||||
db.select::<Option<db::ActiveApp>>((ACTIVE_APP, new_app_resp.uuid)).await.unwrap();
|
||||
assert!(active_app.is_some());
|
||||
|
||||
let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
|
||||
assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - (locking_nano + 100));
|
||||
assert_eq!(acc_db.tmp_locked, 0);
|
||||
}
|
@ -73,7 +73,7 @@ async fn test_vm_creation() {
|
||||
}
|
||||
|
||||
let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
|
||||
assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL);
|
||||
assert_eq!(acc_db.balance, (airdrop_amount * TOKEN_DECIMAL - 100));
|
||||
assert_eq!(acc_db.tmp_locked, 0);
|
||||
|
||||
new_vm_req.node_pubkey = daemon_key;
|
||||
@ -87,7 +87,7 @@ async fn test_vm_creation() {
|
||||
assert!(active_vm.is_some());
|
||||
|
||||
let acc_db: db::Account = db.select((ACCOUNT, key.pubkey.clone())).await.unwrap().unwrap();
|
||||
assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - locking_nano);
|
||||
assert_eq!(acc_db.balance, airdrop_amount * TOKEN_DECIMAL - (locking_nano + 100));
|
||||
assert_eq!(acc_db.tmp_locked, 0);
|
||||
}
|
||||
|
||||
@ -109,7 +109,7 @@ async fn test_timeout_vm_creation() {
|
||||
node_pubkey: daemon_key.pubkey,
|
||||
price_per_unit: 1200,
|
||||
extra_ports: vec![8080, 8081],
|
||||
locked_nano: 0,
|
||||
locked_nano: 100,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
@ -39,7 +39,7 @@ async fn test_brain_message() {
|
||||
node_pubkey: daemon_key,
|
||||
price_per_unit: 1200,
|
||||
extra_ports: vec![8080, 8081],
|
||||
locked_nano: 0,
|
||||
locked_nano: 100,
|
||||
..Default::default()
|
||||
};
|
||||
airdrop(&brain_channel, &cli_key.pubkey, 10).await.unwrap();
|
||||
|
Loading…
Reference in New Issue
Block a user