/// Errors returned by the account, operator and contract operations of the brain.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// A lock or contract extension asked for more than 100_000_000_000_000 nanoLP at once.
    #[error("We do not allow locking of more than 100000 LP.")]
    TxTooBig,
    /// An operator registration requested an escrow below 5000.
    #[error("Escrow must be at least 5000 LP.")]
    MinimalEscrow,
    /// The wallet is unknown or its balance does not cover the requested amount.
    #[error("Account has insufficient funds for this operation")]
    InsufficientFunds,
    /// No VM contract with the given uuid exists (also returned when the caller does not own
    /// the contract it tries to extend).
    #[error("Could not find contract {0}")]
    VmContractNotFound(String),
    /// Returned by `kick_contract` when the admin of an existing contract has no account entry.
    #[error("This error should never happen.")]
    ImpossibleError,
    /// The caller is not the contract admin, or not the operator of the contract's node.
    #[error("You don't have the required permissions for this operation.")]
    AccessDenied,
    /// No app contract with the given uuid exists.
    // NOTE(review): renders the same message as `VmContractNotFound`.
    #[error("Could not find contract {0}")]
    AppContractNotFound(String),
}
HashSet, } impl From for grpc::AccountBalance { fn from(value: AccountData) -> Self { grpc::AccountBalance { balance: value.balance, tmp_locked: value.tmp_locked, } } } #[derive(Eq, PartialEq, Clone, Debug, Default, Serialize, Deserialize)] pub struct VmNode { pub public_key: String, pub operator_wallet: String, pub country: String, pub region: String, pub city: String, pub ip: String, pub avail_mem_mb: u32, pub avail_vcpus: u32, pub avail_storage_gbs: u32, pub avail_ipv4: u32, pub avail_ipv6: u32, pub avail_ports: u32, pub max_ports_per_vm: u32, // nanoLP per unit per minute pub price: u64, // 1st String is user wallet and 2nd String is report message pub reports: HashMap, pub offline_minutes: u64, } impl Into for VmNode { fn into(self) -> grpc::VmNodeListResp { grpc::VmNodeListResp { operator: self.operator_wallet, node_pubkey: self.public_key, country: self.country, region: self.region, city: self.city, ip: self.ip, price: self.price, reports: self.reports.into_values().collect(), } } } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct VmContract { pub uuid: String, pub hostname: String, pub admin_pubkey: String, pub node_pubkey: String, pub exposed_ports: Vec, pub public_ipv4: String, pub public_ipv6: String, pub disk_size_gb: u32, pub vcpus: u32, pub memory_mb: u32, pub kernel_sha: String, pub dtrfs_sha: String, pub created_at: chrono::DateTime, pub updated_at: chrono::DateTime, // recommended value is 20000 /// price per unit per minute pub price_per_unit: u64, pub locked_nano: u64, pub collected_at: chrono::DateTime, } impl VmContract { /// total hardware units of this VM fn total_units(&self) -> u64 { // TODO: Optimize this based on price of hardware. // I tried, but this can be done better. 
// Storage cost should also be based on tier (self.vcpus as u64 * 10) + ((self.memory_mb + 256) as u64 / 200) + (self.disk_size_gb as u64 / 10) + (!self.public_ipv4.is_empty() as u64 * 10) } /// Returns price per minute in nanoLP fn price_per_minute(&self) -> u64 { self.total_units() * self.price_per_unit } } impl Into for VmContract { fn into(self) -> grpc::VmContract { let nano_per_minute = self.price_per_minute(); grpc::VmContract { uuid: self.uuid, hostname: self.hostname, admin_pubkey: self.admin_pubkey, node_pubkey: self.node_pubkey, exposed_ports: self.exposed_ports, public_ipv4: self.public_ipv4, public_ipv6: self.public_ipv6, disk_size_gb: self.disk_size_gb, vcpus: self.vcpus, memory_mb: self.memory_mb, kernel_sha: self.kernel_sha, dtrfs_sha: self.dtrfs_sha, created_at: self.created_at.to_rfc3339(), updated_at: self.updated_at.to_rfc3339(), nano_per_minute, locked_nano: self.locked_nano, collected_at: self.collected_at.to_rfc3339(), } } } #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct AppContract { pub uuid: String, pub package_url: String, pub admin_pubkey: String, pub node_pubkey: String, pub mapped_ports: Vec<(u16, u16)>, pub host_ipv4: String, pub disk_size_mb: u32, pub vcpus: u32, pub memory_mb: u32, pub created_at: chrono::DateTime, pub updated_at: chrono::DateTime, // price per unit per minute // recommended value is 20000 pub price_per_unit: u64, pub locked_nano: u64, pub collected_at: chrono::DateTime, pub hratls_pubkey: String, pub public_package_mr_enclave: Option>, } impl AppContract { fn total_units(&self) -> f64 { // TODO: Optimize this based on price of hardware. 
(self.vcpus as f64 * 10f64) + (self.memory_mb as f64 / 200f64) + (self.disk_size_mb as f64 / 10000f64) } /// Returns price per minute in nanoLP fn price_per_minute(&self) -> u64 { (self.total_units() * self.price_per_unit as f64) as u64 } } impl From for AppContractPB { fn from(value: AppContract) -> Self { let mapped_ports = value .mapped_ports .clone() .into_iter() .map(MappedPort::from) .collect(); let nano_per_minute = value.price_per_minute(); let resource = Some(AppResourcePB { memory_mb: value.memory_mb, disk_mb: value.disk_size_mb, vcpu: value.vcpus, ports: value.mapped_ports.iter().map(|p| p.1 as u32).collect(), }); Self { uuid: value.uuid, package_url: value.package_url, admin_pubkey: value.admin_pubkey, node_pubkey: value.node_pubkey, mapped_ports, public_ipv4: value.host_ipv4, resource, created_at: value.created_at.to_rfc3339(), updated_at: value.updated_at.to_rfc3339(), // TODO: check while implementing pricing nano_per_minute, locked_nano: value.locked_nano, collected_at: value.collected_at.to_rfc3339(), hratls_pubkey: value.hratls_pubkey, public_package_mr_enclave: value.public_package_mr_enclave, } } } #[derive(Eq, Hash, PartialEq, Clone, Debug, Default, Serialize, Deserialize)] pub struct AppNode { pub node_pubkey: String, pub operator_wallet: String, pub country: String, pub region: String, pub city: String, pub ip: String, pub avail_mem_mb: u32, pub avail_vcpus: u32, pub avail_storage_mb: u32, pub avail_no_of_port: u32, pub max_ports_per_app: u32, // nanotokens per unit per minute pub price: u64, pub offline_minutes: u64, } impl From for AppNodeListResp { fn from(value: AppNode) -> Self { Self { operator: value.operator_wallet, node_pubkey: value.node_pubkey, country: value.country, region: value.region, city: value.city, ip: value.ip, price: value.price, reports: Vec::new(), } } } #[derive(Default, Serialize, Deserialize)] pub struct BrainData { accounts: DashMap, operators: DashMap, vm_nodes: RwLock>, vm_contracts: RwLock>, 
#[serde(skip_serializing, skip_deserializing)] tmp_newvm_reqs: DashMap)>, #[serde(skip_serializing, skip_deserializing)] tmp_updatevm_reqs: DashMap)>, #[serde(skip_serializing, skip_deserializing)] daemon_tx: DashMap>, app_nodes: RwLock>, #[serde(skip_serializing, skip_deserializing)] app_daemon_tx: DashMap>, #[serde(skip_serializing, skip_deserializing)] tmp_new_container_reqs: DashMap)>, app_contracts: RwLock>, } impl BrainData { pub fn save_to_disk(&self) -> Result<(), Box> { let mut file = File::create(DATA_PATH)?; file.write_all(serde_yaml::to_string(self)?.as_bytes())?; Ok(()) } fn load_from_disk() -> Result> { let content = std::fs::read_to_string(DATA_PATH)?; let data: Self = serde_yaml::from_str(&content)?; Ok(data) } pub fn new() -> Self { match Self::load_from_disk() { Ok(data) => data, Err(e) => { warn!("Could not data {DATA_PATH} due to error: {e:?}"); info!("Creating new instance of brain."); Self { accounts: DashMap::new(), operators: DashMap::new(), vm_nodes: RwLock::new(Vec::new()), vm_contracts: RwLock::new(Vec::new()), tmp_newvm_reqs: DashMap::new(), tmp_updatevm_reqs: DashMap::new(), daemon_tx: DashMap::new(), app_nodes: RwLock::new(Vec::new()), app_daemon_tx: DashMap::new(), tmp_new_container_reqs: DashMap::new(), app_contracts: RwLock::new(Vec::new()), } } } } pub fn get_balance(&self, account: &str) -> AccountData { if let Some(account) = self.accounts.get(account) { return account.value().clone(); } else { let balance = AccountData { balance: 0, tmp_locked: 0, kicked_for: Vec::new(), banned_by: HashSet::new(), last_kick: chrono::Utc::now(), }; return balance; } } pub fn give_airdrop(&self, account: &str, tokens: u64) { warn!("Airdropping {tokens} to {account}."); self.add_nano_to_wallet(account, tokens.saturating_mul(1_000_000_000)); } pub fn slash_account(&self, account: &str, tokens: u64) { warn!("Slashing {tokens} from {account}."); self.rm_nano_from_wallet(account, tokens.saturating_mul(1_000_000_000)); } fn add_nano_to_wallet(&self, 
account: &str, nano_lp: u64) { log::debug!("Adding {nano_lp} nanoLP to {account}"); self.accounts .entry(account.to_string()) .and_modify(|d| d.balance += nano_lp) .or_insert(AccountData { balance: nano_lp, ..Default::default() }); } fn rm_nano_from_wallet(&self, account: &str, nano_lp: u64) { log::debug!("Slashing {nano_lp} nanoLP to {account}"); self.accounts.entry(account.to_string()).and_modify(|d| { d.balance = d.balance.saturating_sub(nano_lp); }); } /// This is written to run every minute pub async fn vm_nodes_cron(&self) { log::debug!("Running vm nodes cron..."); let mut nodes = self.vm_nodes.write().unwrap(); let mut vm_contracts = self.vm_contracts.write().unwrap(); for node in nodes.iter_mut() { if self.daemon_tx.contains_key(&node.public_key) { node.offline_minutes = 0; continue; } let mut operator = match self .operators .iter_mut() .find(|o| o.vm_nodes.contains(&node.public_key)) { Some(op) => op, None => continue, }; node.offline_minutes += 1; // compensate contract admin if the node is offline more then 5 minutes if node.offline_minutes > 5 { for c in vm_contracts .iter() .filter(|c| c.node_pubkey == node.public_key) { let compensation = c.price_per_minute() * 10; if compensation < operator.escrow { operator.escrow -= compensation; self.add_nano_to_wallet(&c.admin_pubkey, compensation); } } } } // delete nodes that are offline more than 3 hours, and clean contracts nodes.retain(|n| { if n.offline_minutes > 1600 { vm_contracts.retain_mut(|c| { if c.node_pubkey == n.public_key { self.add_nano_to_wallet(&c.admin_pubkey, c.locked_nano); } c.node_pubkey != n.public_key }); for mut op in self.operators.iter_mut() { op.vm_nodes.remove(&n.public_key); } } n.offline_minutes <= 180 }); } pub async fn vm_contracts_cron(&self) { let mut deleted_contracts: Vec<(String, String)> = Vec::new(); log::debug!("Running contracts cron..."); { let mut contracts = self.vm_contracts.write().unwrap(); contracts.retain_mut(|c| { let node = 
self.find_node_by_pubkey(&c.node_pubkey).unwrap(); if node.offline_minutes == 0 { let operator_wallet = node.operator_wallet.clone(); let minutes_to_collect = (Utc::now() - c.collected_at).num_minutes() as u64; c.collected_at = Utc::now(); let mut nanolp_to_collect = c.price_per_minute().saturating_mul(minutes_to_collect); if nanolp_to_collect > c.locked_nano { nanolp_to_collect = c.locked_nano; } log::debug!("Removing {nanolp_to_collect} nanoLP from {}", c.uuid); c.locked_nano -= nanolp_to_collect; let escrow_multiplier = match self.operators.get(&operator_wallet) { Some(op) if op.escrow > 5000 => match self.operators.get(&c.admin_pubkey) { Some(user_is_op) if user_is_op.escrow > 5000 => 1, _ => 5, }, _ => 1, }; self.add_nano_to_wallet( &operator_wallet, nanolp_to_collect * escrow_multiplier, ); if c.locked_nano == 0 { deleted_contracts.push((c.uuid.clone(), c.node_pubkey.clone())); } } c.locked_nano > 0 }); } // inform daemons of the deletion of the contracts for (uuid, node_pubkey) in deleted_contracts.iter() { if let Some(daemon_tx) = self.daemon_tx.get(&node_pubkey.clone()) { let msg = grpc::BrainVmMessage { msg: Some(grpc::brain_vm_message::Msg::DeleteVm(grpc::DeleteVmReq { uuid: uuid.to_string(), admin_pubkey: String::new(), })), }; let daemon_tx = daemon_tx.clone(); let _ = daemon_tx.send(msg).await; } } } pub fn register_node(&self, node: VmNode) { info!("Registering node {node:?}"); self.add_vmnode_to_operator(&node.operator_wallet, &node.public_key); let mut nodes = self.vm_nodes.write().unwrap(); for n in nodes.iter_mut() { if n.public_key == node.public_key { // TODO: figure what to do in this case. warn!("VM Node {} already exists. 
Updating data.", n.public_key); *n = node; return; } } nodes.push(node); } // todo: this should also support Apps /// Receives: operator, contract uuid, reason of kick pub async fn kick_contract( &self, operator: &str, uuid: &str, reason: &str, ) -> Result { log::debug!("Operator {operator} requested a kick of {uuid} for reason: {reason}"); let contract = self.find_contract_by_uuid(uuid)?; let mut operator_data = self .operators .get_mut(operator) .ok_or(Error::AccessDenied)?; if !operator_data.vm_nodes.contains(&contract.node_pubkey) { return Err(Error::AccessDenied); } let mut minutes_to_refund = chrono::Utc::now() .signed_duration_since(contract.updated_at) .num_minutes() .abs() as u64; // cap refund at 1 week if minutes_to_refund > 10080 { minutes_to_refund = 10080; } let mut refund_amount = minutes_to_refund * contract.price_per_minute(); let mut admin_account = self .accounts .get_mut(&contract.admin_pubkey) .ok_or(Error::ImpossibleError)?; // check if he got kicked within the last day if !chrono::Utc::now() .signed_duration_since(admin_account.last_kick) .gt(&chrono::Duration::days(1)) { refund_amount = 0; } if operator_data.escrow < refund_amount { refund_amount = operator_data.escrow; } log::debug!( "Removing {refund_amount} escrow from {} and giving it to {}", operator_data.key(), admin_account.key() ); admin_account.balance += refund_amount; admin_account.kicked_for.push(reason.to_string()); operator_data.escrow -= refund_amount; let admin_pubkey = contract.admin_pubkey.clone(); drop(admin_account); drop(contract); self.delete_vm(grpc::DeleteVmReq { uuid: uuid.to_string(), admin_pubkey, }) .await?; Ok(refund_amount) } pub fn ban_user(&self, operator: &str, user: &str) { self.accounts .entry(user.to_string()) .and_modify(|a| { a.banned_by.insert(operator.to_string()); }) .or_insert(AccountData { banned_by: HashSet::from([operator.to_string()]), ..Default::default() }); self.operators .entry(operator.to_string()) .and_modify(|o| { 
o.banned_users.insert(user.to_string()); }) .or_insert(OperatorData { banned_users: HashSet::from([user.to_string()]), ..Default::default() }); } pub fn report_node(&self, admin_pubkey: String, node: &str, report: String) { let mut nodes = self.vm_nodes.write().unwrap(); if let Some(node) = nodes.iter_mut().find(|n| n.public_key == node) { node.reports.insert(admin_pubkey, report); } } pub fn lock_nanotockens(&self, account: &str, nano_lp: u64) -> Result<(), Error> { if nano_lp > 100_000_000_000_000 { return Err(Error::TxTooBig); } if let Some(mut account) = self.accounts.get_mut(account) { if nano_lp > account.balance { return Err(Error::InsufficientFunds); } account.balance = account.balance.saturating_sub(nano_lp); account.tmp_locked = account.tmp_locked.saturating_add(nano_lp); Ok(()) } else { Err(Error::InsufficientFunds) } } pub fn extend_vm_contract_time( &self, uuid: &str, wallet: &str, nano_lp: u64, ) -> Result<(), Error> { if nano_lp > 100_000_000_000_000 { return Err(Error::TxTooBig); } let mut account = match self.accounts.get_mut(wallet) { Some(account) => account, None => return Err(Error::InsufficientFunds), }; match self .vm_contracts .write() .unwrap() .iter_mut() .find(|c| c.uuid == uuid) { Some(contract) => { if contract.admin_pubkey != wallet { return Err(Error::VmContractNotFound(uuid.to_string())); } if account.balance + contract.locked_nano < nano_lp { return Err(Error::InsufficientFunds); } account.balance = account.balance + contract.locked_nano - nano_lp; contract.locked_nano = nano_lp; Ok(()) } None => Err(Error::VmContractNotFound(uuid.to_string())), } } pub fn submit_node_resources(&self, res: grpc::VmNodeResources) { let mut nodes = self.vm_nodes.write().unwrap(); for n in nodes.iter_mut() { if n.public_key == res.node_pubkey { debug!( "Found node {}. 
Updating resources to {:?}", n.public_key, res ); n.avail_ipv4 = res.avail_ipv4; n.avail_ipv6 = res.avail_ipv6; n.avail_vcpus = res.avail_vcpus; n.avail_mem_mb = res.avail_memory_mb; n.avail_storage_gbs = res.avail_storage_gb; n.max_ports_per_vm = res.max_ports_per_vm; n.avail_ports = res.avail_ports; return; } } debug!( "VM Node {} not found when trying to update resources.", res.node_pubkey ); debug!("VM Node list:\n{:?}", nodes); } pub fn add_daemon_tx(&self, node_pubkey: &str, tx: Sender) { self.daemon_tx.insert(node_pubkey.to_string(), tx); } pub fn del_daemon_tx(&self, node_pubkey: &str) { self.daemon_tx.remove(node_pubkey); } pub async fn delete_vm(&self, delete_vm: grpc::DeleteVmReq) -> Result<(), Error> { log::debug!("Starting deletion of VM {}", delete_vm.uuid); let contract = self.find_contract_by_uuid(&delete_vm.uuid)?; if contract.admin_pubkey != delete_vm.admin_pubkey { return Err(Error::AccessDenied); } info!("Found vm {}. Deleting...", delete_vm.uuid); if let Some(daemon_tx) = self.daemon_tx.get(&contract.node_pubkey) { debug!( "TX for daemon {} found. 
Informing daemon about deletion of {}.", contract.node_pubkey, delete_vm.uuid ); let msg = grpc::BrainVmMessage { msg: Some(grpc::brain_vm_message::Msg::DeleteVm(delete_vm.clone())), }; if let Err(e) = daemon_tx.send(msg).await { warn!( "Failed to send deletion request to {} due to error: {e:?}", contract.node_pubkey ); info!("Deleting daemon TX for {}", contract.node_pubkey); self.del_daemon_tx(&contract.node_pubkey); } } self.add_nano_to_wallet(&contract.admin_pubkey, contract.locked_nano); let mut contracts = self.vm_contracts.write().unwrap(); contracts.retain(|c| c.uuid != delete_vm.uuid); Ok(()) } // also unlocks nanotokens in case VM creation failed pub async fn submit_newvm_resp(&self, new_vm_resp: grpc::NewVmResp) { let new_vm_req = match self.tmp_newvm_reqs.remove(&new_vm_resp.uuid) { Some((_, r)) => r, None => { log::error!( "Received confirmation for ghost NewVMReq {}", new_vm_resp.uuid ); return; } }; if let Err(_) = new_vm_req.1.send(new_vm_resp.clone()) { log::error!( "CLI RX for {} dropped before receiving confirmation {:?}.", &new_vm_req.0.admin_pubkey, new_vm_resp, ); } if new_vm_resp.error != "" { if let Some(mut admin_wallet) = self.accounts.get_mut(&new_vm_req.0.admin_pubkey) { admin_wallet.balance += new_vm_req.0.locked_nano; admin_wallet.tmp_locked -= new_vm_req.0.locked_nano; } return; } let mut public_ipv4 = String::new(); let mut public_ipv6 = String::new(); let args = new_vm_resp.args.as_ref().unwrap(); for ip in args.ips.iter() { if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) { if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() { public_ipv4 = ipv4_addr.to_string(); } continue; } if let Ok(ipv6_addr) = std::net::Ipv6Addr::from_str(&ip.address) { public_ipv6 = ipv6_addr.to_string(); } } if let Some(mut admin_wallet) = self.accounts.get_mut(&new_vm_req.0.admin_pubkey) { admin_wallet.tmp_locked -= new_vm_req.0.locked_nano; } let contract = VmContract { uuid: new_vm_resp.uuid, exposed_ports: 
args.exposed_ports.clone(), public_ipv4, public_ipv6, created_at: Utc::now(), updated_at: Utc::now(), hostname: new_vm_req.0.hostname, admin_pubkey: new_vm_req.0.admin_pubkey, node_pubkey: new_vm_req.0.node_pubkey.clone(), disk_size_gb: new_vm_req.0.disk_size_gb, vcpus: new_vm_req.0.vcpus, memory_mb: new_vm_req.0.memory_mb, kernel_sha: new_vm_req.0.kernel_sha, dtrfs_sha: new_vm_req.0.dtrfs_sha, price_per_unit: new_vm_req.0.price_per_unit, locked_nano: new_vm_req.0.locked_nano, collected_at: Utc::now(), }; info!("Created new contract: {contract:?}"); self.vm_contracts.write().unwrap().push(contract); } pub async fn submit_updatevm_resp(&self, mut update_vm_resp: grpc::UpdateVmResp) { let update_vm_req = match self.tmp_updatevm_reqs.remove(&update_vm_resp.uuid) { Some((_, r)) => r, None => { log::error!( "Received confirmation for ghost UpdateVMRequest {}", update_vm_resp.uuid ); update_vm_resp.error = "Received confirmation for ghost UpdateVMRequest.".to_string(); return; } }; if let Err(e) = update_vm_req.1.send(update_vm_resp.clone()) { log::warn!( "CLI RX dropped before receiving UpdateVMResp {update_vm_resp:?}. 
Error: {e:?}" ); } if update_vm_resp.error != "" { return; } let mut contracts = self.vm_contracts.write().unwrap(); match contracts.iter_mut().find(|c| c.uuid == update_vm_resp.uuid) { Some(contract) => { if update_vm_req.0.vcpus != 0 { contract.vcpus = update_vm_req.0.vcpus; } if update_vm_req.0.memory_mb != 0 { contract.memory_mb = update_vm_req.0.memory_mb; } if update_vm_req.0.disk_size_gb != 0 { contract.disk_size_gb = update_vm_req.0.disk_size_gb; } if !update_vm_req.0.kernel_sha.is_empty() { debug!( "Updating kernel sha for {} to {}", contract.uuid, update_vm_req.0.kernel_sha ); contract.kernel_sha = update_vm_req.0.kernel_sha; } if !update_vm_req.0.dtrfs_sha.is_empty() { debug!( "Updating dtrfs sha for {} to {}", contract.uuid, update_vm_req.0.dtrfs_sha ); contract.dtrfs_sha = update_vm_req.0.dtrfs_sha; } contract.updated_at = Utc::now(); } None => { log::error!("VM Contract not found for {}.", update_vm_req.0.uuid); update_vm_resp.error = "VM Contract not found.".to_string(); } } } pub async fn submit_newvm_req( &self, mut req: grpc::NewVmReq, tx: OneshotSender, ) { if let Err(e) = self.lock_nanotockens(&req.admin_pubkey, req.locked_nano) { let _ = tx.send(grpc::NewVmResp { uuid: String::new(), error: e.to_string(), args: None, }); return; } req.uuid = uuid::Uuid::new_v4().to_string(); info!("Inserting new vm request in memory: {req:?}"); self.tmp_newvm_reqs .insert(req.uuid.clone(), (req.clone(), tx)); if let Some(daemon_tx) = self.daemon_tx.get(&req.node_pubkey) { debug!( "Found daemon TX for {}. 
Sending newVMReq {}", req.node_pubkey, req.uuid ); let msg = grpc::BrainVmMessage { msg: Some(grpc::brain_vm_message::Msg::NewVmReq(req.clone())), }; if let Err(e) = daemon_tx.send(msg).await { warn!( "Failed to send new VM request to {} due to error: {e:?}", req.node_pubkey ); info!("Deleting daemon TX for {}", req.node_pubkey); self.del_daemon_tx(&req.node_pubkey); self.submit_newvm_resp(grpc::NewVmResp { error: "Daemon is offline.".to_string(), uuid: req.uuid, args: None, }) .await; } } else { self.submit_newvm_resp(grpc::NewVmResp { error: "Daemon is offline.".to_string(), uuid: req.uuid, args: None, }) .await; } } pub async fn submit_updatevm_req( &self, req: grpc::UpdateVmReq, tx: OneshotSender, ) { let uuid = req.uuid.clone(); info!("Inserting new vm update request in memory: {req:?}"); let node_pubkey = match self.find_contract_by_uuid(&req.uuid) { Ok(contract) => { if contract.admin_pubkey != req.admin_pubkey { let _ = tx.send(grpc::UpdateVmResp { uuid, error: "VM Contract does not exist.".to_string(), args: None, }); return; } contract.node_pubkey } Err(_) => { log::warn!( "Received UpdateVMReq for a contract that does not exist: {}", req.uuid ); let _ = tx.send(grpc::UpdateVmResp { uuid, error: "VM Contract does not exist.".to_string(), args: None, }); return; } }; self.tmp_updatevm_reqs .insert(req.uuid.clone(), (req.clone(), tx)); if let Some(server_tx) = self.daemon_tx.get(&node_pubkey) { debug!( "Found daemon TX for {}. 
Sending updateVMReq {}", node_pubkey, req.uuid ); let msg = grpc::BrainVmMessage { msg: Some(grpc::brain_vm_message::Msg::UpdateVmReq(req.clone())), }; match server_tx.send(msg).await { Ok(_) => { debug!("Successfully sent updateVMReq to {}", node_pubkey); return; } Err(e) => { warn!("Failed to send update VM request to {node_pubkey} due to error {e}"); info!("Deleting daemon TX for {}", node_pubkey); self.del_daemon_tx(&node_pubkey); self.submit_updatevm_resp(grpc::UpdateVmResp { uuid, error: "Daemon is offline.".to_string(), args: None, }) .await; } } } else { warn!("No daemon TX found for {}", node_pubkey); self.submit_updatevm_resp(grpc::UpdateVmResp { uuid, error: "Daemon is offline.".to_string(), args: None, }) .await; } } pub fn find_node_by_pubkey(&self, public_key: &str) -> Option { let nodes = self.vm_nodes.read().unwrap(); nodes.iter().cloned().find(|n| n.public_key == public_key) } pub fn is_user_banned_by_node(&self, user_wallet: &str, node_pubkey: &str) -> bool { if let Some(node) = self.find_node_by_pubkey(&node_pubkey) { if let Some(account) = self.accounts.get(user_wallet) { if account.banned_by.contains(&node.operator_wallet) { return true; } } } false } pub fn add_vmnode_to_operator(&self, operator_wallet: &str, node_pubkey: &str) { self.operators .entry(operator_wallet.to_string()) .and_modify(|op| { op.vm_nodes.insert(node_pubkey.to_string()); }) .or_insert(OperatorData { escrow: 0, email: String::new(), banned_users: HashSet::new(), vm_nodes: HashSet::from([node_pubkey.to_string()]), app_nodes: HashSet::new(), }); } pub fn register_operator(&self, req: grpc::RegOperatorReq) -> Result<(), Error> { let mut operator = match self.operators.get(&req.pubkey) { Some(o) => (*(o.value())).clone(), None => OperatorData { ..Default::default() }, }; if req.escrow < 5000 { return Err(Error::MinimalEscrow); } let escrow = req.escrow * 1_000_000_000; if let Some(mut account) = self.accounts.get_mut(&req.pubkey) { if (account.balance + operator.escrow) < 
escrow { return Err(Error::InsufficientFunds); } account.balance = account.balance + operator.escrow - escrow; operator.escrow = escrow; } else { return Err(Error::InsufficientFunds); } operator.email = req.email; self.operators.insert(req.pubkey, operator); Ok(()) } pub fn find_vm_nodes_by_operator(&self, operator_wallet: &str) -> Vec { let nodes = self.vm_nodes.read().unwrap(); nodes .iter() .filter(|node| node.operator_wallet == operator_wallet) .cloned() .collect() } pub fn total_operator_reports(&self, operator_wallet: &str) -> usize { let nodes = self.vm_nodes.read().unwrap(); nodes .iter() .cloned() .filter(|n| n.operator_wallet == operator_wallet) .map(|node| node.reports.len()) .sum() } pub fn find_vm_nodes_by_filters( &self, filters: &crate::grpc::snp_proto::VmNodeFilters, ) -> Vec { let nodes = self.vm_nodes.read().unwrap(); nodes .iter() .filter(|n| { n.avail_ports >= filters.free_ports && (!filters.offers_ipv4 || n.avail_ipv4 > 0) && (!filters.offers_ipv6 || n.avail_ipv6 > 0) && n.avail_vcpus >= filters.vcpus && n.avail_mem_mb >= filters.memory_mb && n.avail_storage_gbs >= filters.storage_gb && (filters.country.is_empty() || (n.country == filters.country)) && (filters.city.is_empty() || (n.city == filters.city)) && (filters.region.is_empty() || (n.region == filters.region)) && (filters.ip.is_empty() || (n.ip == filters.ip)) }) .cloned() .collect() } // TODO: sort by rating pub fn get_one_node_by_filters( &self, filters: &crate::grpc::snp_proto::VmNodeFilters, ) -> Option { let nodes = self.vm_nodes.read().unwrap(); nodes .iter() .find(|n| { n.avail_ports >= filters.free_ports && (!filters.offers_ipv4 || n.avail_ipv4 > 0) && (!filters.offers_ipv6 || n.avail_ipv6 > 0) && n.avail_vcpus >= filters.vcpus && n.avail_mem_mb >= filters.memory_mb && n.avail_storage_gbs >= filters.storage_gb && (filters.country.is_empty() || (n.country == filters.country)) && (filters.city.is_empty() || (n.city == filters.city)) && (filters.region.is_empty() || (n.region == 
filters.region)) && (filters.ip.is_empty() || (n.ip == filters.ip)) && (filters.node_pubkey.is_empty() || (n.public_key == filters.node_pubkey)) }) .cloned() } pub fn find_contract_by_uuid(&self, uuid: &str) -> Result { let contracts = self.vm_contracts.read().unwrap(); contracts .iter() .cloned() .find(|c| c.uuid == uuid) .ok_or(Error::VmContractNotFound(uuid.to_string())) } pub fn list_all_contracts(&self) -> Vec { let contracts = self.vm_contracts.read().unwrap(); contracts.iter().cloned().collect() } pub fn list_accounts(&self) -> Vec { self.accounts .iter() .map(|a| grpc::Account { pubkey: a.key().to_string(), balance: a.balance, tmp_locked: a.tmp_locked, }) .collect() } pub fn list_operators(&self) -> Vec { self.operators .iter() .map(|op| grpc::ListOperatorsResp { pubkey: op.key().to_string(), escrow: op.escrow / 1_000_000_000, email: op.email.clone(), app_nodes: 0, vm_nodes: op.vm_nodes.len() as u64, reports: self.total_operator_reports(op.key()) as u64, }) .collect() } pub fn inspect_operator(&self, wallet: &str) -> Option { self.operators.get(wallet).map(|op| { let nodes = self .find_vm_nodes_by_operator(wallet) .into_iter() .map(|n| n.into()) .collect(); grpc::InspectOperatorResp { operator: Some(grpc::ListOperatorsResp { pubkey: op.key().to_string(), escrow: op.escrow, email: op.email.clone(), app_nodes: 0, vm_nodes: op.vm_nodes.len() as u64, reports: self.total_operator_reports(op.key()) as u64, }), nodes, } }) } pub fn find_vm_contracts_by_operator(&self, wallet: &str) -> Vec { debug!("Searching contracts for operator {wallet}"); let nodes = match self.operators.get(wallet) { Some(op) => op.vm_nodes.clone(), None => return Vec::new(), }; let contracts: Vec = self .vm_contracts .read() .unwrap() .iter() .filter(|c| nodes.contains(&c.node_pubkey)) .cloned() .collect(); contracts } pub fn find_vm_contracts_by_admin(&self, admin_wallet: &str) -> Vec { debug!("Searching contracts for admin pubkey {admin_wallet}"); let contracts: Vec = self .vm_contracts 
.read() .unwrap() .iter() .filter(|c| c.admin_pubkey == admin_wallet) .cloned() .collect(); contracts }

    /// Returns a copy of every VM contract whose `node_pubkey` equals the given key.
    // NOTE(review): the element type of `self.vm_contracts` is not visible from this part of the
    // file; `VmContract` is assumed here — confirm against the struct definition.
    pub fn find_vm_contracts_by_node(&self, node_pubkey: &str) -> Vec<VmContract> {
        let contracts = self.vm_contracts.read().unwrap();
        contracts
            .iter()
            .filter(|c| c.node_pubkey == node_pubkey)
            .cloned()
            .collect()
    }
}

