diff --git a/src/data.rs b/src/data.rs index 13d3d6a..5d5d998 100644 --- a/src/data.rs +++ b/src/data.rs @@ -2,6 +2,7 @@ use crate::grpc::snp_proto::{self as grpc}; use chrono::Utc; use dashmap::DashMap; use log::{debug, info, warn}; +use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::RwLock; use tokio::sync::mpsc::Sender; @@ -11,20 +12,34 @@ use tokio::sync::oneshot::Sender as OneshotSender; pub enum Error { #[error("We do not allow locking of more than 100000 LP.")] TxTooBig, + #[error("Escrow must be at least 5000 LP.")] + MinimalEscrow, #[error("Account has insufficient funds for this operation")] InsufficientFunds, #[error("Could not find contract {0}")] VmContractNotFound(String), } -#[derive(Clone)] -pub struct AccountNanoLP { +#[derive(Clone, Default)] +pub struct AccountData { pub balance: u64, pub tmp_locked: u64, + // holds reasons why VMs of this account got kicked + pub kicked_for: Vec, + // holds accounts that banned this account + pub banned_by: HashSet, } -impl From for grpc::AccountBalance { - fn from(value: AccountNanoLP) -> Self { +#[derive(Clone, Default)] +pub struct OperatorData { + pub escrow: u64, + pub email: String, + pub banned_users: HashSet, + pub vm_nodes: HashSet, +} + +impl From for grpc::AccountBalance { + fn from(value: AccountData) -> Self { grpc::AccountBalance { balance: value.balance, tmp_locked: value.tmp_locked, @@ -32,10 +47,10 @@ impl From for grpc::AccountBalance { } } -#[derive(Eq, Hash, PartialEq, Clone, Debug, Default)] +#[derive(Eq, PartialEq, Clone, Debug, Default)] pub struct VmNode { pub public_key: String, - pub owner_key: String, + pub operator_wallet: String, pub country: String, pub region: String, pub city: String, @@ -49,19 +64,21 @@ pub struct VmNode { pub max_ports_per_vm: u32, // nanoLP per unit per minute pub price: u64, + // 1st String is user wallet and 2nd String is report message + pub reports: HashMap, } impl Into for VmNode { fn into(self) -> grpc::VmNodeListResp { grpc::VmNodeListResp { + operator: self.operator_wallet, node_pubkey: self.public_key, country: self.country, region: self.region, city: self.city, ip: self.ip, - server_rating: 0, - provider_rating: 0, price: self.price, + reports: self.reports.into_values().collect(), } } } @@ -134,7 +151,8 @@ impl Into for VmContract { #[derive(Default)] pub struct BrainData { // amount of nanoLP in each account - accounts: DashMap, + accounts: DashMap, + operators: DashMap, vm_nodes: RwLock>, vm_contracts: RwLock>, tmp_newvm_reqs: DashMap)>, @@ -146,6 +164,7 @@ impl BrainData { pub fn new() -> Self { Self { accounts: DashMap::new(), + operators: DashMap::new(), vm_nodes: RwLock::new(Vec::new()), vm_contracts: RwLock::new(Vec::new()), tmp_newvm_reqs: DashMap::new(), @@ -154,43 +173,58 @@ impl BrainData { } } - pub fn get_balance(&self, account: &str) -> AccountNanoLP { + pub fn get_balance(&self, account: &str) -> AccountData { if let Some(account) = self.accounts.get(account) { return account.value().clone(); } else { - let balance = AccountNanoLP { + let balance = AccountData { balance: 0, tmp_locked: 0, + kicked_for: Vec::new(), + banned_by: HashSet::new(), }; return balance; } } pub fn give_airdrop(&self, account: &str, tokens: u64) { + warn!("Airdropping {tokens} to {account}."); self.add_nano_to_wallet(account, tokens.saturating_mul(1_000_000_000)); } + pub fn slash_account(&self, account: &str, tokens: u64) { + warn!("Slashing {tokens} from {account}."); + self.rm_nano_from_wallet(account, tokens.saturating_mul(1_000_000_000)); + } + fn add_nano_to_wallet(&self, account: &str, nano_lp: u64) { log::debug!("Adding {nano_lp} nanoLP to {account}"); self.accounts .entry(account.to_string()) .and_modify(|d| d.balance += nano_lp) - .or_insert(AccountNanoLP { + .or_insert(AccountData { balance: nano_lp, - tmp_locked: 0, + ..Default::default() }); } + fn rm_nano_from_wallet(&self, account: &str, nano_lp: u64) { + log::debug!("Adding {nano_lp} nanoLP to {account}"); + self.accounts.entry(account.to_string()).and_modify(|d| { + d.balance.saturating_sub(nano_lp); + }); + } + pub async fn vm_contracts_cron(&self) { let mut deleted_contracts: Vec<(String, String)> = Vec::new(); log::debug!("Running contracts cron..."); { let mut contracts = self.vm_contracts.write().unwrap(); contracts.retain_mut(|c| { - let owner_key = self - .find_nodes_by_pubkey(&c.node_pubkey) + let operator_wallet = self + .find_node_by_pubkey(&c.node_pubkey) .unwrap() - .owner_key + .operator_wallet .clone(); let minutes_to_collect = (Utc::now() - c.collected_at).num_minutes() as u64; c.collected_at = Utc::now(); @@ -200,7 +234,7 @@ impl BrainData { } log::debug!("Removing {nanolp_to_collect} nanoLP from {}", c.uuid); c.locked_nano -= nanolp_to_collect; - self.add_nano_to_wallet(&owner_key, nanolp_to_collect); + self.add_nano_to_wallet(&operator_wallet, nanolp_to_collect); if c.locked_nano == 0 { deleted_contracts.push((c.uuid.clone(), c.node_pubkey.clone())); } @@ -222,8 +256,9 @@ impl BrainData { } } - pub fn insert_node(&self, node: VmNode) { + pub fn register_node(&self, node: VmNode) { info!("Registering node {node:?}"); + self.add_vmnode_to_operator(&node.operator_wallet, &node.public_key); let mut nodes = self.vm_nodes.write().unwrap(); for n in nodes.iter_mut() { if n.public_key == node.public_key { @@ -236,6 +271,13 @@ impl BrainData { nodes.push(node); } + pub fn report_node(&self, admin_pubkey: String, node: &str, report: String) { + let mut nodes = self.vm_nodes.write().unwrap(); + if let Some(node) = nodes.iter_mut().find(|n| n.public_key == node) { + node.reports.insert(admin_pubkey, report); + } + } + pub fn lock_nanotockens(&self, account: &str, nano_lp: u64) -> Result<(), Error> { if nano_lp > 100_000_000_000_000 { return Err(Error::TxTooBig); @@ -602,12 +644,69 @@ impl BrainData { } } - pub fn find_nodes_by_pubkey(&self, public_key: &str) -> Option { + pub fn find_node_by_pubkey(&self, public_key: &str) -> Option { let nodes = self.vm_nodes.read().unwrap(); nodes.iter().cloned().find(|n| n.public_key == public_key) } - pub fn find_nodes_by_filters( + pub fn add_vmnode_to_operator(&self, operator_wallet: &str, node_pubkey: &str) { + self.operators + .entry(operator_wallet.to_string()) + .and_modify(|op| { + op.vm_nodes.insert(node_pubkey.to_string()); + }) + .or_insert(OperatorData { + escrow: 0, + email: String::new(), + banned_users: HashSet::new(), + vm_nodes: HashSet::from([node_pubkey.to_string()]), + }); + } + + pub fn register_operator(&self, req: grpc::RegOperatorReq) -> Result<(), Error> { + let mut operator = match self.operators.get(&req.pubkey) { + Some(o) => (*(o.value())).clone(), + None => OperatorData { + escrow: req.escrow, + email: req.email, + banned_users: HashSet::new(), + vm_nodes: HashSet::new(), + }, + }; + if req.escrow < 5000 { + return Err(Error::MinimalEscrow); + } + if let Some(mut account) = self.accounts.get_mut(&req.pubkey) { + if (account.balance + operator.escrow) < req.escrow { + return Err(Error::InsufficientFunds); + } + account.balance = account.balance + operator.escrow - req.escrow; + operator.escrow = req.escrow; + } + self.operators.insert(req.pubkey, operator); + Err(Error::InsufficientFunds) + } + + pub fn find_vm_nodes_by_operator(&self, operator_wallet: &str) -> Vec { + let nodes = self.vm_nodes.read().unwrap(); + nodes + .iter() + .filter(|node| node.operator_wallet == operator_wallet) + .cloned() + .collect() + } + + pub fn total_operator_reports(&self, operator_wallet: &str) -> usize { + let nodes = self.vm_nodes.read().unwrap(); + nodes + .iter() + .cloned() + .filter(|n| n.operator_wallet == operator_wallet) + .map(|node| node.reports.len()) + .sum() + } + + pub fn find_vm_nodes_by_filters( &self, filters: &crate::grpc::snp_proto::VmNodeFilters, ) -> Vec { @@ -675,6 +774,41 @@ impl BrainData { .collect() } + pub fn list_operators(&self) -> Vec { + self.operators + .iter() + .map(|op| grpc::ListOperatorsResp { + pubkey: op.key().to_string(), + escrow: op.escrow, + email: op.email.clone(), + app_nodes: 0, + vm_nodes: op.vm_nodes.len() as u64, + reports: self.total_operator_reports(op.key()) as u64, + }) + .collect() + } + + pub fn inspect_operator(&self, wallet: &str) -> Option { + self.operators.get(wallet).map(|op| { + let nodes = self + .find_vm_nodes_by_operator(wallet) + .into_iter() + .map(|n| n.into()) + .collect(); + grpc::InspectOperatorResp { + operator: Some(grpc::ListOperatorsResp { + pubkey: op.key().to_string(), + escrow: op.escrow, + email: op.email.clone(), + app_nodes: 0, + vm_nodes: op.vm_nodes.len() as u64, + reports: self.total_operator_reports(op.key()) as u64, + }), + nodes, + } + }) + } + pub fn find_vm_contracts_by_admin(&self, admin_pubkey: &str) -> Vec { debug!("Searching contracts for admin pubkey {admin_pubkey}"); let contracts: Vec = self diff --git a/src/grpc.rs b/src/grpc.rs index a80f74b..4bc83c9 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -1,10 +1,9 @@ - pub mod snp_proto { tonic::include_proto!("vm_proto"); } -use crate::grpc::vm_daemon_message; use crate::data::BrainData; +use crate::grpc::vm_daemon_message; use log::info; use snp_proto::brain_cli_server::BrainCli; use snp_proto::brain_vm_daemon_server::BrainVmDaemon; @@ -52,7 +51,7 @@ impl BrainVmDaemon for BrainDaemonMock { info!("Starting registration process for {:?}", req); let node = crate::data::VmNode { public_key: req.node_pubkey.clone(), - owner_key: req.owner_pubkey, + operator_wallet: req.operator_wallet, country: req.country, region: req.region, city: req.city, @@ -60,7 +59,7 @@ impl BrainVmDaemon for BrainDaemonMock { price: req.price, ..Default::default() }; - self.data.insert_node(node); + self.data.register_node(node); info!("Sending existing contracts to {}", req.node_pubkey); let contracts = self.data.find_vm_contracts_by_node(&req.node_pubkey); @@ -205,6 +204,31 @@ impl BrainCli for BrainCliMock { } } + async fn delete_vm(&self, req: Request) -> Result, Status> { + let req = check_sig_from_req(req)?; + info!("Unknown CLI requested to delete vm {}", req.uuid); + match self.data.delete_vm(req).await { + Ok(()) => Ok(Response::new(Empty {})), + Err(e) => Err(Status::not_found(e.to_string())), + } + } + + async fn report_node(&self, req: Request) -> Result, Status> { + let req = check_sig_from_req(req)?; + match self.data.find_contract_by_uuid(&req.contract) { + Some(contract) + if contract.admin_pubkey == req.admin_pubkey + && contract.node_pubkey == req.node_pubkey => + { + () + } + _ => return Err(Status::unauthenticated("No contract found by this ID.")), + }; + self.data + .report_node(req.admin_pubkey, &req.node_pubkey, req.reason); + Ok(Response::new(Empty {})) + } + type ListVmContractsStream = Pin> + Send>>; async fn list_vm_contracts( &self, @@ -238,7 +262,7 @@ impl BrainCli for BrainCliMock { ) -> Result, tonic::Status> { let req = check_sig_from_req(req)?; info!("CLI requested ListVmNodesStream: {req:?}"); - let nodes = self.data.find_nodes_by_filters(&req); + let nodes = self.data.find_vm_nodes_by_filters(&req); let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for node in nodes { @@ -265,12 +289,48 @@ impl BrainCli for BrainCliMock { } } - async fn delete_vm(&self, req: Request) -> Result, Status> { + async fn register_operator( + &self, + req: tonic::Request, + ) -> std::result::Result, tonic::Status> { let req = check_sig_from_req(req)?; - info!("Unknown CLI requested to delete vm {}", req.uuid); - match self.data.delete_vm(req).await { + info!("Regitering new operator: {req:?}"); + match self.data.register_operator(req) { Ok(()) => Ok(Response::new(Empty {})), - Err(e) => Err(Status::not_found(e.to_string())), + Err(e) => Err(Status::failed_precondition(e.to_string())), + } + } + + type ListOperatorsStream = + Pin> + Send>>; + async fn list_operators( + &self, + req: Request, + ) -> Result, Status> { + let _ = check_sig_from_req(req)?; + let operators = self.data.list_operators(); + let (tx, rx) = mpsc::channel(6); + tokio::spawn(async move { + for op in operators { + let _ = tx.send(Ok(op.into())).await; + } + }); + let output_stream = ReceiverStream::new(rx); + Ok(Response::new( + Box::pin(output_stream) as Self::ListOperatorsStream + )) + } + + async fn inspect_operator( + &self, + req: Request, + ) -> Result, Status> { + let req = check_sig_from_req(req)?; + match self.data.inspect_operator(&req.pubkey) { + Some(op) => Ok(Response::new(op.into())), + None => Err(Status::not_found( + "The wallet you specified is not an operator", + )), } } @@ -278,7 +338,14 @@ impl BrainCli for BrainCliMock { check_admin_key(&req)?; let req = check_sig_from_req(req)?; self.data.give_airdrop(&req.pubkey, req.tokens); - Ok(Response::new(Empty{})) + Ok(Response::new(Empty {})) + } + + async fn slash(&self, req: Request) -> Result, Status> { + check_admin_key(&req)?; + let req = check_sig_from_req(req)?; + self.data.slash_account(&req.pubkey, req.tokens); + Ok(Response::new(Empty {})) } type ListAllVmContractsStream = Pin> + Send>>; @@ -320,7 +387,6 @@ impl BrainCli for BrainCliMock { Box::pin(output_stream) as Self::ListAccountsStream )) } - } trait PubkeyGetter { @@ -349,12 +415,15 @@ impl_pubkey_getter!(NewVmReq, admin_pubkey); impl_pubkey_getter!(DeleteVmReq, admin_pubkey); impl_pubkey_getter!(UpdateVmReq, admin_pubkey); impl_pubkey_getter!(ExtendVmReq, admin_pubkey); +impl_pubkey_getter!(ReportNodeReq, admin_pubkey); impl_pubkey_getter!(ListVmContractsReq, admin_pubkey); impl_pubkey_getter!(RegisterVmNodeReq, node_pubkey); +impl_pubkey_getter!(RegOperatorReq, pubkey); impl_pubkey_getter!(VmNodeFilters); impl_pubkey_getter!(Empty); impl_pubkey_getter!(AirdropReq); +impl_pubkey_getter!(SlashReq); fn check_sig_from_req(req: Request) -> Result { let time = match req.metadata().get("timestamp") { diff --git a/vm.proto b/vm.proto index e755772..7a999fb 100644 --- a/vm.proto +++ b/vm.proto @@ -55,7 +55,7 @@ message MeasurementIP { // This should also include a block hash or similar, for auth message RegisterVmNodeReq { string node_pubkey = 1; - string owner_pubkey = 2; + string operator_wallet = 2; string main_ip = 3; string country = 4; string region = 5; @@ -174,15 +174,14 @@ message VmNodeFilters { } message VmNodeListResp { - string node_pubkey = 1; - string country = 2; - string region = 3; - string city = 4; - string ip = 5; // required for latency test - uint32 server_rating = 6; - uint32 provider_rating = 7; - // nanoLP per unit per minute - uint64 price = 8; + string operator = 1; + string node_pubkey = 2; + string country = 3; + string region = 4; + string city = 5; + string ip = 6; // required for latency test + repeated string reports = 7; // TODO: this will become an enum + uint64 price = 8; // nanoLP per unit per minute } message ExtendVmReq { @@ -196,12 +195,49 @@ message AirdropReq { uint64 tokens = 2; } +message SlashReq { + string pubkey = 1; + uint64 tokens = 2; +} + message Account { string pubkey = 1; uint64 balance = 2; uint64 tmp_locked = 3; } +message RegOperatorReq { + string pubkey = 1; + uint64 escrow = 2; + string email = 3; +} + +message ListOperatorsResp { + string pubkey = 1; + uint64 escrow = 2; + string email = 3; + uint64 app_nodes = 4; + uint64 vm_nodes = 5; + uint64 reports = 6; +} + +message InspectOperatorResp { + ListOperatorsResp operator = 1; + repeated VmNodeListResp nodes = 2; +} + +message ReportNodeReq { + string admin_pubkey = 1; + string node_pubkey = 2; + string contract = 3; + string reason = 4; +} + +message KickReq { + string uuid = 1; + string reason = 2; +} + service BrainCli { rpc GetBalance (Pubkey) returns (AccountBalance); rpc NewVm (NewVmReq) returns (NewVmResp); @@ -211,8 +247,15 @@ service BrainCli { rpc DeleteVm (DeleteVmReq) returns (Empty); rpc UpdateVm (UpdateVmReq) returns (UpdateVmResp); rpc ExtendVm (ExtendVmReq) returns (Empty); + rpc ReportNode (ReportNodeReq) returns (Empty); + rpc ListOperators (Empty) returns (stream ListOperatorsResp); + rpc InspectOperator (Pubkey) returns (InspectOperatorResp); + rpc RegisterOperator (RegOperatorReq) returns (Empty); + rpc KickContract (KickReq) returns (Empty); + rpc BanUser (Pubkey) returns (Empty); // admin commands rpc Airdrop (AirdropReq) returns (Empty); + rpc Slash (SlashReq) returns (Empty); rpc ListAllVmContracts (Empty) returns (stream VmContract); rpc ListAccounts (Empty) returns (stream Account); }