From 02be48fd96fff98fcdce3f1f6945063bd4402962 Mon Sep 17 00:00:00 2001 From: ghe0 Date: Thu, 13 Feb 2025 01:47:56 +0200 Subject: [PATCH] add support for operators --- Cargo.lock | 31 +++- Cargo.toml | 8 +- src/data.rs | 469 +++++++++++++++++++++++++++++++++++++++++++++------- src/grpc.rs | 146 +++++++++++++--- src/main.rs | 4 + vm.proto | 77 +++++++-- 6 files changed, 632 insertions(+), 103 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ad4f555..14d3b2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -244,7 +244,7 @@ dependencies = [ "prost-types", "reqwest", "serde", - "serde_json", + "serde_yaml", "thiserror", "tokio", "tokio-stream", @@ -305,6 +305,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-targets", ] @@ -401,6 +402,7 @@ dependencies = [ "lock_api", "once_cell", "parking_lot_core", + "serde", ] [[package]] @@ -1607,18 +1609,18 @@ checksum = "f79dfe2d285b0488816f30e700a7438c5a73d816b5b7d3ac72fbc48b0d185e03" [[package]] name = "serde" -version = "1.0.216" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b9781016e935a97e8beecf0c933758c97a5520d32930e460142b4cd80c6338e" +checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.216" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46f859dbbf73865c6627ed570e78961cd3ac92407a2d117204c49232485da55e" +checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" dependencies = [ "proc-macro2", "quote", @@ -1649,6 +1651,19 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.9.34+deprecated" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" +dependencies = [ + "indexmap 2.7.0", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "sha2" version = "0.10.8" @@ -2049,6 +2064,12 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" +[[package]] +name = "unsafe-libyaml" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" + [[package]] name = "untrusted" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index 00e4298..3ae3262 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,16 +5,16 @@ edition = "2021" [dependencies] bs58 = "0.5.1" -chrono = "0.4.39" -dashmap = "6.1.0" +chrono = { version = "0.4.39", features = ["serde"] } +dashmap = { version = "6.1.0", features = ["serde"] } ed25519-dalek = "2.1.1" env_logger = "0.11.6" log = "0.4.22" prost = "0.13.4" prost-types = "0.13.4" reqwest = "0.12.10" -serde = { version = "1.0.216", features = ["derive"] } -serde_json = "1.0.134" +serde = { version = "1.0.217", features = ["derive"] } +serde_yaml = "0.9.34" thiserror = "2.0.11" tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] } tokio-stream = "0.1.17" diff --git a/src/data.rs b/src/data.rs index 13d3d6a..9f7182e 100644 --- a/src/data.rs +++ b/src/data.rs @@ -2,29 +2,56 @@ use crate::grpc::snp_proto::{self as grpc}; use chrono::Utc; use dashmap::DashMap; use log::{debug, info, warn}; +use serde::{Deserialize, Serialize}; use std::str::FromStr; use std::sync::RwLock; +use std::{ + collections::{HashMap, HashSet}, + fs::File, + io::Write, +}; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot::Sender as OneshotSender; +const DATA_PATH: &str = "/etc/detee/brain-mock/saved_data.yaml"; + #[derive(thiserror::Error, Debug)] pub enum Error { #[error("We do not allow locking of more than 100000 LP.")] TxTooBig, + #[error("Escrow must be at least 5000 LP.")] + MinimalEscrow, #[error("Account has insufficient funds for this operation")] InsufficientFunds, #[error("Could not find contract {0}")] VmContractNotFound(String), + #[error("This error should never happen.")] + ImpossibleError, + #[error("You don't have the required permissions for this operation.")] + AccessDenied, } -#[derive(Clone)] -pub struct AccountNanoLP { +#[derive(Clone, Default, Serialize, Deserialize)] +pub struct AccountData { pub balance: u64, pub tmp_locked: u64, + // holds reasons why VMs of this account got kicked + pub kicked_for: Vec, + pub last_kick: chrono::DateTime, + // holds accounts that banned this account + pub banned_by: HashSet, } -impl From for grpc::AccountBalance { - fn from(value: AccountNanoLP) -> Self { +#[derive(Clone, Default, Serialize, Deserialize)] +pub struct OperatorData { + pub escrow: u64, + pub email: String, + pub banned_users: HashSet, + pub vm_nodes: HashSet, +} + +impl From for grpc::AccountBalance { + fn from(value: AccountData) -> Self { grpc::AccountBalance { balance: value.balance, tmp_locked: value.tmp_locked, @@ -32,10 +59,10 @@ impl From for grpc::AccountBalance { } } -#[derive(Eq, Hash, PartialEq, Clone, Debug, Default)] +#[derive(Eq, PartialEq, Clone, Debug, Default, Serialize, Deserialize)] pub struct VmNode { pub public_key: String, - pub owner_key: String, + pub operator_wallet: String, pub country: String, pub region: String, pub city: String, @@ -49,24 +76,27 @@ pub struct VmNode { pub max_ports_per_vm: u32, // nanoLP per unit per minute pub price: u64, + // 1st String is user wallet and 2nd String is report message + pub reports: HashMap, + pub offline_minutes: u64, } impl Into for VmNode { fn into(self) -> grpc::VmNodeListResp { grpc::VmNodeListResp { + operator: self.operator_wallet, node_pubkey: self.public_key, country: self.country, region: self.region, city: self.city, ip: self.ip, - server_rating: 0, - provider_rating: 0, price: self.price, + reports: self.reports.into_values().collect(), } } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct VmContract { pub uuid: String, pub hostname: String, @@ -82,14 +112,15 @@ pub struct VmContract { pub dtrfs_sha: String, pub created_at: chrono::DateTime, pub updated_at: chrono::DateTime, - // price per unit per minute // recommended value is 20000 + /// price per unit per minute pub price_per_unit: u64, pub locked_nano: u64, pub collected_at: chrono::DateTime, } impl VmContract { + /// total hardware units of this VM fn total_units(&self) -> u64 { // TODO: Optimize this based on price of hardware. // I tried, but this can be done better. @@ -100,7 +131,7 @@ impl VmContract { + (!self.public_ipv4.is_empty() as u64 * 10) } - // Returns price per minute in nanoLP + /// Returns price per minute in nanoLP fn price_per_minute(&self) -> u64 { self.total_units() * self.price_per_unit } @@ -131,78 +162,178 @@ impl Into for VmContract { } } -#[derive(Default)] +#[derive(Default, Serialize, Deserialize)] pub struct BrainData { // amount of nanoLP in each account - accounts: DashMap, + accounts: DashMap, + operators: DashMap, vm_nodes: RwLock>, vm_contracts: RwLock>, + #[serde(skip_serializing, skip_deserializing)] tmp_newvm_reqs: DashMap)>, + #[serde(skip_serializing, skip_deserializing)] tmp_updatevm_reqs: DashMap)>, + #[serde(skip_serializing, skip_deserializing)] daemon_tx: DashMap>, } impl BrainData { + pub fn save_to_disk(&self) -> Result<(), Box> { + let mut file = File::create(DATA_PATH)?; + file.write_all(serde_yaml::to_string(self)?.as_bytes())?; + Ok(()) + } + + fn load_from_disk() -> Result> { + let content = std::fs::read_to_string(DATA_PATH)?; + let data: Self = serde_yaml::from_str(&content)?; + Ok(data) + } + pub fn new() -> Self { - Self { - accounts: DashMap::new(), - vm_nodes: RwLock::new(Vec::new()), - vm_contracts: RwLock::new(Vec::new()), - tmp_newvm_reqs: DashMap::new(), - tmp_updatevm_reqs: DashMap::new(), - daemon_tx: DashMap::new(), + match Self::load_from_disk() { + Ok(data) => data, + Err(e) => { + warn!("Could not data {DATA_PATH} due to error: {e:?}"); + info!("Creating new instance of brain."); + Self { + accounts: DashMap::new(), + operators: DashMap::new(), + vm_nodes: RwLock::new(Vec::new()), + vm_contracts: RwLock::new(Vec::new()), + tmp_newvm_reqs: DashMap::new(), + tmp_updatevm_reqs: DashMap::new(), + daemon_tx: DashMap::new(), + } + } } } - pub fn get_balance(&self, account: &str) -> AccountNanoLP { + pub fn get_balance(&self, account: &str) -> AccountData { if let Some(account) = self.accounts.get(account) { return account.value().clone(); } else { - let balance = AccountNanoLP { + let balance = AccountData { balance: 0, tmp_locked: 0, + kicked_for: Vec::new(), + banned_by: HashSet::new(), + last_kick: chrono::Utc::now(), }; return balance; } } pub fn give_airdrop(&self, account: &str, tokens: u64) { + warn!("Airdropping {tokens} to {account}."); self.add_nano_to_wallet(account, tokens.saturating_mul(1_000_000_000)); } + pub fn slash_account(&self, account: &str, tokens: u64) { + warn!("Slashing {tokens} from {account}."); + self.rm_nano_from_wallet(account, tokens.saturating_mul(1_000_000_000)); + } + fn add_nano_to_wallet(&self, account: &str, nano_lp: u64) { log::debug!("Adding {nano_lp} nanoLP to {account}"); self.accounts .entry(account.to_string()) .and_modify(|d| d.balance += nano_lp) - .or_insert(AccountNanoLP { + .or_insert(AccountData { balance: nano_lp, - tmp_locked: 0, + ..Default::default() }); } + fn rm_nano_from_wallet(&self, account: &str, nano_lp: u64) { + log::debug!("Slashing {nano_lp} nanoLP to {account}"); + self.accounts.entry(account.to_string()).and_modify(|d| { + d.balance = d.balance.saturating_sub(nano_lp); + }); + } + + /// This is written to run every minute + pub async fn vm_nodes_cron(&self) { + log::debug!("Running vm nodes cron..."); + let mut nodes = self.vm_nodes.write().unwrap(); + let mut vm_contracts = self.vm_contracts.write().unwrap(); + for node in nodes.iter_mut() { + if self.daemon_tx.contains_key(&node.public_key) { + node.offline_minutes = 0; + continue; + } + let mut operator = match self + .operators + .iter_mut() + .find(|o| o.vm_nodes.contains(&node.public_key)) + { + Some(op) => op, + None => continue, + }; + node.offline_minutes += 1; + // compensate contract admin if the node is offline more then 5 minutes + if node.offline_minutes > 5 { + for c in vm_contracts + .iter() + .filter(|c| c.node_pubkey == node.public_key) + { + let compensation = c.price_per_minute() * 24; + if compensation < operator.escrow { + operator.escrow -= compensation; + self.add_nano_to_wallet(&c.admin_pubkey, compensation); + } + } + } + } + // delete nodes that are offline more than 3 hours, and clean contracts + nodes.retain(|n| { + if n.offline_minutes > 180 { + vm_contracts.retain_mut(|c| { + if c.node_pubkey == n.public_key { + self.add_nano_to_wallet(&c.admin_pubkey, c.locked_nano); + } + c.node_pubkey != n.public_key + }); + for mut op in self.operators.iter_mut() { + op.vm_nodes.remove(&n.public_key); + } + } + n.offline_minutes <= 180 + }); + } + pub async fn vm_contracts_cron(&self) { let mut deleted_contracts: Vec<(String, String)> = Vec::new(); log::debug!("Running contracts cron..."); { let mut contracts = self.vm_contracts.write().unwrap(); contracts.retain_mut(|c| { - let owner_key = self - .find_nodes_by_pubkey(&c.node_pubkey) - .unwrap() - .owner_key - .clone(); - let minutes_to_collect = (Utc::now() - c.collected_at).num_minutes() as u64; - c.collected_at = Utc::now(); - let mut nanolp_to_collect = c.price_per_minute().saturating_mul(minutes_to_collect); - if nanolp_to_collect > c.locked_nano { - nanolp_to_collect = c.locked_nano; - } - log::debug!("Removing {nanolp_to_collect} nanoLP from {}", c.uuid); - c.locked_nano -= nanolp_to_collect; - self.add_nano_to_wallet(&owner_key, nanolp_to_collect); - if c.locked_nano == 0 { - deleted_contracts.push((c.uuid.clone(), c.node_pubkey.clone())); + let node = self.find_node_by_pubkey(&c.node_pubkey).unwrap(); + if node.offline_minutes == 0 { + let operator_wallet = node.operator_wallet.clone(); + let minutes_to_collect = (Utc::now() - c.collected_at).num_minutes() as u64; + c.collected_at = Utc::now(); + let mut nanolp_to_collect = + c.price_per_minute().saturating_mul(minutes_to_collect); + if nanolp_to_collect > c.locked_nano { + nanolp_to_collect = c.locked_nano; + } + log::debug!("Removing {nanolp_to_collect} nanoLP from {}", c.uuid); + c.locked_nano -= nanolp_to_collect; + let escrow_multiplier = match self.operators.get(&operator_wallet) { + Some(op) if op.escrow > 5000 => match self.operators.get(&c.admin_pubkey) { + Some(user_is_op) if user_is_op.escrow > 5000 => 1, + _ => 5, + }, + _ => 1, + }; + self.add_nano_to_wallet( + &operator_wallet, + nanolp_to_collect * escrow_multiplier, + ); + if c.locked_nano == 0 { + deleted_contracts.push((c.uuid.clone(), c.node_pubkey.clone())); + } } c.locked_nano > 0 }); @@ -222,8 +353,9 @@ impl BrainData { } } - pub fn insert_node(&self, node: VmNode) { + pub fn register_node(&self, node: VmNode) { info!("Registering node {node:?}"); + self.add_vmnode_to_operator(&node.operator_wallet, &node.public_key); let mut nodes = self.vm_nodes.write().unwrap(); for n in nodes.iter_mut() { if n.public_key == node.public_key { @@ -236,6 +368,101 @@ impl BrainData { nodes.push(node); } + // todo: this should also support Apps + /// Receives: operator, contract uuid, reason of kick + pub async fn kick_contract( + &self, + operator: &str, + uuid: &str, + reason: &str, + ) -> Result { + log::debug!("Operator {operator} requested a kick of {uuid} for reason: {reason}"); + let contract = self.find_contract_by_uuid(uuid)?; + let mut operator_data = self + .operators + .get_mut(operator) + .ok_or(Error::AccessDenied)?; + if !operator_data.vm_nodes.contains(&contract.node_pubkey) { + return Err(Error::AccessDenied); + } + + let mut minutes_to_refund = chrono::Utc::now() + .signed_duration_since(contract.updated_at) + .num_minutes() + .abs() as u64; + // cap refund at 1 week + if minutes_to_refund > 10080 { + minutes_to_refund = 10080; + } + + let mut refund_amount = minutes_to_refund * contract.price_per_minute(); + let mut admin_account = self + .accounts + .get_mut(&contract.admin_pubkey) + .ok_or(Error::ImpossibleError)?; + + // check if he got kicked within the last day + if !chrono::Utc::now() + .signed_duration_since(admin_account.last_kick) + .gt(&chrono::Duration::days(1)) + { + refund_amount = 0; + } + + if operator_data.escrow < refund_amount { + refund_amount = operator_data.escrow; + } + + log::debug!( + "Removing {refund_amount} escrow from {} and giving it to {}", + operator_data.key(), + admin_account.key() + ); + admin_account.balance += refund_amount; + admin_account.kicked_for.push(reason.to_string()); + operator_data.escrow -= refund_amount; + + let admin_pubkey = contract.admin_pubkey.clone(); + drop(admin_account); + drop(contract); + + self.delete_vm(grpc::DeleteVmReq { + uuid: uuid.to_string(), + admin_pubkey, + }) + .await?; + + Ok(refund_amount) + } + + pub fn ban_user(&self, operator: &str, user: &str) { + self.accounts + .entry(user.to_string()) + .and_modify(|a| { + a.banned_by.insert(operator.to_string()); + }) + .or_insert(AccountData { + banned_by: HashSet::from([operator.to_string()]), + ..Default::default() + }); + self.operators + .entry(operator.to_string()) + .and_modify(|o| { + o.banned_users.insert(user.to_string()); + }) + .or_insert(OperatorData { + banned_users: HashSet::from([user.to_string()]), + ..Default::default() + }); + } + + pub fn report_node(&self, admin_pubkey: String, node: &str, report: String) { + let mut nodes = self.vm_nodes.write().unwrap(); + if let Some(node) = nodes.iter_mut().find(|n| n.public_key == node) { + node.reports.insert(admin_pubkey, report); + } + } + pub fn lock_nanotockens(&self, account: &str, nano_lp: u64) -> Result<(), Error> { if nano_lp > 100_000_000_000_000 { return Err(Error::TxTooBig); @@ -321,17 +548,11 @@ impl BrainData { } pub async fn delete_vm(&self, delete_vm: grpc::DeleteVmReq) -> Result<(), Error> { - let contract = match self.find_contract_by_uuid(&delete_vm.uuid) { - Some(contract) => { - if contract.admin_pubkey != delete_vm.admin_pubkey { - return Err(Error::VmContractNotFound(delete_vm.uuid)); - } - contract - } - None => { - return Err(Error::VmContractNotFound(delete_vm.uuid)); - } - }; + log::debug!("Starting deletion of VM {}", delete_vm.uuid); + let contract = self.find_contract_by_uuid(&delete_vm.uuid)?; + if contract.admin_pubkey != delete_vm.admin_pubkey { + return Err(Error::AccessDenied); + } info!("Found vm {}. Deleting...", delete_vm.uuid); if let Some(daemon_tx) = self.daemon_tx.get(&contract.node_pubkey) { debug!( @@ -540,7 +761,7 @@ impl BrainData { let uuid = req.uuid.clone(); info!("Inserting new vm update request in memory: {req:?}"); let node_pubkey = match self.find_contract_by_uuid(&req.uuid) { - Some(contract) => { + Ok(contract) => { if contract.admin_pubkey != req.admin_pubkey { let _ = tx.send(grpc::UpdateVmResp { uuid, @@ -551,7 +772,7 @@ impl BrainData { } contract.node_pubkey } - None => { + Err(_) => { log::warn!( "Received UpdateVMReq for a contract that does not exist: {}", req.uuid @@ -602,12 +823,81 @@ impl BrainData { } } - pub fn find_nodes_by_pubkey(&self, public_key: &str) -> Option { + pub fn find_node_by_pubkey(&self, public_key: &str) -> Option { let nodes = self.vm_nodes.read().unwrap(); nodes.iter().cloned().find(|n| n.public_key == public_key) } - pub fn find_nodes_by_filters( + pub fn is_user_banned_by_node(&self, user_wallet: &str, node_pubkey: &str) -> bool { + if let Some(node) = self.find_node_by_pubkey(&node_pubkey) { + if let Some(account) = self.accounts.get(user_wallet) { + if account.banned_by.contains(&node.operator_wallet) { + return true; + } + } + } + false + } + + pub fn add_vmnode_to_operator(&self, operator_wallet: &str, node_pubkey: &str) { + self.operators + .entry(operator_wallet.to_string()) + .and_modify(|op| { + op.vm_nodes.insert(node_pubkey.to_string()); + }) + .or_insert(OperatorData { + escrow: 0, + email: String::new(), + banned_users: HashSet::new(), + vm_nodes: HashSet::from([node_pubkey.to_string()]), + }); + } + + pub fn register_operator(&self, req: grpc::RegOperatorReq) -> Result<(), Error> { + let mut operator = match self.operators.get(&req.pubkey) { + Some(o) => (*(o.value())).clone(), + None => OperatorData { + ..Default::default() + }, + }; + if req.escrow < 5000 { + return Err(Error::MinimalEscrow); + } + let escrow = req.escrow * 1_000_000_000; + if let Some(mut account) = self.accounts.get_mut(&req.pubkey) { + if (account.balance + operator.escrow) < escrow { + return Err(Error::InsufficientFunds); + } + account.balance = account.balance + operator.escrow - escrow; + operator.escrow = escrow; + } else { + return Err(Error::InsufficientFunds); + } + operator.email = req.email; + self.operators.insert(req.pubkey, operator); + Ok(()) + } + + pub fn find_vm_nodes_by_operator(&self, operator_wallet: &str) -> Vec { + let nodes = self.vm_nodes.read().unwrap(); + nodes + .iter() + .filter(|node| node.operator_wallet == operator_wallet) + .cloned() + .collect() + } + + pub fn total_operator_reports(&self, operator_wallet: &str) -> usize { + let nodes = self.vm_nodes.read().unwrap(); + nodes + .iter() + .cloned() + .filter(|n| n.operator_wallet == operator_wallet) + .map(|node| node.reports.len()) + .sum() + } + + pub fn find_vm_nodes_by_filters( &self, filters: &crate::grpc::snp_proto::VmNodeFilters, ) -> Vec { @@ -654,9 +944,13 @@ impl BrainData { .cloned() } - pub fn find_contract_by_uuid(&self, uuid: &str) -> Option { + pub fn find_contract_by_uuid(&self, uuid: &str) -> Result { let contracts = self.vm_contracts.read().unwrap(); - contracts.iter().cloned().find(|c| c.uuid == uuid) + contracts + .iter() + .cloned() + .find(|c| c.uuid == uuid) + .ok_or(Error::VmContractNotFound(uuid.to_string())) } pub fn list_all_contracts(&self) -> Vec { @@ -675,17 +969,68 @@ impl BrainData { .collect() } - pub fn find_vm_contracts_by_admin(&self, admin_pubkey: &str) -> Vec { - debug!("Searching contracts for admin pubkey {admin_pubkey}"); + pub fn list_operators(&self) -> Vec { + self.operators + .iter() + .map(|op| grpc::ListOperatorsResp { + pubkey: op.key().to_string(), + escrow: op.escrow / 1_000_000_000, + email: op.email.clone(), + app_nodes: 0, + vm_nodes: op.vm_nodes.len() as u64, + reports: self.total_operator_reports(op.key()) as u64, + }) + .collect() + } + + pub fn inspect_operator(&self, wallet: &str) -> Option { + self.operators.get(wallet).map(|op| { + let nodes = self + .find_vm_nodes_by_operator(wallet) + .into_iter() + .map(|n| n.into()) + .collect(); + grpc::InspectOperatorResp { + operator: Some(grpc::ListOperatorsResp { + pubkey: op.key().to_string(), + escrow: op.escrow, + email: op.email.clone(), + app_nodes: 0, + vm_nodes: op.vm_nodes.len() as u64, + reports: self.total_operator_reports(op.key()) as u64, + }), + nodes, + } + }) + } + + pub fn find_vm_contracts_by_operator(&self, wallet: &str) -> Vec { + debug!("Searching contracts for operator {wallet}"); + let nodes = match self.operators.get(wallet) { + Some(op) => op.vm_nodes.clone(), + None => return Vec::new(), + }; let contracts: Vec = self .vm_contracts .read() .unwrap() .iter() - .filter(|c| c.admin_pubkey == admin_pubkey) + .filter(|c| nodes.contains(&c.node_pubkey)) + .cloned() + .collect(); + contracts + } + + pub fn find_vm_contracts_by_admin(&self, admin_wallet: &str) -> Vec { + debug!("Searching contracts for admin pubkey {admin_wallet}"); + let contracts: Vec = self + .vm_contracts + .read() + .unwrap() + .iter() + .filter(|c| c.admin_pubkey == admin_wallet) .cloned() .collect(); - debug!("Found {} contracts or {admin_pubkey}.", contracts.len()); contracts } diff --git a/src/grpc.rs b/src/grpc.rs index a80f74b..f3a6327 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -1,10 +1,9 @@ - pub mod snp_proto { tonic::include_proto!("vm_proto"); } -use crate::grpc::vm_daemon_message; use crate::data::BrainData; +use crate::grpc::vm_daemon_message; use log::info; use snp_proto::brain_cli_server::BrainCli; use snp_proto::brain_vm_daemon_server::BrainVmDaemon; @@ -52,7 +51,7 @@ impl BrainVmDaemon for BrainDaemonMock { info!("Starting registration process for {:?}", req); let node = crate::data::VmNode { public_key: req.node_pubkey.clone(), - owner_key: req.owner_pubkey, + operator_wallet: req.operator_wallet, country: req.country, region: req.region, city: req.city, @@ -60,7 +59,7 @@ impl BrainVmDaemon for BrainDaemonMock { price: req.price, ..Default::default() }; - self.data.insert_node(node); + self.data.register_node(node); info!("Sending existing contracts to {}", req.node_pubkey); let contracts = self.data.find_vm_contracts_by_node(&req.node_pubkey); @@ -161,6 +160,14 @@ impl BrainCli for BrainCliMock { async fn new_vm(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; info!("New VM requested via CLI: {req:?}"); + if self + .data + .is_user_banned_by_node(&req.admin_pubkey, &req.node_pubkey) + { + return Err(Status::permission_denied( + "This operator banned you. What did you do?", + )); + } let admin_pubkey = req.admin_pubkey.clone(); let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); self.data.submit_newvm_req(req, oneshot_tx).await; @@ -205,20 +212,55 @@ impl BrainCli for BrainCliMock { } } + async fn delete_vm(&self, req: Request) -> Result, Status> { + let req = check_sig_from_req(req)?; + match self.data.delete_vm(req).await { + Ok(()) => Ok(Response::new(Empty {})), + Err(e) => Err(Status::not_found(e.to_string())), + } + } + + async fn report_node(&self, req: Request) -> Result, Status> { + let req = check_sig_from_req(req)?; + match self.data.find_contract_by_uuid(&req.contract) { + Ok(contract) + if contract.admin_pubkey == req.admin_pubkey + && contract.node_pubkey == req.node_pubkey => + { + () + } + _ => return Err(Status::unauthenticated("No contract found by this ID.")), + }; + self.data + .report_node(req.admin_pubkey, &req.node_pubkey, req.reason); + Ok(Response::new(Empty {})) + } + type ListVmContractsStream = Pin> + Send>>; async fn list_vm_contracts( &self, req: Request, ) -> Result, Status> { let req = check_sig_from_req(req)?; - info!("CLI {} requested ListVMVmContractsStream", req.admin_pubkey); - let contracts = match req.uuid.is_empty() { - false => match self.data.find_contract_by_uuid(&req.uuid) { - Some(contract) => vec![contract], - None => Vec::new(), - }, - true => self.data.find_vm_contracts_by_admin(&req.admin_pubkey), - }; + info!( + "CLI {} requested ListVMVmContractsStream. As operator: {}", + req.wallet, req.as_operator + ); + let mut contracts = Vec::new(); + if !req.uuid.is_empty() { + if let Ok(specific_contract) = self.data.find_contract_by_uuid(&req.uuid) { + if specific_contract.admin_pubkey == req.wallet { + contracts.push(specific_contract); + } + // TODO: allow operator to inspect contracts + } + } else { + if req.as_operator { + contracts.append(&mut self.data.find_vm_contracts_by_operator(&req.wallet)); + } else { + contracts.append(&mut self.data.find_vm_contracts_by_admin(&req.wallet)); + } + } let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for contract in contracts { @@ -238,7 +280,7 @@ impl BrainCli for BrainCliMock { ) -> Result, tonic::Status> { let req = check_sig_from_req(req)?; info!("CLI requested ListVmNodesStream: {req:?}"); - let nodes = self.data.find_nodes_by_filters(&req); + let nodes = self.data.find_vm_nodes_by_filters(&req); let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for node in nodes { @@ -265,12 +307,65 @@ impl BrainCli for BrainCliMock { } } - async fn delete_vm(&self, req: Request) -> Result, Status> { + async fn register_operator( + &self, + req: Request, + ) -> Result, Status> { let req = check_sig_from_req(req)?; - info!("Unknown CLI requested to delete vm {}", req.uuid); - match self.data.delete_vm(req).await { + info!("Regitering new operator: {req:?}"); + match self.data.register_operator(req) { Ok(()) => Ok(Response::new(Empty {})), - Err(e) => Err(Status::not_found(e.to_string())), + Err(e) => Err(Status::failed_precondition(e.to_string())), + } + } + + async fn kick_contract(&self, req: Request) -> Result, Status> { + let req = check_sig_from_req(req)?; + match self + .data + .kick_contract(&req.operator_wallet, &req.contract_uuid, &req.reason) + .await + { + Ok(nano_lp) => Ok(Response::new(KickResp { nano_lp })), + Err(e) => Err(Status::permission_denied(e.to_string())), + } + } + + async fn ban_user(&self, req: Request) -> Result, Status> { + let req = check_sig_from_req(req)?; + self.data.ban_user(&req.operator_wallet, &req.user_wallet); + Ok(Response::new(Empty {})) + } + + type ListOperatorsStream = + Pin> + Send>>; + async fn list_operators( + &self, + req: Request, + ) -> Result, Status> { + let _ = check_sig_from_req(req)?; + let operators = self.data.list_operators(); + let (tx, rx) = mpsc::channel(6); + tokio::spawn(async move { + for op in operators { + let _ = tx.send(Ok(op.into())).await; + } + }); + let output_stream = ReceiverStream::new(rx); + Ok(Response::new( + Box::pin(output_stream) as Self::ListOperatorsStream + )) + } + + async fn inspect_operator( + &self, + req: Request, + ) -> Result, Status> { + match self.data.inspect_operator(&req.into_inner().pubkey) { + Some(op) => Ok(Response::new(op.into())), + None => Err(Status::not_found( + "The wallet you specified is not an operator", + )), } } @@ -278,7 +373,14 @@ impl BrainCli for BrainCliMock { check_admin_key(&req)?; let req = check_sig_from_req(req)?; self.data.give_airdrop(&req.pubkey, req.tokens); - Ok(Response::new(Empty{})) + Ok(Response::new(Empty {})) + } + + async fn slash(&self, req: Request) -> Result, Status> { + check_admin_key(&req)?; + let req = check_sig_from_req(req)?; + self.data.slash_account(&req.pubkey, req.tokens); + Ok(Response::new(Empty {})) } type ListAllVmContractsStream = Pin> + Send>>; @@ -320,7 +422,6 @@ impl BrainCli for BrainCliMock { Box::pin(output_stream) as Self::ListAccountsStream )) } - } trait PubkeyGetter { @@ -349,12 +450,17 @@ impl_pubkey_getter!(NewVmReq, admin_pubkey); impl_pubkey_getter!(DeleteVmReq, admin_pubkey); impl_pubkey_getter!(UpdateVmReq, admin_pubkey); impl_pubkey_getter!(ExtendVmReq, admin_pubkey); -impl_pubkey_getter!(ListVmContractsReq, admin_pubkey); +impl_pubkey_getter!(ReportNodeReq, admin_pubkey); +impl_pubkey_getter!(ListVmContractsReq, wallet); impl_pubkey_getter!(RegisterVmNodeReq, node_pubkey); +impl_pubkey_getter!(RegOperatorReq, pubkey); +impl_pubkey_getter!(KickReq, operator_wallet); +impl_pubkey_getter!(BanUserReq, operator_wallet); impl_pubkey_getter!(VmNodeFilters); impl_pubkey_getter!(Empty); impl_pubkey_getter!(AirdropReq); +impl_pubkey_getter!(SlashReq); fn check_sig_from_req(req: Request) -> Result { let time = match req.metadata().get("timestamp") { diff --git a/src/main.rs b/src/main.rs index ffc72f3..64936a3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,7 +19,11 @@ async fn main() { tokio::spawn(async move { loop { tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + data_clone.vm_nodes_cron().await; data_clone.vm_contracts_cron().await; + if let Err(e) = data_clone.save_to_disk() { + log::error!("Could not save data to disk due to error: {e}") + } } }); let addr = "0.0.0.0:31337".parse().unwrap(); diff --git a/vm.proto b/vm.proto index e755772..f0f4d2e 100644 --- a/vm.proto +++ b/vm.proto @@ -55,7 +55,7 @@ message MeasurementIP { // This should also include a block hash or similar, for auth message RegisterVmNodeReq { string node_pubkey = 1; - string owner_pubkey = 2; + string operator_wallet = 2; string main_ip = 3; string country = 4; string region = 5; @@ -154,8 +154,8 @@ service BrainVmDaemon { } message ListVmContractsReq { - string admin_pubkey = 1; - string node_pubkey = 2; + string wallet = 1; + bool as_operator = 2; string uuid = 3; } @@ -174,15 +174,14 @@ message VmNodeFilters { } message VmNodeListResp { - string node_pubkey = 1; - string country = 2; - string region = 3; - string city = 4; - string ip = 5; // required for latency test - uint32 server_rating = 6; - uint32 provider_rating = 7; - // nanoLP per unit per minute - uint64 price = 8; + string operator = 1; + string node_pubkey = 2; + string country = 3; + string region = 4; + string city = 5; + string ip = 6; // required for latency test + repeated string reports = 7; // TODO: this will become an enum + uint64 price = 8; // nanoLP per unit per minute } message ExtendVmReq { @@ -196,12 +195,59 @@ message AirdropReq { uint64 tokens = 2; } +message SlashReq { + string pubkey = 1; + uint64 tokens = 2; +} + message Account { string pubkey = 1; uint64 balance = 2; uint64 tmp_locked = 3; } +message RegOperatorReq { + string pubkey = 1; + uint64 escrow = 2; + string email = 3; +} + +message ListOperatorsResp { + string pubkey = 1; + uint64 escrow = 2; + string email = 3; + uint64 app_nodes = 4; + uint64 vm_nodes = 5; + uint64 reports = 6; +} + +message InspectOperatorResp { + ListOperatorsResp operator = 1; + repeated VmNodeListResp nodes = 2; +} + +message ReportNodeReq { + string admin_pubkey = 1; + string node_pubkey = 2; + string contract = 3; + string reason = 4; +} + +message KickReq { + string operator_wallet = 1; + string contract_uuid = 2; + string reason = 3; +} + +message BanUserReq { + string operator_wallet = 1; + string user_wallet = 2; +} + +message KickResp { + uint64 nano_lp = 1; +} + service BrainCli { rpc GetBalance (Pubkey) returns (AccountBalance); rpc NewVm (NewVmReq) returns (NewVmResp); @@ -211,8 +257,15 @@ service BrainCli { rpc DeleteVm (DeleteVmReq) returns (Empty); rpc UpdateVm (UpdateVmReq) returns (UpdateVmResp); rpc ExtendVm (ExtendVmReq) returns (Empty); + rpc ReportNode (ReportNodeReq) returns (Empty); + rpc ListOperators (Empty) returns (stream ListOperatorsResp); + rpc InspectOperator (Pubkey) returns (InspectOperatorResp); + rpc RegisterOperator (RegOperatorReq) returns (Empty); + rpc KickContract (KickReq) returns (KickResp); + rpc BanUser (BanUserReq) returns (Empty); // admin commands rpc Airdrop (AirdropReq) returns (Empty); + rpc Slash (SlashReq) returns (Empty); rpc ListAllVmContracts (Empty) returns (stream VmContract); rpc ListAccounts (Empty) returns (stream Account); }