impl BrainData {
    /// Stores `tx` as the channel used to send messages to the app daemon of `node_pubkey`,
    /// replacing any sender previously stored under that key.
    pub fn add_app_daemon_tx(&self, node_pubkey: &str, tx: Sender<BrainMessageApp>) {
        self.app_daemon_tx.insert(node_pubkey.to_string(), tx);
    }

    /// Forgets the sender stored for the app daemon of `node_pubkey`, if any.
    ///
    /// Must not be called while a reference obtained from `self.app_daemon_tx` is still alive:
    /// removing from a `DashMap` while holding a reference into it can deadlock.
    pub fn del_app_daemon_tx(&self, node_pubkey: &str) {
        self.app_daemon_tx.remove(node_pubkey);
    }

    /// Registers `node` and links it to its operator.
    ///
    /// If a node with the same `node_pubkey` is already known, its data is overwritten with
    /// `node`; otherwise `node` is appended to the node list.
    pub fn register_app_node(&self, node: AppNode) {
        info!("Registering app node {node:?}");
        self.add_app_node_to_operator(&node.operator_wallet, &node.node_pubkey);
        let mut nodes = self.app_nodes.write().unwrap();
        if let Some(existing) = nodes
            .iter_mut()
            .find(|n| n.node_pubkey == node.node_pubkey)
        {
            // TODO: figure what to do in this case.
            warn!("Node {} already exists. Updating data.", existing.node_pubkey);
            *existing = node;
            return;
        }
        nodes.push(node);
    }

