Compare commits

...

5 Commits

Author SHA1 Message Date
933e3cb893
Fix db migration
migrating all db schemas
integrate app data in migration
fix vm node registration to new architecture
2025-05-20 20:34:01 +05:30
952ea971ca
kick contract transaction
one single transaction query to execute all the kick operation
Remove unused fields from deleted_app schema
a basic test for kick and possibilities
2025-05-20 20:34:00 +05:30
dd8710f4a0
kick contract implemented
app pricing calculation
add node in kick schema and type
improved handling
Clone on all app types
handle expected error on kick contract
validate both app and vm contracts
2025-05-20 20:34:00 +05:30
d1e5244a18
register operator 2025-05-20 20:33:02 +05:30
198f43f472
Fix: vm creating error
catching Insufficient fund error from transaction
returning approprate error variant
2025-05-20 20:20:02 +05:30
12 changed files with 772 additions and 321 deletions

@ -129,7 +129,7 @@ operators:
app_nodes: [] app_nodes: []
7V3rEuh6j8VuwMVB5PyGqWKLmjJ4fYSv6WtrTL51NZTB: 7V3rEuh6j8VuwMVB5PyGqWKLmjJ4fYSv6WtrTL51NZTB:
escrow: 0 escrow: 0
email: '' email: ""
banned_users: [] banned_users: []
vm_nodes: [] vm_nodes: []
app_nodes: app_nodes:
@ -238,7 +238,7 @@ vm_contracts:
node_pubkey: Du3UfPSUUZmA5thQmc9Vrxdy7UimpygcpDsQNnwRQPtu node_pubkey: Du3UfPSUUZmA5thQmc9Vrxdy7UimpygcpDsQNnwRQPtu
exposed_ports: [] exposed_ports: []
public_ipv4: 156.146.63.216 public_ipv4: 156.146.63.216
public_ipv6: '' public_ipv6: ""
disk_size_gb: 10 disk_size_gb: 10
vcpus: 2 vcpus: 2
memory_mb: 3000 memory_mb: 3000
@ -255,7 +255,7 @@ vm_contracts:
node_pubkey: 7Xw3RxbP5pvfjZ8U6yA3HHVSS9YXjKH5Vkas3JRbQYd9 node_pubkey: 7Xw3RxbP5pvfjZ8U6yA3HHVSS9YXjKH5Vkas3JRbQYd9
exposed_ports: [] exposed_ports: []
public_ipv4: 173.234.136.154 public_ipv4: 173.234.136.154
public_ipv6: '' public_ipv6: ""
disk_size_gb: 10 disk_size_gb: 10
vcpus: 2 vcpus: 2
memory_mb: 3000 memory_mb: 3000
@ -272,8 +272,8 @@ vm_contracts:
node_pubkey: 7Xw3RxbP5pvfjZ8U6yA3HHVSS9YXjKH5Vkas3JRbQYd9 node_pubkey: 7Xw3RxbP5pvfjZ8U6yA3HHVSS9YXjKH5Vkas3JRbQYd9
exposed_ports: exposed_ports:
- 38288 - 38288
public_ipv4: '' public_ipv4: ""
public_ipv6: '' public_ipv6: ""
disk_size_gb: 10 disk_size_gb: 10
vcpus: 1 vcpus: 1
memory_mb: 1000 memory_mb: 1000
@ -290,7 +290,7 @@ vm_contracts:
node_pubkey: DgkbsrwttkZXvzxY5kDwQQoDd79GLmZ5tc7fYJUFkQQb node_pubkey: DgkbsrwttkZXvzxY5kDwQQoDd79GLmZ5tc7fYJUFkQQb
exposed_ports: [] exposed_ports: []
public_ipv4: 149.22.95.2 public_ipv4: 149.22.95.2
public_ipv6: '' public_ipv6: ""
disk_size_gb: 10 disk_size_gb: 10
vcpus: 2 vcpus: 2
memory_mb: 3000 memory_mb: 3000
@ -307,7 +307,7 @@ vm_contracts:
node_pubkey: Du3UfPSUUZmA5thQmc9Vrxdy7UimpygcpDsQNnwRQPtu node_pubkey: Du3UfPSUUZmA5thQmc9Vrxdy7UimpygcpDsQNnwRQPtu
exposed_ports: [] exposed_ports: []
public_ipv4: 156.146.63.217 public_ipv4: 156.146.63.217
public_ipv6: '' public_ipv6: ""
disk_size_gb: 10 disk_size_gb: 10
vcpus: 2 vcpus: 2
memory_mb: 3000 memory_mb: 3000
@ -324,7 +324,7 @@ vm_contracts:
node_pubkey: DgkbsrwttkZXvzxY5kDwQQoDd79GLmZ5tc7fYJUFkQQb node_pubkey: DgkbsrwttkZXvzxY5kDwQQoDd79GLmZ5tc7fYJUFkQQb
exposed_ports: [] exposed_ports: []
public_ipv4: 149.22.95.2 public_ipv4: 149.22.95.2
public_ipv6: '' public_ipv6: ""
disk_size_gb: 30 disk_size_gb: 30
vcpus: 1 vcpus: 1
memory_mb: 1000 memory_mb: 1000
@ -341,7 +341,7 @@ vm_contracts:
node_pubkey: 3zRxiGRnf46vd3zAEmpaYBJocTV9oJB6yXf5GZFR1Sq4 node_pubkey: 3zRxiGRnf46vd3zAEmpaYBJocTV9oJB6yXf5GZFR1Sq4
exposed_ports: [] exposed_ports: []
public_ipv4: 149.36.48.100 public_ipv4: 149.36.48.100
public_ipv6: '' public_ipv6: ""
disk_size_gb: 10 disk_size_gb: 10
vcpus: 4 vcpus: 4
memory_mb: 4000 memory_mb: 4000
@ -358,8 +358,8 @@ vm_contracts:
node_pubkey: 3zRxiGRnf46vd3zAEmpaYBJocTV9oJB6yXf5GZFR1Sq4 node_pubkey: 3zRxiGRnf46vd3zAEmpaYBJocTV9oJB6yXf5GZFR1Sq4
exposed_ports: exposed_ports:
- 46393 - 46393
public_ipv4: '' public_ipv4: ""
public_ipv6: '' public_ipv6: ""
disk_size_gb: 10 disk_size_gb: 10
vcpus: 1 vcpus: 1
memory_mb: 1000 memory_mb: 1000
@ -384,4 +384,57 @@ app_nodes:
max_ports_per_app: 9 max_ports_per_app: 9
price: 20000 price: 20000
offline_minutes: 0 offline_minutes: 0
app_contracts: [] app_contracts:
- uuid: e3d01f25-2b2a-410b-80e3-12f44e474334
package_url: https://registry.detee.ltd/sgx/packages/base_package_2025-04-17_11-01-08.tar.gz
admin_pubkey: H21Shi4iE7vgfjWEQNvzmpmBMJSaiZ17PYUcdNoAoKNc
node_pubkey: BiqoPUEoAxYxMRXUmyofoS9H1TBQgQqvLJ6MbWh88AQg
mapped_ports:
- - 27158
- 34500
- - 28667
- 8080
host_ipv4: 212.95.45.139
disk_size_mb: 1000
vcpus: 1
memory_mb: 1000
created_at: 2025-04-21T11:27:28.833236909Z
updated_at: 2025-04-21T11:27:28.833237729Z
price_per_unit: 200000
locked_nano: 121200000
collected_at: 2025-04-21T11:28:24.905665571Z
hratls_pubkey: 7E0F887AA6BB9104EEC1066F454D4C2D9063D676715F55F919D3FBCEDC63240B
public_package_mr_enclave:
- 52
- 183
- 102
- 210
- 251
- 219
- 218
- 140
- 168
- 118
- 10
- 193
- 98
- 240
- 147
- 124
- 240
- 189
- 46
- 95
- 138
- 172
- 15
- 246
- 227
- 114
- 70
- 159
- 232
- 212
- 9
- 234
app_name: diligent-seahorse

