From 928a7d029be115c4b02f8a206365dbfb6e68289d Mon Sep 17 00:00:00 2001 From: Noor Date: Tue, 11 Mar 2025 10:36:20 +0000 Subject: [PATCH] Implement app contracts pricing methods with cron job --- src/data.rs | 101 ++++++++++++++++++++++++++++++++++++++++++++++++++-- src/main.rs | 1 + 2 files changed, 100 insertions(+), 2 deletions(-) diff --git a/src/data.rs b/src/data.rs index 0aeef2f..fd2039c 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,7 +1,6 @@ use crate::grpc::snp_proto::{self as grpc}; use chrono::Utc; use dashmap::DashMap; -use detee_shared::sgx::pb::brain::DelAppReq; use log::{debug, info, warn}; use serde::{Deserialize, Serialize}; use std::str::FromStr; @@ -14,10 +13,12 @@ use std::{ use tokio::sync::mpsc::Sender; use tokio::sync::oneshot::Sender as OneshotSender; +use detee_shared::sgx::pb::brain::brain_message_app; use detee_shared::sgx::pb::brain::AppContract as AppContractPB; use detee_shared::sgx::pb::brain::AppNodeResources; use detee_shared::sgx::pb::brain::AppResource as AppResourcePB; use detee_shared::sgx::pb::brain::BrainMessageApp; +use detee_shared::sgx::pb::brain::DelAppReq; use detee_shared::sgx::pb::brain::MappedPort; use detee_shared::sgx::pb::brain::NewAppReq; use detee_shared::sgx::pb::brain::NewAppRes; @@ -39,7 +40,7 @@ pub enum Error { AccessDenied, } -#[derive(Clone, Default, Serialize, Deserialize)] +#[derive(Clone, Default, Serialize, Deserialize, Debug)] pub struct AccountData { pub balance: u64, pub tmp_locked: u64, @@ -193,6 +194,24 @@ pub struct AppContract { pub public_package_mr_enclave: Option>, } +impl AppContract { + fn total_units(&self) -> u64 { + // TODO: Optimize this based on price of hardware. + (self.vcpus as u64 * 10) + + (self.memory_mb as u64 / 200) + + (self.disk_size_mb as u64 / 10000) + } + + pub fn price_estimate(vcpus: u32, memory_mb: u32, disk_size_mb: u32) -> u64 { + (vcpus as u64 * 10) + (memory_mb as u64 / 200) + (disk_size_mb as u64 / 10000) + } + + /// Returns price per minute in nanoLP + fn price_per_minute(&self) -> u64 { + self.total_units() * self.price_per_unit + } +} + impl From for AppContractPB { fn from(value: AppContract) -> Self { let mapped_ports = value @@ -243,6 +262,8 @@ pub struct AppNode { pub max_ports_per_app: u32, // nanotokens per unit per minute pub price: u64, + + pub offline_minutes: u64, } #[derive(Default, Serialize, Deserialize)] @@ -1183,6 +1204,11 @@ impl BrainData { contracts.iter().find(|c| c.uuid == uuid).cloned() } + pub fn find_app_node_by_pubkey(&self, public_key: &str) -> Option { + let nodes = self.app_nodes.read().unwrap(); + nodes.iter().cloned().find(|n| n.node_pubkey == public_key) + } + pub fn find_app_contracts_by_admin_pubkey(&self, admin_pubkey: &str) -> Vec { debug!("Searching contracts for admin pubkey {admin_pubkey}"); let contracts: Vec = self @@ -1206,6 +1232,60 @@ impl BrainData { .collect() } + pub async fn app_contracts_cron(&self) { + let mut deleted_app_contracts: Vec<(String, String)> = Vec::new(); + log::debug!("Running app contracts cron..."); + { + let mut app_contracts = self.app_contracts.write().unwrap(); + app_contracts.retain_mut(|c| { + let node = self.find_app_node_by_pubkey(&c.node_pubkey).unwrap(); + if node.offline_minutes == 0 { + let operator_wallet = node.operator_wallet.clone(); + let minutes_to_collect = (Utc::now() - c.collected_at).num_minutes() as u64; + c.collected_at = Utc::now(); + dbg!(&minutes_to_collect); + dbg!(&c.price_per_minute()); + let mut nanolp_to_collect = + c.price_per_minute().saturating_mul(minutes_to_collect); + if nanolp_to_collect > c.locked_nano { + nanolp_to_collect = c.locked_nano; + } + dbg!(&nanolp_to_collect); + log::debug!("Removing {nanolp_to_collect} nanoLP from {}", c.uuid); + c.locked_nano -= nanolp_to_collect; + let escrow_multiplier = match self.operators.get(&operator_wallet) { + Some(op) if op.escrow > 5000 => match self.operators.get(&c.admin_pubkey) { + Some(user_is_op) if user_is_op.escrow > 5000 => 1, + _ => 5, + }, + _ => 1, + }; + self.add_nano_to_wallet( + &operator_wallet, + nanolp_to_collect * escrow_multiplier, + ); + if c.locked_nano == 0 { + deleted_app_contracts.push((c.uuid.clone(), c.node_pubkey.clone())); + } + } + c.locked_nano > 0 + }); + } + // inform daemons of the deletion of the contracts + for (uuid, node_pubkey) in deleted_app_contracts.iter() { + if let Some(app_daemon_tx) = self.app_daemon_tx.get(&node_pubkey.clone()) { + let msg = BrainMessageApp { + msg: Some(brain_message_app::Msg::DeleteAppReq(DelAppReq { + uuid: uuid.to_string(), + admin_pubkey: String::new(), + })), + }; + let app_daemon_tx = app_daemon_tx.clone(); + let _ = app_daemon_tx.send(msg).await; + } + } + } + pub fn submit_app_node_resources(&self, node_resource: AppNodeResources) { debug!("{:#?}", &node_resource); let mut nodes = self.app_nodes.write().unwrap(); @@ -1231,6 +1311,23 @@ impl BrainData { } pub async fn send_new_container_req(&self, mut req: NewAppReq, tx: OneshotSender) { + // TODO: make sure locked_nano in cli + if req.locked_nano == 0 { + let resource = req.resource.clone().unwrap_or_default(); + req.locked_nano = + AppContract::price_estimate(resource.vcpu, resource.memory_mb, resource.disk_mb) + } + + if let Err(e) = self.lock_nanotockens(&req.admin_pubkey, req.locked_nano) { + let _ = tx.send(NewAppRes { + uuid: String::new(), + error: e.to_string(), + status: "failed".to_string(), + ..Default::default() + }); + return; + } + req.uuid = uuid::Uuid::new_v4().to_string(); info!("Inserting new container request in memory: {req:?}"); diff --git a/src/main.rs b/src/main.rs index ccc93bf..f9b1041 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,6 +25,7 @@ async fn main() { tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; data_clone.vm_nodes_cron().await; data_clone.vm_contracts_cron().await; + data_clone.app_contracts_cron().await; if let Err(e) = data_clone.save_to_disk() { log::error!("Could not save data to disk due to error: {e}") }