    /// Adds `node_pubkey` to the app nodes of `operator_wallet`.
    ///
    /// An operator entry with zero escrow and no email is created when the wallet is not an
    /// operator yet.
    pub fn add_app_node_to_operator(&self, operator_wallet: &str, node_pubkey: &str) {
        self.operators
            .entry(operator_wallet.to_string())
            .and_modify(|op| {
                op.app_nodes.insert(node_pubkey.to_string());
            })
            .or_insert_with(|| OperatorData {
                escrow: 0,
                email: String::new(),
                banned_users: HashSet::new(),
                vm_nodes: HashSet::new(),
                app_nodes: HashSet::from([node_pubkey.to_string()]),
            });
    }

    /// Checks the resource and location constraints shared by the app node lookups.
    ///
    /// The node must offer at least the requested vCPUs, memory and storage. An empty
    /// `country`, `city`, `region` or `ip` filter matches any node. `filters.node_pubkey` is
    /// not checked here.
    fn app_node_matches_filters(node: &AppNode, filters: &AppNodeFilters) -> bool {
        node.avail_vcpus >= filters.vcpus
            && node.avail_mem_mb >= filters.memory_mb
            && node.avail_storage_mb >= filters.storage_mb
            && (filters.country.is_empty() || node.country == filters.country)
            && (filters.city.is_empty() || node.city == filters.city)
            && (filters.region.is_empty() || node.region == filters.region)
            && (filters.ip.is_empty() || node.ip == filters.ip)
    }