@ -5,7 +5,8 @@ pub const CERT_PATH: &str = "/etc/detee/brain/brain-crt.pem";
pub const CERT_KEY_PATH: &str = "/etc/detee/brain/brain-key.pem"; pub const CERT_KEY_PATH: &str = "/etc/detee/brain/brain-key.pem";
pub const CONFIG_PATH: &str = "/etc/detee/brain/config.ini"; pub const CONFIG_PATH: &str = "/etc/detee/brain/config.ini";
pub const DB_SCHEMA_FILE: &str = "interim_tables.surql"; pub const DB_SCHEMA_FILES: [&str; 3] =
["surql/tables.sql", "surql/timer.sql", "surql/functions.sql"];
pub static ADMIN_ACCOUNTS: LazyLock<Vec<String>> = LazyLock::new(|| { pub static ADMIN_ACCOUNTS: LazyLock<Vec<String>> = LazyLock::new(|| {
let default_admin_keys = vec![ let default_admin_keys = vec![
@ -23,6 +24,8 @@ pub static ADMIN_ACCOUNTS: LazyLock<Vec<String>> = LazyLock::new(|| {
pub const OLD_BRAIN_DATA_PATH: &str = "./saved_data.yaml"; pub const OLD_BRAIN_DATA_PATH: &str = "./saved_data.yaml";
pub const ACCOUNT: &str = "account"; pub const ACCOUNT: &str = "account";
pub const KICK: &str = "kick";
pub const VM_NODE: &str = "vm_node"; pub const VM_NODE: &str = "vm_node";
pub const ACTIVE_VM: &str = "active_vm"; pub const ACTIVE_VM: &str = "active_vm";
pub const VM_UPDATE_EVENT: &str = "vm_update_event"; pub const VM_UPDATE_EVENT: &str = "vm_update_event";
@ -42,3 +45,6 @@ pub const ID_ALPHABET: [char; 62] = [
'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U',
'V', 'W', 'X', 'Y', 'Z', 'V', 'W', 'X', 'Y', 'Z',
]; ];
pub const MIN_ESCROW: u64 = 5000;
pub const TOKEN_DECIMAL: u64 = 1_000_000_000;

@ -12,7 +12,7 @@ use surrealdb::sql::Datetime;
use surrealdb::{Notification, RecordId, Surreal}; use surrealdb::{Notification, RecordId, Surreal};
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AppNode { pub struct AppNode {
pub id: RecordId, pub id: RecordId,
pub operator: RecordId, pub operator: RecordId,
@ -32,8 +32,9 @@ pub struct AppNode {
impl AppNode { impl AppNode {
pub async fn register(self, db: &Surreal<Client>) -> Result<AppNode, Error> { pub async fn register(self, db: &Surreal<Client>) -> Result<AppNode, Error> {
db::Account::get_or_create(db, &self.operator.key().to_string()).await?; db::Account::get_or_create(db, &self.operator.key().to_string()).await?;
let app_node: Option<AppNode> = db.upsert(self.id.clone()).content(self).await?; let app_node_id = self.id.clone();
app_node.ok_or(Error::FailedToCreateDBEntry) let app_node: Option<AppNode> = db.upsert(app_node_id.clone()).content(self).await?;
app_node.ok_or(Error::FailedToCreateDBEntry(format!("{APP_NODE}:{app_node_id}")))
} }
} }
@ -54,7 +55,7 @@ impl From<DeletedApp> for AppDaemonMsg {
} }
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NewAppReq { pub struct NewAppReq {
pub id: RecordId, pub id: RecordId,
#[serde(rename = "in")] #[serde(rename = "in")]
@ -97,6 +98,7 @@ impl NewAppReq {
} }
pub async fn submit(self, db: &Surreal<Client>) -> Result<Vec<Self>, Error> { pub async fn submit(self, db: &Surreal<Client>) -> Result<Vec<Self>, Error> {
// TODO: handle financial transaction
let new_app_req: Vec<Self> = db.insert(NEW_APP_REQ).relation(self).await?; let new_app_req: Vec<Self> = db.insert(NEW_APP_REQ).relation(self).await?;
Ok(new_app_req) Ok(new_app_req)
} }
@ -164,7 +166,7 @@ impl AppNodeWithReports {
} }
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ActiveApp { pub struct ActiveApp {
pub id: RecordId, pub id: RecordId,
#[serde(rename = "in")] #[serde(rename = "in")]
@ -172,11 +174,11 @@ pub struct ActiveApp {
#[serde(rename = "out")] #[serde(rename = "out")]
pub app_node: RecordId, pub app_node: RecordId,
pub app_name: String, pub app_name: String,
pub mapped_ports: Vec<(u64, u64)>, pub mapped_ports: Vec<(u32, u32)>,
pub host_ipv4: String, pub host_ipv4: String,
pub vcpus: u64, pub vcpus: u32,
pub memory_mb: u64, pub memory_mb: u32,
pub disk_size_gb: u64, pub disk_size_gb: u32,
pub created_at: Datetime, pub created_at: Datetime,
pub price_per_unit: u64, pub price_per_unit: u64,
pub locked_nano: u64, pub locked_nano: u64,
@ -210,6 +212,15 @@ impl From<ActiveApp> for DeletedApp {
} }
impl ActiveApp { impl ActiveApp {
pub fn price_per_minute(&self) -> u64 {
(self.total_units() * self.price_per_unit as f64) as u64
}
fn total_units(&self) -> f64 {
// TODO: Optimize this based on price of hardware.
(self.vcpus as f64 * 5f64) + (self.memory_mb as f64 / 200f64) + (self.disk_size_gb as f64)
}
pub async fn activate(db: &Surreal<Client>, id: &str) -> Result<(), Error> { pub async fn activate(db: &Surreal<Client>, id: &str) -> Result<(), Error> {
let new_app_req = match NewAppReq::get(db, id).await? { let new_app_req = match NewAppReq::get(db, id).await? {
Some(r) => r, Some(r) => r,
@ -223,9 +234,9 @@ impl ActiveApp {
app_name: new_app_req.app_name, app_name: new_app_req.app_name,
mapped_ports: vec![], mapped_ports: vec![],
host_ipv4: String::new(), host_ipv4: String::new(),
vcpus: new_app_req.vcpu as u64, vcpus: new_app_req.vcpu,
memory_mb: new_app_req.memory_mb as u64, memory_mb: new_app_req.memory_mb,
disk_size_gb: new_app_req.disk_mb as u64, disk_size_gb: new_app_req.disk_mb,
created_at: new_app_req.created_at.clone(), created_at: new_app_req.created_at.clone(),
price_per_unit: new_app_req.price_per_unit, price_per_unit: new_app_req.price_per_unit,
locked_nano: new_app_req.locked_nano, locked_nano: new_app_req.locked_nano,
@ -293,7 +304,7 @@ impl ActiveApp {
} }
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ActiveAppWithNode { pub struct ActiveAppWithNode {
pub id: RecordId, pub id: RecordId,
#[serde(rename = "in")] #[serde(rename = "in")]
@ -301,11 +312,11 @@ pub struct ActiveAppWithNode {
#[serde(rename = "out")] #[serde(rename = "out")]
pub app_node: AppNode, pub app_node: AppNode,
pub app_name: String, pub app_name: String,
pub mapped_ports: Vec<(u64, u64)>, pub mapped_ports: Vec<(u32, u32)>,
pub host_ipv4: String, pub host_ipv4: String,
pub vcpus: u64, pub vcpus: u32,
pub memory_mb: u64, pub memory_mb: u32,
pub disk_size_gb: u64, pub disk_size_gb: u32,
pub created_at: Datetime, pub created_at: Datetime,
pub price_per_unit: u64, pub price_per_unit: u64,
pub locked_nano: u64, pub locked_nano: u64,
@ -315,6 +326,29 @@ pub struct ActiveAppWithNode {
pub hratls_pubkey: String, pub hratls_pubkey: String,
} }
impl From<ActiveAppWithNode> for ActiveApp {
fn from(val: ActiveAppWithNode) -> Self {
Self {
id: val.id,
admin: val.admin,
app_node: val.app_node.id,
app_name: val.app_name,
mapped_ports: val.mapped_ports,
host_ipv4: val.host_ipv4,
vcpus: val.vcpus,
memory_mb: val.memory_mb,
disk_size_gb: val.disk_size_gb,
created_at: val.created_at,
price_per_unit: val.price_per_unit,
locked_nano: val.locked_nano,
collected_at: val.collected_at,
mr_enclave: val.mr_enclave,
package_url: val.package_url,
hratls_pubkey: val.hratls_pubkey,
}
}
}
impl ActiveAppWithNode { impl ActiveAppWithNode {
pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> { pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> {
let contract: Option<Self> = let contract: Option<Self> =
@ -393,7 +427,7 @@ impl From<&old_brain::BrainData> for Vec<AppNode> {
let mut nodes = Vec::new(); let mut nodes = Vec::new();
for old_node in old_data.app_nodes.iter() { for old_node in old_data.app_nodes.iter() {
nodes.push(AppNode { nodes.push(AppNode {
id: RecordId::from(("app_node", old_node.node_pubkey.clone())), id: RecordId::from((APP_NODE, old_node.node_pubkey.clone())),
operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())), operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())),
country: old_node.country.clone(), country: old_node.country.clone(),
region: old_node.region.clone(), region: old_node.region.clone(),
@ -412,6 +446,46 @@ impl From<&old_brain::BrainData> for Vec<AppNode> {
} }
} }
impl From<&old_brain::BrainData> for Vec<ActiveApp> {
fn from(old_data: &old_brain::BrainData) -> Self {
let mut contracts = Vec::new();
for old_c in old_data.app_contracts.iter() {
let mut mapped_ports = Vec::new();
for port in old_c.mapped_ports.clone().into_iter().map(|(b, c)| (b as u32, c as u32)) {
mapped_ports.push(port);
}
let mr_enclave_hex = old_c
.public_package_mr_enclave
.clone()
.unwrap_or_default()
.iter()
.map(|byte| format!("{:02X}", byte))
.collect();
contracts.push(ActiveApp {
id: RecordId::from((ACTIVE_APP, old_c.uuid.replace("-", ""))),
admin: RecordId::from((ACCOUNT, old_c.admin_pubkey.clone())),
app_node: RecordId::from((APP_NODE, old_c.node_pubkey.clone())),
mapped_ports,
host_ipv4: old_c.host_ipv4.clone(),
disk_size_gb: old_c.disk_size_mb * 1024,
vcpus: old_c.vcpus,
memory_mb: old_c.memory_mb,
price_per_unit: old_c.price_per_unit,
locked_nano: old_c.locked_nano,
created_at: old_c.created_at.into(),
collected_at: old_c.collected_at.into(),
app_name: old_c.app_name.clone(),
mr_enclave: mr_enclave_hex,
package_url: old_c.package_url.clone(),
hratls_pubkey: old_c.hratls_pubkey.clone(),
});
}
contracts
}
}
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct DeletedApp { pub struct DeletedApp {
pub id: RecordId, pub id: RecordId,
@ -420,11 +494,11 @@ pub struct DeletedApp {
#[serde(rename = "out")] #[serde(rename = "out")]
pub app_node: RecordId, pub app_node: RecordId,
pub app_name: String, pub app_name: String,
pub mapped_ports: Vec<(u64, u64)>, pub mapped_ports: Vec<(u32, u32)>,
pub host_ipv4: String, pub host_ipv4: String,
pub vcpus: u64, pub vcpus: u32,
pub memory_mb: u64, pub memory_mb: u32,
pub disk_size_gb: u64, pub disk_size_gb: u32,
pub created_at: Datetime, pub created_at: Datetime,
pub price_per_unit: u64, pub price_per_unit: u64,
pub locked_nano: u64, pub locked_nano: u64,

@ -1,7 +1,6 @@
use crate::constants::ACCOUNT;
use crate::db::prelude::*;
use super::Error; use super::Error;
use crate::constants::{ACCOUNT, KICK, MIN_ESCROW, TOKEN_DECIMAL};
use crate::db::prelude::*;
use crate::old_brain; use crate::old_brain;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use surrealdb::engine::remote::ws::Client; use surrealdb::engine::remote::ws::Client;
@ -37,7 +36,7 @@ impl Account {
Some(account) => Ok(account), Some(account) => Ok(account),
None => { None => {
let account: Option<Self> = db.create(id).await?; let account: Option<Self> = db.create(id).await?;
account.ok_or(Error::FailedToCreateDBEntry) account.ok_or(Error::FailedToCreateDBEntry(ACCOUNT.to_string()))
} }
} }
} }
@ -49,6 +48,33 @@ impl Account {
.await?; .await?;
Ok(()) Ok(())
} }
pub async fn save(self, db: &Surreal<Client>) -> Result<Option<Self>, Error> {
let account: Option<Self> = db.upsert(self.id.clone()).content(self).await?;
Ok(account)
}
pub async fn operator_reg(
db: &Surreal<Client>,
wallet: &str,
email: &str,
escrow: u64,
) -> Result<(), Error> {
if escrow < MIN_ESCROW {
return Err(Error::MinimalEscrow);
}
let mut op_account = Self::get(db, wallet).await?;
let escrow = escrow.saturating_mul(TOKEN_DECIMAL);
let op_total_balance = op_account.balance.saturating_add(op_account.escrow);
if op_total_balance < escrow {
return Err(Error::InsufficientFunds);
}
op_account.email = email.to_string();
op_account.balance = op_total_balance.saturating_sub(escrow);
op_account.escrow = escrow;
op_account.save(db).await?;
Ok(())
}
} }
impl Account { impl Account {
@ -84,7 +110,7 @@ impl From<&old_brain::BrainData> for Vec<Account> {
let mut accounts = Vec::new(); let mut accounts = Vec::new();
for old_account in old_data.accounts.iter() { for old_account in old_data.accounts.iter() {
let mut a = Account { let mut a = Account {
id: RecordId::from(("account", old_account.key())), id: RecordId::from((ACCOUNT, old_account.key())),
balance: old_account.value().balance, balance: old_account.value().balance,
tmp_locked: old_account.value().tmp_locked, tmp_locked: old_account.value().tmp_locked,
escrow: 0, escrow: 0,
@ -120,6 +146,24 @@ pub struct Kick {
created_at: Datetime, created_at: Datetime,
reason: String, reason: String,
contract: RecordId, contract: RecordId,
node: RecordId,
}
impl Kick {
pub async fn kicked_in_a_day(db: &Surreal<Client>, account: &str) -> Result<Vec<Self>, Error> {
let mut result = db
.query(format!(
"select * from {KICK} where out = {ACCOUNT}:{account} and created_at > time::now() - 24h;"
))
.await?;
let kicks: Vec<Self> = result.take(0)?;
Ok(kicks)
}
pub async fn submit(self, db: &Surreal<Client>) -> Result<(), Error> {
let _: Vec<Self> = db.insert(KICK).relation(self).await?;
Ok(())
}
} }
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
@ -158,7 +202,7 @@ impl Report {
/// This is the operator obtained from the DB, /// This is the operator obtained from the DB,
/// however the relation is defined using OperatorRelation /// however the relation is defined using OperatorRelation
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Operator { pub struct Operator {
pub account: RecordId, pub account: RecordId,
pub app_nodes: u64, pub app_nodes: u64,
@ -231,3 +275,149 @@ impl Operator {
Ok((operator, vm_nodes, app_nodes)) Ok((operator, vm_nodes, app_nodes))
} }
} }
pub enum WrapperContract {
Vm(ActiveVmWithNode),
App(ActiveAppWithNode),
}
impl WrapperContract {
pub async fn kick_contract(
db: &Surreal<Client>,
operator_wallet: &str,
contract_uuid: &str,
reason: &str,
) -> Result<u64, Error> {
let (
operator_id,
admin_id,
contract_id,
collected_at,
price_per_mint,
deleted_table,
platform_specific_query,
) = if let Some(active_vm) = ActiveVmWithNode::get_by_uuid(db, contract_uuid).await? {
let price_per_minute = active_vm.price_per_minute();
(
active_vm.vm_node.operator,
active_vm.admin,
active_vm.id,
active_vm.collected_at,
price_per_minute,
"deleted_vm",
"
hostname = $contract.hostname,
public_ipv4 = $contract.public_ipv4,
public_ipv6 = $contract.public_ipv6,
dtrfs_sha = $contract.dtrfs_sha,
kernel_sha = $contract.kernel_sha,
",
)
} else if let Some(active_app) = ActiveAppWithNode::get_by_uuid(db, contract_uuid).await? {
let price_per_minute = Into::<ActiveApp>::into(active_app.clone()).price_per_minute();
(
active_app.app_node.operator,
active_app.admin,
active_app.id,
active_app.collected_at,
price_per_minute,
"deleted_app",
"
app_name = $contract.app_name,
host_ipv4 = $contract.host_ipv4,
mr_enclave = $contract.mr_enclave,
package_url= $contract.package_url,
hratls_pubkey = $contract.hratls_pubkey,
",
)
} else {
return Err(Error::ContractNotFound);
};
let operator = operator_id.key().to_string();
let admin = admin_id.key().to_string();
if operator != operator_wallet {
return Err(Error::AccessDenied);
}
let mut minutes_to_refund =
chrono::Utc::now().signed_duration_since(*collected_at).num_minutes().unsigned_abs();
let one_week_minute = 10080;
if minutes_to_refund > one_week_minute {
minutes_to_refund = one_week_minute;
}
let refund_amount = minutes_to_refund * price_per_mint;
log::debug!("Removing {refund_amount} escrow from {} and giving it to {}", operator, admin);
let transaction_query = format!(
"
BEGIN TRANSACTION;
LET $contract = {contract_id};
LET $operator_account = {operator_id};
LET $reason = '{reason}';
LET $refund_amount = {refund_amount};
LET $deleted_contract = {deleted_table}:{contract_uuid};
LET $id = record::id($contract.id);
LET $admin = $contract.in;
LET $node = $contract.out;
-- move contract into deleted state
RELATE $admin->{deleted_table}->$node
SET id = $id,
{platform_specific_query}
mapped_ports = $contract.mapped_ports,
disk_size_gb = $contract.disk_size_gb,
vcpus = $contract.vcpus,
memory_mb = $contract.memory_mb,
created_at = $contract.created_at,
deleted_at = time::now(),
price_per_unit = $contract.price_per_unit
;
DELETE $contract;
-- calculating refund amount
LET $refund = IF SELECT * FROM {KICK} WHERE out = $admin.id AND created_at > time::now() - 24h {{
0
}} ELSE IF $operator_account.escrow <= $refund_amount {{
$operator_account.escrow
}} ELSE {{
$refund_amount;
}};
RELATE $operator_account->{KICK}->$admin
SET id = $id,
reason = $reason,
contract = $deleted_contract,
node = $node,
created_at = time::now()
;
-- update balances
UPDATE $operator_account SET escrow -= $refund;
IF $operator_account.escrow < 0 {{
THROW 'Insufficient funds.'
}};
UPDATE $admin SET balance += $refund;
SELECT * FROM $refund;
COMMIT TRANSACTION;
",
);
log::trace!("kick_contract transaction_query: {}", &transaction_query);
let refunded: Option<u64> = db.query(transaction_query).await?.take(14)?;
let refunded_amount = refunded.ok_or(Error::FailedToCreateDBEntry("Refund".to_string()))?;
Ok(refunded_amount)
}
}

@ -2,7 +2,10 @@ pub mod app;
pub mod general; pub mod general;
pub mod vm; pub mod vm;
use crate::constants::{APP_NODE, DELETED_APP, DELETED_VM, NEW_APP_REQ, NEW_VM_REQ, UPDATE_VM_REQ}; use crate::constants::{
APP_NODE, DB_SCHEMA_FILES, DELETED_APP, DELETED_VM, MIN_ESCROW, NEW_APP_REQ, NEW_VM_REQ,
UPDATE_VM_REQ,
};
use crate::old_brain; use crate::old_brain;
use prelude::*; use prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -22,14 +25,24 @@ pub enum Error {
StdIo(#[from] std::io::Error), StdIo(#[from] std::io::Error),
#[error(transparent)] #[error(transparent)]
TimeOut(#[from] tokio::time::error::Elapsed), TimeOut(#[from] tokio::time::error::Elapsed),
#[error("Failed to create account")] #[error("Failed to create {0}")]
FailedToCreateDBEntry, FailedToCreateDBEntry(String),
#[error("Unknown Table: {0}")] #[error("Unknown Table: {0}")]
UnknownTable(String), UnknownTable(String),
#[error("Daemon channel got closed: {0}")] #[error("Daemon channel got closed: {0}")]
AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<AppDaemonMsg>), AppDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<AppDaemonMsg>),
#[error("AppDaemon Error {0}")] #[error("AppDaemon Error {0}")]
NewAppDaemonResp(String), NewAppDaemonResp(String),
#[error("Minimum escrow amount is {MIN_ESCROW}")]
MinimalEscrow,
#[error("Insufficient funds, deposit more tokens")]
InsufficientFunds,
#[error("Contract not found")]
ContractNotFound,
#[error("Access denied")]
AccessDenied,
#[error("Failed to delete contract {0}")]
FailedToDeleteContract(String),
} }
pub mod prelude { pub mod prelude {
@ -61,10 +74,12 @@ pub async fn migration0(
let accounts: Vec<Account> = old_data.into(); let accounts: Vec<Account> = old_data.into();
let vm_nodes: Vec<VmNode> = old_data.into(); let vm_nodes: Vec<VmNode> = old_data.into();
let app_nodes: Vec<AppNode> = old_data.into(); let app_nodes: Vec<AppNode> = old_data.into();
let vm_contracts: Vec<ActiveVm> = old_data.into(); let active_vm: Vec<ActiveVm> = old_data.into();
let active_app: Vec<ActiveApp> = old_data.into();
let schema = std::fs::read_to_string(crate::constants::DB_SCHEMA_FILE)?; for schema in DB_SCHEMA_FILES.map(std::fs::read_to_string) {
db.query(schema).await?; db.query(schema?).await?;
}
println!("Inserting accounts..."); println!("Inserting accounts...");
let _: Vec<Account> = db.insert(()).content(accounts).await?; let _: Vec<Account> = db.insert(()).content(accounts).await?;
@ -72,8 +87,10 @@ pub async fn migration0(
let _: Vec<VmNode> = db.insert(()).content(vm_nodes).await?; let _: Vec<VmNode> = db.insert(()).content(vm_nodes).await?;
println!("Inserting app nodes..."); println!("Inserting app nodes...");
let _: Vec<AppNode> = db.insert(()).content(app_nodes).await?; let _: Vec<AppNode> = db.insert(()).content(app_nodes).await?;
println!("Inserting vm contracts..."); println!("Inserting active vm contracts...");
let _: Vec<ActiveVm> = db.insert("vm_contract").relation(vm_contracts).await?; let _: Vec<ActiveVm> = db.insert(()).relation(active_vm).await?;
println!("Inserting app contracts...");
let _: Vec<ActiveApp> = db.insert(()).relation(active_app).await?;
Ok(()) Ok(())
} }

@ -239,7 +239,18 @@ impl NewVmReq {
self.price_per_unit self.price_per_unit
); );
//let _: Vec<Self> = db.insert(NEW_VM_REQ).relation(self).await?; //let _: Vec<Self> = db.insert(NEW_VM_REQ).relation(self).await?;
db.query(query).await?; let mut query_resp = db.query(query).await?;
let resp_err = query_resp.take_errors();
if let Some(insufficient_funds_error) = resp_err.get(&1) {
if let surrealdb::Error::Api(surrealdb::error::Api::Query(tx_query_error)) =
insufficient_funds_error
{
log::error!("Transaction error: {}", tx_query_error);
return Err(Error::InsufficientFunds);
}
}
Ok(()) Ok(())
} }
} }
@ -276,6 +287,7 @@ impl WrappedMeasurement {
if let Some(args) = args { if let Some(args) = args {
return Ok(Self::Args(vm_id.to_string(), args)); return Ok(Self::Args(vm_id.to_string(), args));
} }
log::trace!("listening for table: {table}");
tokio::time::timeout(Duration::from_secs(10), async { tokio::time::timeout(Duration::from_secs(10), async {
loop { loop {
@ -716,6 +728,29 @@ pub struct ActiveVmWithNode {
pub collected_at: Datetime, pub collected_at: Datetime,
} }
impl From<ActiveVmWithNode> for ActiveVm {
fn from(val: ActiveVmWithNode) -> Self {
Self {
id: val.id,
admin: val.admin,
vm_node: val.vm_node.id,
hostname: val.hostname,
mapped_ports: val.mapped_ports,
public_ipv4: val.public_ipv4,
public_ipv6: val.public_ipv6,
disk_size_gb: val.disk_size_gb,
vcpus: val.vcpus,
memory_mb: val.memory_mb,
dtrfs_sha: val.dtrfs_sha,
kernel_sha: val.kernel_sha,
created_at: val.created_at,
price_per_unit: val.price_per_unit,
locked_nano: val.locked_nano,
collected_at: val.collected_at,
}
}
}
impl ActiveVmWithNode { impl ActiveVmWithNode {
pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> { pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> {
let contract: Option<Self> = let contract: Option<Self> =

@ -107,24 +107,52 @@ impl BrainGeneralCli for GeneralCliServer {
async fn register_operator( async fn register_operator(
&self, &self,
_req: Request<RegOperatorReq>, req: Request<RegOperatorReq>,
) -> Result<Response<Empty>, Status> { ) -> Result<Response<Empty>, Status> {
todo!(); let req = check_sig_from_req(req)?;
// let req = check_sig_from_req(req)?; log::info!("Regitering new operator: {req:?}");
// info!("Regitering new operator: {req:?}"); match db::Account::operator_reg(&self.db, &req.pubkey, &req.email, req.escrow).await {
// match self.data.register_operator(req) { Ok(()) => Ok(Response::new(Empty {})),
// Ok(()) => Ok(Response::new(Empty {})), Err(e) if matches!(e, db::Error::InsufficientFunds | db::Error::MinimalEscrow) => {
// Err(e) => Err(Status::failed_precondition(e.to_string())), Err(Status::failed_precondition(e.to_string()))
// } }
Err(e) => {
log::info!("Failed to register operator: {e:?}");
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
} }
async fn kick_contract(&self, _req: Request<KickReq>) -> Result<Response<KickResp>, Status> { async fn kick_contract(&self, req: Request<KickReq>) -> Result<Response<KickResp>, Status> {
todo!(); let req = check_sig_from_req(req)?;
// let req = check_sig_from_req(req)?; match db::WrapperContract::kick_contract(
// match self.data.kick_contract(&req.operator_wallet, &req.contract_uuid, &req.reason).await { &self.db,
// Ok(nano_lp) => Ok(Response::new(KickResp { nano_lp })), &req.operator_wallet,
// Err(e) => Err(Status::permission_denied(e.to_string())), &req.contract_uuid,
// } &req.reason,
)
.await
{
Ok(nano_lp) => Ok(Response::new(KickResp { nano_lp })),
Err(e)
if matches!(
e,
db::Error::ContractNotFound
| db::Error::AccessDenied
| db::Error::FailedToDeleteContract(_)
) =>
{
Err(Status::failed_precondition(e.to_string()))
}
Err(e) => {
log::info!("Failed to kick contract: {e:?}");
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
} }
async fn ban_user(&self, _req: Request<BanUserReq>) -> Result<Response<Empty>, Status> { async fn ban_user(&self, _req: Request<BanUserReq>) -> Result<Response<Empty>, Status> {

@ -131,8 +131,6 @@ DEFINE FIELD disk_size_gb ON TABLE deleted_app TYPE int;
DEFINE FIELD created_at ON TABLE deleted_app TYPE datetime; DEFINE FIELD created_at ON TABLE deleted_app TYPE datetime;
DEFINE FIELD deleted_at ON TABLE deleted_app TYPE datetime; DEFINE FIELD deleted_at ON TABLE deleted_app TYPE datetime;
DEFINE FIELD price_per_unit ON TABLE deleted_app TYPE int; DEFINE FIELD price_per_unit ON TABLE deleted_app TYPE int;
DEFINE FIELD locked_nano ON TABLE deleted_app TYPE int;
DEFINE FIELD collected_at ON TABLE deleted_app TYPE datetime;
DEFINE FIELD mr_enclave ON TABLE deleted_app TYPE string; DEFINE FIELD mr_enclave ON TABLE deleted_app TYPE string;
DEFINE FIELD package_url ON TABLE deleted_app TYPE string; DEFINE FIELD package_url ON TABLE deleted_app TYPE string;
DEFINE FIELD hratls_pubkey ON TABLE deleted_app TYPE string; DEFINE FIELD hratls_pubkey ON TABLE deleted_app TYPE string;
@ -144,6 +142,7 @@ DEFINE TABLE kick TYPE RELATION FROM account TO account;
DEFINE FIELD created_at ON TABLE kick TYPE datetime; DEFINE FIELD created_at ON TABLE kick TYPE datetime;
DEFINE FIELD reason ON TABLE kick TYPE string; DEFINE FIELD reason ON TABLE kick TYPE string;
DEFINE FIELD contract ON TABLE kick TYPE record<deleted_vm|deleted_app>; DEFINE FIELD contract ON TABLE kick TYPE record<deleted_vm|deleted_app>;
DEFINE FIELD node ON TABLE kick TYPE record<vm_node|app_name>;
DEFINE TABLE report TYPE RELATION FROM account TO vm_node|app_node; DEFINE TABLE report TYPE RELATION FROM account TO vm_node|app_node;
DEFINE FIELD created_at ON TABLE report TYPE datetime; DEFINE FIELD created_at ON TABLE report TYPE datetime;

@ -6,6 +6,7 @@ use dotenv::dotenv;
use hyper_util::rt::TokioIo; use hyper_util::rt::TokioIo;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use surreal_brain::constants::DB_SCHEMA_FILES;
use surreal_brain::grpc::general::GeneralCliServer; use surreal_brain::grpc::general::GeneralCliServer;
use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer}; use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer};
use surrealdb::engine::remote::ws::Client; use surrealdb::engine::remote::ws::Client;
@ -34,7 +35,9 @@ pub async fn prepare_test_db() -> Result<Surreal<Client>> {
.map_err(|e| anyhow!(e.to_string()))?; .map_err(|e| anyhow!(e.to_string()))?;
db.query(format!("REMOVE DATABASE {db_name}")).await?; db.query(format!("REMOVE DATABASE {db_name}")).await?;
db.query(std::fs::read_to_string("interim_tables.surql")?).await?; for schema in DB_SCHEMA_FILES.map(std::fs::read_to_string) {
db.query(schema?).await?;
}
surreal_brain::db::migration0(&db, &old_brain_data).await?; surreal_brain::db::migration0(&db, &old_brain_data).await?;
Ok::<(), anyhow::Error>(()) Ok::<(), anyhow::Error>(())
}) })

@ -37,7 +37,7 @@ pub async fn register_vm_node(
client: &mut BrainVmDaemonClient<Channel>, client: &mut BrainVmDaemonClient<Channel>,
key: &Key, key: &Key,
operator_wallet: &str, operator_wallet: &str,
) -> Result<Vec<vm_proto::VmContract>> { ) -> Result<Vec<vm_proto::DeleteVmReq>> {
log::info!("Registering vm_node: {}", key.pubkey); log::info!("Registering vm_node: {}", key.pubkey);
let node_pubkey = key.pubkey.clone(); let node_pubkey = key.pubkey.clone();
@ -53,18 +53,18 @@ pub async fn register_vm_node(
let mut grpc_stream = client.register_vm_node(key.sign_request(req)?).await?.into_inner(); let mut grpc_stream = client.register_vm_node(key.sign_request(req)?).await?.into_inner();
let mut vm_contracts = Vec::new(); let mut deleted_vm_reqs = Vec::new();
while let Some(stream_update) = grpc_stream.next().await { while let Some(stream_update) = grpc_stream.next().await {
match stream_update { match stream_update {
Ok(vm_c) => { Ok(del_vm_rq) => {
vm_contracts.push(vm_c); deleted_vm_reqs.push(del_vm_rq);
} }
Err(e) => { Err(e) => {
panic!("Received error instead of vm_contracts: {e:?}"); panic!("Received error instead of deleted_vm_reqs: {e:?}");
} }
} }
} }
Ok(vm_contracts) Ok(deleted_vm_reqs)
} }
pub async fn daemon_listener( pub async fn daemon_listener(

@ -211,3 +211,43 @@ async fn test_inspect_operator() {
assert!(!inspect_response.vm_nodes.is_empty()); assert!(!inspect_response.vm_nodes.is_empty());
assert_eq!(&inspect_response.vm_nodes[0].operator, &operator_key.pubkey); assert_eq!(&inspect_response.vm_nodes[0].operator, &operator_key.pubkey);
} }
#[tokio::test]
async fn test_kick_contract() {
// TODO: implement seed data to test
// possibilities
// 1. vm contract
// 2. app contract
// 3. non existent contract
// 4. other operator's contract
// 5. contract collected more than a week
// 6. refund amount calculation
// 7. refund of multiple contract kick in a day for same user
env_logger::builder()
.filter_level(log::LevelFilter::Trace)
.filter_module("tungstenite", log::LevelFilter::Debug)
.filter_module("tokio_tungstenite", log::LevelFilter::Debug)
.init();
let db_conn = prepare_test_db().await.unwrap();
let operator_wallet = "BFopWmwcZAMF1h2PFECZNdEucdZfnZZ32p6R9ZaBiVsS";
let contract_uuid = "26577f1c98674a1780a86cf0490f1270";
let reason = "test reason";
let kick_response = surreal_brain::db::general::WrapperContract::kick_contract(
&db_conn,
operator_wallet,
contract_uuid,
reason,
)
.await;
match kick_response {
Ok(refund_amount) => {
println!("Refund amount: {}", refund_amount);
}
Err(e) => {
println!("Error: {}", e);
}
}
}

@ -20,11 +20,17 @@ async fn test_vm_creation() {
let key = Key::new(); let key = Key::new();
let _ = create_new_vm(&db, &key, &daemon_key, &brain_channel).await; let new_vm_resp = create_new_vm(&db, &key, &daemon_key, &brain_channel).await;
assert!(new_vm_resp.is_err());
let grpc_error_message = new_vm_resp.err().unwrap().to_string();
assert!(grpc_error_message.contains("Insufficient funds"));
// TODO: Airdrop the user and try creating the VM again
} }
#[tokio::test] #[tokio::test]
async fn test_vm_creation_timeout() { async fn test_timeout_vm_creation() {
prepare_test_db().await.unwrap(); prepare_test_db().await.unwrap();
// env_logger::builder().filter_level(log::LevelFilter::Error).init(); // env_logger::builder().filter_level(log::LevelFilter::Error).init();