    /// Returns copies of all app nodes that satisfy the resource and location `filters`.
    ///
    /// `filters.node_pubkey` is ignored by this lookup.
    pub fn find_app_nodes_by_filters(&self, filters: &AppNodeFilters) -> Vec<AppNode> {
        let nodes = self.app_nodes.read().unwrap();
        nodes
            .iter()
            .filter(|n| Self::app_node_matches_filters(n, filters))
            .cloned()
            .collect()
    }

    /// Returns a copy of the first app node that satisfies `filters`, or `None`.
    ///
    /// In addition to the resource and location constraints, a non-empty `filters.node_pubkey`
    /// must equal the node's public key.
    // TODO: sort by rating
    pub fn get_one_app_node_by_filters(&self, filters: &AppNodeFilters) -> Option<AppNode> {
        let nodes = self.app_nodes.read().unwrap();
        nodes
            .iter()
            .find(|n| {
                Self::app_node_matches_filters(n, filters)
                    && (filters.node_pubkey.is_empty() || n.node_pubkey == filters.node_pubkey)
            })
            .cloned()
    }

    /// Returns a copy of the app contract identified by `uuid`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::AppContractNotFound`] when no contract has that UUID.
    pub fn find_app_contract_by_uuid(&self, uuid: &str) -> Result<AppContract, Error> {
        let contracts = self.app_contracts.read().unwrap();
        contracts
            .iter()
            .find(|c| c.uuid == uuid)
            .cloned()
            .ok_or_else(|| Error::AppContractNotFound(uuid.to_string()))
    }

    /// Returns a copy of the app node whose public key is `public_key`, or `None`.
    pub fn find_app_node_by_pubkey(&self, public_key: &str) -> Option<AppNode> {
        let nodes = self.app_nodes.read().unwrap();
        // Search first and clone only the match instead of cloning every node.
        nodes.iter().find(|n| n.node_pubkey == public_key).cloned()
    }

    /// Returns copies of all app contracts administered by `admin_pubkey`.
    pub fn find_app_contracts_by_admin_pubkey(&self, admin_pubkey: &str) -> Vec<AppContract> {
        debug!("Searching contracts for admin pubkey {admin_pubkey}");
        let contracts: Vec<AppContract> = self
            .app_contracts
            .read()
            .unwrap()
            .iter()
            .filter(|c| c.admin_pubkey == admin_pubkey)
            .cloned()
            .collect();
        debug!("Found {} contracts for {admin_pubkey}.", contracts.len());
        contracts
    }

    /// Returns copies of all app contracts hosted on the node `node_pubkey`.
    pub fn find_app_contracts_by_node(&self, node_pubkey: &str) -> Vec<AppContract> {
        let app_contracts = self.app_contracts.read().unwrap();
        app_contracts
            .iter()
            .filter(|c| c.node_pubkey == node_pubkey)
            .cloned()
            .collect()
    }

    /// Charges every app contract for the minutes elapsed since its last collection.
    ///
    /// For contracts whose node reports `offline_minutes == 0`, the due amount (capped at the
    /// contract's `locked_nano`) is taken from the contract and credited to the node operator.
    /// The credit is multiplied by 5 when the operator holds more than 5000 escrow and the
    /// contract admin is not such an operator. Contracts left with no locked funds are removed
    /// and the daemon of their node, if connected, receives a delete request.
    pub async fn app_contracts_cron(&self) {
        let mut deleted_app_contracts: Vec<(String, String)> = Vec::new();
        log::debug!("Running app contracts cron...");
        {
            let has_escrow = |wallet: &str| {
                self.operators
                    .get(wallet)
                    .map_or(false, |op| op.escrow > 5000)
            };
            let mut app_contracts = self.app_contracts.write().unwrap();
            app_contracts.retain_mut(|c| {
                match self.find_app_node_by_pubkey(&c.node_pubkey) {
                    Some(node) if node.offline_minutes == 0 => {
                        let now = Utc::now();
                        // A negative difference (clock moved backwards) must not wrap around
                        // into an enormous number of minutes.
                        let minutes_to_collect =
                            (now - c.collected_at).num_minutes().max(0) as u64;
                        c.collected_at = now;
                        let price_per_minute = c.price_per_minute();
                        log::debug!(
                            "Collecting {minutes_to_collect} minutes at {price_per_minute} \
                             nanoLP per minute from {}",
                            c.uuid
                        );
                        // Never collect more than what is still locked in the contract.
                        let nanolp_to_collect = price_per_minute
                            .saturating_mul(minutes_to_collect)
                            .min(c.locked_nano);
                        log::debug!("Removing {nanolp_to_collect} nanoLP from {}", c.uuid);
                        c.locked_nano -= nanolp_to_collect;
                        let escrow_multiplier =
                            if has_escrow(&node.operator_wallet) && !has_escrow(&c.admin_pubkey) {
                                5
                            } else {
                                1
                            };
                        self.add_nano_to_wallet(
                            &node.operator_wallet,
                            nanolp_to_collect.saturating_mul(escrow_multiplier),
                        );
                    }
                    // Offline nodes are not paid.
                    Some(_) => {}
                    // Panicking here would poison the contracts lock, so only report it.
                    None => warn!(
                        "App node {} of contract {} is not registered. Skipping collection.",
                        c.node_pubkey, c.uuid
                    ),
                }
                let keep = c.locked_nano > 0;
                if !keep {
                    deleted_app_contracts.push((c.uuid.clone(), c.node_pubkey.clone()));
                }
                keep
            });
        }
        // inform daemons of the deletion of the contracts
        for (uuid, node_pubkey) in deleted_app_contracts {
            // Clone the sender so that no map reference is held across the await.
            let daemon_tx = self
                .app_daemon_tx
                .get(&node_pubkey)
                .map(|entry| entry.value().clone());
            if let Some(daemon_tx) = daemon_tx {
                let msg = BrainMessageApp {
                    msg: Some(brain_message_app::Msg::DeleteAppReq(DelAppReq {
                        uuid,
                        admin_pubkey: String::new(),
                    })),
                };
                if let Err(e) = daemon_tx.send(msg).await {
                    warn!("Failed to send deletion request to {node_pubkey} due to error: {e:?}");
                }
            }
        }
    }

    /// Updates the available resources of the app node named in `node_resource`.
    ///
    /// Only logs a debug message when the node is unknown.
    pub fn submit_app_node_resources(&self, node_resource: AppNodeResources) {
        debug!("{:#?}", &node_resource);
        let mut nodes = self.app_nodes.write().unwrap();
        if let Some(n) = nodes
            .iter_mut()
            .find(|n| n.node_pubkey == node_resource.node_pubkey)
        {
            debug!(
                "Found node {}. Updating resources to {:?}",
                n.node_pubkey, node_resource
            );
            n.avail_vcpus = node_resource.avail_vcpus;
            n.avail_mem_mb = node_resource.avail_memory_mb;
            n.avail_storage_mb = node_resource.avail_storage_mb;
            n.max_ports_per_app = node_resource.max_ports_per_app;
            n.avail_no_of_port = node_resource.avail_no_of_port;
            return;
        }
        debug!(
            "App node {} not found when trying to update resources.",
            node_resource.node_pubkey
        );
        debug!("App node list:\n{:?}", nodes);
    }

    /// Locks the funds for a new app, stores the pending request and forwards it to the daemon
    /// of `req.node_pubkey`.
    ///
    /// The request gets a freshly generated UUID. `tx` receives a `"failed"` response right
    /// away when the funds cannot be locked; when the daemon is not connected or the message
    /// cannot be delivered, the failure is reported through
    /// [`BrainData::send_new_container_resp`].
    pub async fn send_new_container_req(&self, mut req: NewAppReq, tx: OneshotSender<NewAppRes>) {
        if let Err(e) = self.lock_nanotockens(&req.admin_pubkey, req.locked_nano) {
            let _ = tx.send(NewAppRes {
                uuid: String::new(),
                error: e.to_string(),
                status: "failed".to_string(),
                ..Default::default()
            });
            return;
        }
        req.uuid = uuid::Uuid::new_v4().to_string();
        info!("Inserting new container request in memory: {req:?}");
        self.tmp_new_container_reqs
            .insert(req.uuid.clone(), (req.clone(), tx));

        // Clone the sender out of the map: keeping the map reference alive across the await,
        // or while calling `del_app_daemon_tx`, can deadlock the map.
        let daemon_tx = self
            .app_daemon_tx
            .get(&req.node_pubkey)
            .map(|entry| entry.value().clone());
        let delivered = match daemon_tx {
            Some(daemon_tx) => {
                debug!(
                    "Found daemon TX for {}. Sending NewAppReq {}",
                    req.node_pubkey, req.uuid
                );
                let msg = BrainMessageApp {
                    msg: Some(brain_message_app::Msg::NewAppReq(req.clone())),
                };
                match daemon_tx.send(msg).await {
                    Ok(()) => true,
                    Err(e) => {
                        warn!(
                            "Failed to send new container request to {} due to error: {e:?}",
                            req.node_pubkey
                        );
                        info!("Deleting daemon TX for {}", req.node_pubkey);
                        self.del_app_daemon_tx(&req.node_pubkey);
                        false
                    }
                }
            }
            None => false,
        };
        if !delivered {
            self.send_new_container_resp(NewAppRes {
                uuid: req.uuid,
                status: "failed".to_string(),
                error: "Daemon is offline.".to_string(),
                ..Default::default()
            })
            .await;
        }
    }

    /// Deletes the app contract `req.uuid` on behalf of `req.admin_pubkey`.
    ///
    /// The daemon of the hosting node is informed when it is connected, the contract is removed
    /// and the funds still locked in it are returned to the admin wallet.
    ///
    /// # Errors
    ///
    /// Returns [`Error::AppContractNotFound`] when the contract does not exist and
    /// [`Error::AccessDenied`] when `req.admin_pubkey` is not the admin of the contract.
    pub async fn send_del_container_req(&self, req: DelAppReq) -> Result<(), Error> {
        log::debug!("Starting deletion of app {}", req.uuid);
        let app_contract = self.find_app_contract_by_uuid(&req.uuid)?;
        if app_contract.admin_pubkey != req.admin_pubkey {
            return Err(Error::AccessDenied);
        }
        info!("Found app contract {}. Deleting...", &req.uuid);
        // Clone the sender out of the map: keeping the map reference alive across the await,
        // or while calling `del_app_daemon_tx`, can deadlock the map.
        let daemon_tx = self
            .app_daemon_tx
            .get(&app_contract.node_pubkey)
            .map(|entry| entry.value().clone());
        if let Some(daemon_tx) = daemon_tx {
            debug!(
                "TX for daemon {} found. Informing daemon about deletion of {}.",
                app_contract.node_pubkey, &req.uuid
            );
            let msg = BrainMessageApp {
                msg: Some(brain_message_app::Msg::DeleteAppReq(req.clone())),
            };
            if let Err(e) = daemon_tx.send(msg).await {
                warn!(
                    "Failed to send deletion request to {} due to error: {e:?}",
                    app_contract.node_pubkey
                );
                info!("Deleting daemon TX for {}", app_contract.node_pubkey);
                self.del_app_daemon_tx(&app_contract.node_pubkey);
            }
        }
        // Remove the contract first and refund what it holds at that moment. The copy taken
        // above may be stale by now, and refunding a contract that is already gone would pay
        // the admin twice.
        let removed = {
            let mut app_contracts = self.app_contracts.write().unwrap();
            let index = app_contracts.iter().position(|c| c.uuid == req.uuid);
            index.map(|i| app_contracts.remove(i))
        };
        if let Some(contract) = removed {
            self.add_nano_to_wallet(&contract.admin_pubkey, contract.locked_nano);
        }
        Ok(())
    }

    /// Completes a pending new-app request with the response `new_container_resp`.
    ///
    /// The response is forwarded to the waiting requester and the temporary lock on the admin's
    /// funds is released. When the response carries an error the funds go back to the admin's
    /// balance; otherwise a new app contract holding the funds is stored. Responses whose UUID
    /// matches no pending request are logged and ignored.
    pub async fn send_new_container_resp(&self, new_container_resp: NewAppRes) {
        let (request, reply_tx) =
            match self.tmp_new_container_reqs.remove(&new_container_resp.uuid) {
                Some((_, pending)) => pending,
                None => {
                    log::error!(
                        "Received confirmation for ghost new container req {}",
                        new_container_resp.uuid
                    );
                    return;
                }
            };
        if let Err(err) = reply_tx.send(new_container_resp.clone()) {
            log::error!(
                "CLI RX for {} dropped before receiving confirmation {:?}.\n{:?}",
                &request.admin_pubkey,
                new_container_resp,
                err
            );
        }

        let failed = !new_container_resp.error.is_empty();
        if let Some(mut admin_wallet) = self.accounts.get_mut(&request.admin_pubkey) {
            // Saturating arithmetic: inconsistent bookkeeping must not panic or wrap around.
            admin_wallet.tmp_locked = admin_wallet.tmp_locked.saturating_sub(request.locked_nano);
            if failed {
                admin_wallet.balance = admin_wallet.balance.saturating_add(request.locked_nano);
            }
        }
        if failed {
            return;
        }

        let now = Utc::now();
        let requested_resource = request.resource.unwrap_or_default();
        let app_contract = AppContract {
            uuid: request.uuid,
            package_url: request.package_url,
            admin_pubkey: request.admin_pubkey,
            node_pubkey: request.node_pubkey,
            // NOTE(review): `as u16` silently truncates port values above 65535.
            mapped_ports: new_container_resp
                .mapped_ports
                .iter()
                .map(|p| (p.host_port as u16, p.app_port as u16))
                .collect(),
            host_ipv4: new_container_resp.ip_address,
            disk_size_mb: requested_resource.disk_mb,
            vcpus: requested_resource.vcpu,
            memory_mb: requested_resource.memory_mb,
            created_at: now,
            updated_at: now,
            price_per_unit: request.price_per_unit,
            locked_nano: request.locked_nano,
            collected_at: now,
            hratls_pubkey: request.hratls_pubkey,
            public_package_mr_enclave: request.public_package_mr_enclave,
        };
        log::info!("Created new app contract: {app_contract:?}");
        self.app_contracts.write().unwrap().push(app_contract);
    }
}