From 9fa62a1978d2e2e6390a91a18a39262a4a5c22c0 Mon Sep 17 00:00:00 2001 From: ghe0 Date: Mon, 27 Jan 2025 17:45:28 +0200 Subject: [PATCH 01/11] inform daemon about VMs deleted by cron --- src/data.rs | 18 ++++++++++++++++++ src/grpc.rs | 8 +++----- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/data.rs b/src/data.rs index b2d0ce3..ff0620b 100644 --- a/src/data.rs +++ b/src/data.rs @@ -192,6 +192,7 @@ impl BrainData { } pub fn contracts_cron(&self) { + let mut deleted_contracts = Vec::new(); log::debug!("Running contracts cron..."); let mut contracts = self.contracts.write().unwrap(); contracts.retain_mut(|c| { @@ -213,8 +214,25 @@ impl BrainData { ); c.locked_nano -= nanotokens_to_collect; self.add_nano_to_wallet(&owner_key, nanotokens_to_collect); + if c.locked_nano == 0 { + deleted_contracts.push((c.uuid.clone(), c.node_pubkey.clone())); + } c.locked_nano > 0 }); + // inform daemons of the deletion of the contracts + for (uuid, node_pubkey) in deleted_contracts.iter() { + if let Some(daemon_tx) = self.daemon_tx.get(&node_pubkey.clone()) { + let msg = grpc::BrainMessage { + msg: Some(grpc::brain_message::Msg::DeleteVm(grpc::DeleteVmReq { + uuid: uuid.to_string(), + })), + }; + let daemon_tx = daemon_tx.clone(); + tokio::runtime::Runtime::new().unwrap().spawn(async move { + let _ = daemon_tx.send(msg).await; + }); + } + } } pub fn insert_node(&self, node: Node) { diff --git a/src/grpc.rs b/src/grpc.rs index fcad76d..e2e9055 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -162,11 +162,9 @@ impl BrainCli for BrainCliMock { info!("Sending UpdateVMResp: {response:?}"); Ok(Response::new(response)) } - Err(e) => { - Err(Status::unknown( - "Update VM request failed due to error: {e}", - )) - } + Err(e) => Err(Status::unknown(format!( + "Update VM request failed due to error: {e}" + ))), } } From 928c68f550edc54768f1348def8aecded1fda6d1 Mon Sep 17 00:00:00 2001 From: ghe0 Date: Mon, 27 Jan 2025 17:49:23 +0200 Subject: [PATCH 02/11] reduce the price of memory --- src/data.rs | 63 +++++++++++++++++++++++++++-------------------------- src/main.rs | 2 +- 2 files changed, 33 insertions(+), 32 deletions(-) diff --git a/src/data.rs b/src/data.rs index ff0620b..fb81bcb 100644 --- a/src/data.rs +++ b/src/data.rs @@ -98,7 +98,7 @@ impl Contract { // I tried, but this can be done better. // Storage cost should also be based on tier (self.vcpus as u64 * 10) - + ((self.memory_mb + 256) as u64 * 4 / 100) + + ((self.memory_mb + 256) as u64 / 200) + (self.disk_size_gb as u64 / 10) + (!self.public_ipv4.is_empty() as u64 * 10) } @@ -191,34 +191,37 @@ impl BrainData { }); } - pub fn contracts_cron(&self) { - let mut deleted_contracts = Vec::new(); + pub async fn contracts_cron(&self) { + let mut deleted_contracts: Vec<(String, String)> = Vec::new(); log::debug!("Running contracts cron..."); - let mut contracts = self.contracts.write().unwrap(); - contracts.retain_mut(|c| { - let owner_key = self - .find_nodes_by_pubkey(&c.node_pubkey) - .unwrap() - .owner_key - .clone(); - let minutes_to_collect = (Utc::now() - c.collected_at).num_minutes() as u64; - c.collected_at = Utc::now(); - log::debug!("{minutes_to_collect}"); - let mut nanotokens_to_collect = c.price_per_minute().saturating_mul(minutes_to_collect); - if nanotokens_to_collect > c.locked_nano { - nanotokens_to_collect = c.locked_nano; - } - log::debug!( - "Removing {nanotokens_to_collect} nanotokens from {}", - c.uuid - ); - c.locked_nano -= nanotokens_to_collect; - self.add_nano_to_wallet(&owner_key, nanotokens_to_collect); - if c.locked_nano == 0 { - deleted_contracts.push((c.uuid.clone(), c.node_pubkey.clone())); - } - c.locked_nano > 0 - }); + { + let mut contracts = self.contracts.write().unwrap(); + contracts.retain_mut(|c| { + let owner_key = self + .find_nodes_by_pubkey(&c.node_pubkey) + .unwrap() + .owner_key + .clone(); + let minutes_to_collect = (Utc::now() - c.collected_at).num_minutes() as u64; + c.collected_at = Utc::now(); + log::debug!("{minutes_to_collect}"); + let mut nanotokens_to_collect = + c.price_per_minute().saturating_mul(minutes_to_collect); + if nanotokens_to_collect > c.locked_nano { + nanotokens_to_collect = c.locked_nano; + } + log::debug!( + "Removing {nanotokens_to_collect} nanotokens from {}", + c.uuid + ); + c.locked_nano -= nanotokens_to_collect; + self.add_nano_to_wallet(&owner_key, nanotokens_to_collect); + if c.locked_nano == 0 { + deleted_contracts.push((c.uuid.clone(), c.node_pubkey.clone())); + } + c.locked_nano > 0 + }); + } // inform daemons of the deletion of the contracts for (uuid, node_pubkey) in deleted_contracts.iter() { if let Some(daemon_tx) = self.daemon_tx.get(&node_pubkey.clone()) { @@ -228,9 +231,7 @@ impl BrainData { })), }; let daemon_tx = daemon_tx.clone(); - tokio::runtime::Runtime::new().unwrap().spawn(async move { - let _ = daemon_tx.send(msg).await; - }); + let _ = daemon_tx.send(msg).await; } } } diff --git a/src/main.rs b/src/main.rs index 4bf30f7..ae04f30 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,7 +19,7 @@ async fn main() { tokio::spawn(async move { loop { tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; - data_clone.contracts_cron(); + data_clone.contracts_cron().await; } }); let addr = "0.0.0.0:31337".parse().unwrap(); From 7dfdf4844e0e022432d053eb2246fec81019b01a Mon Sep 17 00:00:00 2001 From: ghe0 Date: Tue, 28 Jan 2025 18:31:30 +0200 Subject: [PATCH 03/11] switch language from tokens to LP --- snp.proto | 6 ++--- src/data.rs | 70 ++++++++++++++++++++++++++--------------------------- 2 files changed, 38 insertions(+), 38 deletions(-) diff --git a/snp.proto b/snp.proto index 1d6a2f7..849a656 100644 --- a/snp.proto +++ b/snp.proto @@ -28,7 +28,7 @@ message Contract { string dtrfs_sha = 12; string created_at = 13; string updated_at = 14; - // total nanotoken cost per minute (for all units) + // total nanoLP cost per minute (for all units) uint64 nano_per_minute = 15; uint64 locked_nano = 16; string collected_at = 17; @@ -59,7 +59,7 @@ message RegisterNodeReq { string country = 4; string region = 5; string city = 6; - // nanotokens per unit per minute + // nanoLP per unit per minute uint64 price = 7; } @@ -171,7 +171,7 @@ message NodeListResp { string ip = 5; // required for latency test uint32 server_rating = 6; uint32 provider_rating = 7; - // nanotokens per unit per minute + // nanoLP per unit per minute uint64 price = 8; } diff --git a/src/data.rs b/src/data.rs index fb81bcb..c8f9f79 100644 --- a/src/data.rs +++ b/src/data.rs @@ -10,7 +10,7 @@ use tokio::sync::oneshot::Sender as OneshotSender; #[derive(thiserror::Error, Debug)] pub enum Error { - #[error("We do not allow locking of more than 100000 tokens.")] + #[error("We do not allow locking of more than 100000 LP.")] TxTooBig, #[error("Account has insufficient funds for this operation")] InsufficientFunds, @@ -21,13 +21,13 @@ pub enum Error { } #[derive(Clone)] -pub struct AccountNanoTokens { +pub struct AccountNanoLP { pub balance: u64, pub tmp_locked: u64, } -impl From for grpc::AccountBalance { - fn from(value: AccountNanoTokens) -> Self { +impl From for grpc::AccountBalance { + fn from(value: AccountNanoLP) -> Self { grpc::AccountBalance { balance: value.balance, tmp_locked: value.tmp_locked, @@ -50,7 +50,7 @@ pub struct Node { pub avail_ipv6: u32, pub avail_ports: u32, pub max_ports_per_vm: u32, - // nanotokens per unit per minute + // nanoLP per unit per minute pub price: u64, } @@ -103,7 +103,7 @@ impl Contract { + (!self.public_ipv4.is_empty() as u64 * 10) } - // Returns price per minute in nanotokens + // Returns price per minute in nanoLP fn price_per_minute(&self) -> u64 { self.total_units() * self.price_per_unit } @@ -136,8 +136,8 @@ impl Into for Contract { #[derive(Default)] pub struct BrainData { - // amount of nanotokens in each account - accounts: DashMap, + // amount of nanoLP in each account + accounts: DashMap, nodes: RwLock>, contracts: RwLock>, tmp_newvm_reqs: DashMap)>, @@ -164,11 +164,11 @@ impl BrainData { } } - pub fn get_balance(&self, account: &str) -> AccountNanoTokens { + pub fn get_balance(&self, account: &str) -> AccountNanoLP { if let Some(account) = self.accounts.get(account) { return account.value().clone(); } else { - let balance = AccountNanoTokens { + let balance = AccountNanoLP { balance: 0, tmp_locked: 0, }; @@ -180,13 +180,13 @@ impl BrainData { self.add_nano_to_wallet(account, 1000_000000000); } - fn add_nano_to_wallet(&self, account: &str, nanotokens: u64) { - log::debug!("Adding {nanotokens} nanotokens to {account}"); + fn add_nano_to_wallet(&self, account: &str, nano_lp: u64) { + log::debug!("Adding {nano_lp} nanoLP to {account}"); self.accounts .entry(account.to_string()) - .and_modify(|tokens| tokens.balance += nanotokens) - .or_insert(AccountNanoTokens { - balance: nanotokens, + .and_modify(|d| d.balance += nano_lp) + .or_insert(AccountNanoLP { + balance: nano_lp, tmp_locked: 0, }); } @@ -205,17 +205,17 @@ impl BrainData { let minutes_to_collect = (Utc::now() - c.collected_at).num_minutes() as u64; c.collected_at = Utc::now(); log::debug!("{minutes_to_collect}"); - let mut nanotokens_to_collect = + let mut nanolp_to_collect = c.price_per_minute().saturating_mul(minutes_to_collect); - if nanotokens_to_collect > c.locked_nano { - nanotokens_to_collect = c.locked_nano; + if nanolp_to_collect > c.locked_nano { + nanolp_to_collect = c.locked_nano; } log::debug!( - "Removing {nanotokens_to_collect} nanotokens from {}", + "Removing {nanolp_to_collect} nanoLP from {}", c.uuid ); - c.locked_nano -= nanotokens_to_collect; - self.add_nano_to_wallet(&owner_key, nanotokens_to_collect); + c.locked_nano -= nanolp_to_collect; + self.add_nano_to_wallet(&owner_key, nanolp_to_collect); if c.locked_nano == 0 { deleted_contracts.push((c.uuid.clone(), c.node_pubkey.clone())); } @@ -250,29 +250,29 @@ impl BrainData { nodes.push(node); } - pub fn lock_nanotockens(&self, account: &str, nanotokens: u64) -> Result<(), Error> { - if nanotokens > 100_000_000_000_000 { + pub fn lock_nanotockens(&self, account: &str, nano_lp: u64) -> Result<(), Error> { + if nano_lp > 100_000_000_000_000 { return Err(Error::TxTooBig); } if let Some(mut account) = self.accounts.get_mut(account) { - if nanotokens > account.balance { + if nano_lp > account.balance { return Err(Error::InsufficientFunds); } - account.balance = account.balance.saturating_sub(nanotokens); - account.tmp_locked = account.tmp_locked.saturating_add(nanotokens); + account.balance = account.balance.saturating_sub(nano_lp); + account.tmp_locked = account.tmp_locked.saturating_add(nano_lp); Ok(()) } else { Err(Error::InsufficientFunds) } } - pub fn unlock_nanotockens(&self, account: &str, nanotokens: u64) -> Result<(), Error> { + pub fn unlock_nanotockens(&self, account: &str, nano_lp: u64) -> Result<(), Error> { if let Some(mut account) = self.accounts.get_mut(account) { - if nanotokens > account.tmp_locked { + if nano_lp > account.tmp_locked { return Err(Error::ImpossibleError); } - account.balance = account.balance.saturating_add(nanotokens); - account.tmp_locked = account.tmp_locked.saturating_sub(nanotokens); + account.balance = account.balance.saturating_add(nano_lp); + account.tmp_locked = account.tmp_locked.saturating_sub(nano_lp); Ok(()) } else { Err(Error::ImpossibleError) @@ -283,9 +283,9 @@ impl BrainData { &self, uuid: &str, account: &str, - nanotokens: u64, + nano_lp: u64, ) -> Result<(), Error> { - if nanotokens > 100_000_000_000_000 { + if nano_lp > 100_000_000_000_000 { return Err(Error::TxTooBig); } let mut account = match self.accounts.get_mut(account) { @@ -300,11 +300,11 @@ impl BrainData { .find(|c| c.uuid == uuid) { Some(contract) => { - if account.balance + contract.locked_nano < nanotokens { + if account.balance + contract.locked_nano < nano_lp { return Err(Error::InsufficientFunds); } - account.balance = account.balance + contract.locked_nano - nanotokens; - contract.locked_nano = nanotokens; + account.balance = account.balance + contract.locked_nano - nano_lp; + contract.locked_nano = nano_lp; Ok(()) } None => Err(Error::ContractNotFound(uuid.to_string())), From 5359ba039ba5e789e93300d9d3f9f8bdc704ed8f Mon Sep 17 00:00:00 2001 From: ghe0 Date: Mon, 3 Feb 2025 16:25:29 +0200 Subject: [PATCH 04/11] added auth --- Cargo.lock | 220 ++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 + snp.proto | 28 ++++--- src/data.rs | 83 +++++++++++-------- src/grpc.rs | 228 ++++++++++++++++++++++++++++++++++++++++++++++------ 5 files changed, 496 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index af7229c..ad4f555 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -209,18 +209,35 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64ct" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" + [[package]] name = "bitflags" version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "brain-mock" version = "0.1.0" dependencies = [ + "bs58", "chrono", "dashmap", + "ed25519-dalek", "env_logger", "log", "prost", @@ -236,6 +253,15 @@ dependencies = [ "uuid", ] +[[package]] +name = "bs58" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf88ba1141d185c399bee5288d850d63b8369520c1eafc32a0430b5b6c287bf4" +dependencies = [ + "tinyvec", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -289,6 +315,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + [[package]] name = "core-foundation" version = "0.9.4" @@ -305,12 +337,58 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "curve25519-dalek" +version = "4.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" +dependencies = [ + "cfg-if", + "cpufeatures", + "curve25519-dalek-derive", + "digest", + "fiat-crypto", + "rustc_version", + "subtle", + "zeroize", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "dashmap" version = "6.1.0" @@ -325,6 +403,26 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "der" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f55bf8e7b65898637379c1b74eb1551107c8294ed26d855ceb9fd1a09cfc9bc0" +dependencies = [ + "const-oid", + "zeroize", +] + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -336,6 +434,30 @@ dependencies = [ "syn", ] +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "pkcs8", + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a3daa8e81a3963a60642bcc1f90a670680bd4a77535faa384e9d1c79d620871" +dependencies = [ + "curve25519-dalek", + "ed25519", + "serde", + "sha2", + "subtle", + "zeroize", +] + [[package]] name = "either" version = "1.13.0" @@ -396,6 +518,12 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "fiat-crypto" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" + [[package]] name = "fixedbitset" version = "0.4.2" @@ -471,6 +599,16 @@ dependencies = [ "pin-utils", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.15" @@ -1112,6 +1250,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs8" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" +dependencies = [ + "der", + "spki", +] + [[package]] name = "pkg-config" version = "0.3.31" @@ -1340,6 +1488,15 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "0.38.42" @@ -1442,6 +1599,12 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f79dfe2d285b0488816f30e700a7438c5a73d816b5b7d3ac72fbc48b0d185e03" + [[package]] name = "serde" version = "1.0.216" @@ -1486,12 +1649,32 @@ dependencies = [ "serde", ] +[[package]] +name = "sha2" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "shlex" version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signature" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +dependencies = [ + "rand_core", +] + [[package]] name = "slab" version = "0.4.9" @@ -1523,6 +1706,16 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spki" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -1630,6 +1823,21 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinyvec" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "022db8904dfa342efe721985167e9fcd16c29b226db4397ed752a761cfce81e8" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.42.0" @@ -1829,6 +2037,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typenum" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" + [[package]] name = "unicode-ident" version = "1.0.14" @@ -1885,6 +2099,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "want" version = "0.3.1" diff --git a/Cargo.toml b/Cargo.toml index 5ef1cd6..00e4298 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,8 +4,10 @@ version = "0.1.0" edition = "2021" [dependencies] +bs58 = "0.5.1" chrono = "0.4.39" dashmap = "6.1.0" +ed25519-dalek = "2.1.1" env_logger = "0.11.6" log = "0.4.22" prost = "0.13.4" diff --git a/snp.proto b/snp.proto index 849a656..fa9654e 100644 --- a/snp.proto +++ b/snp.proto @@ -52,6 +52,7 @@ message MeasurementIP { string gateway = 4; } +// This should also include a block hash or similar, for auth message RegisterNodeReq { string node_pubkey = 1; string owner_pubkey = 2; @@ -101,13 +102,14 @@ message NewVmResp { message UpdateVmReq { string uuid = 1; - uint32 disk_size_gb = 2; - uint32 vcpus = 3; - uint32 memory_mb = 4; - string kernel_url = 5; - string kernel_sha = 6; - string dtrfs_url = 7; - string dtrfs_sha = 8; + string admin_pubkey = 2; + uint32 disk_size_gb = 3; + uint32 vcpus = 4; + uint32 memory_mb = 5; + string kernel_url = 6; + string kernel_sha = 7; + string dtrfs_url = 8; + string dtrfs_sha = 9; } message UpdateVmResp { @@ -118,6 +120,7 @@ message UpdateVmResp { message DeleteVmReq { string uuid = 1; + string admin_pubkey = 2; } message BrainMessage { @@ -128,9 +131,16 @@ message BrainMessage { } } +message DaemonStreamAuth { + string timestamp = 1; + string pubkey = 2; + repeated string contracts = 3; + string signature = 4; +} + message DaemonMessage { oneof Msg { - Pubkey pubkey = 1; + DaemonStreamAuth auth = 1; NewVmResp new_vm_resp = 2; UpdateVmResp update_vm_resp = 3; NodeResources node_resources = 4; @@ -139,7 +149,7 @@ message DaemonMessage { service BrainDaemon { rpc RegisterNode (RegisterNodeReq) returns (stream Contract); - rpc BrainMessages (Pubkey) returns (stream BrainMessage); + rpc BrainMessages (DaemonStreamAuth) returns (stream BrainMessage); rpc DaemonMessages (stream DaemonMessage) returns (Empty); } diff --git a/src/data.rs b/src/data.rs index c8f9f79..d286c52 100644 --- a/src/data.rs +++ b/src/data.rs @@ -204,16 +204,11 @@ impl BrainData { .clone(); let minutes_to_collect = (Utc::now() - c.collected_at).num_minutes() as u64; c.collected_at = Utc::now(); - log::debug!("{minutes_to_collect}"); - let mut nanolp_to_collect = - c.price_per_minute().saturating_mul(minutes_to_collect); + let mut nanolp_to_collect = c.price_per_minute().saturating_mul(minutes_to_collect); if nanolp_to_collect > c.locked_nano { nanolp_to_collect = c.locked_nano; } - log::debug!( - "Removing {nanolp_to_collect} nanoLP from {}", - c.uuid - ); + log::debug!("Removing {nanolp_to_collect} nanoLP from {}", c.uuid); c.locked_nano -= nanolp_to_collect; self.add_nano_to_wallet(&owner_key, nanolp_to_collect); if c.locked_nano == 0 { @@ -228,6 +223,7 @@ impl BrainData { let msg = grpc::BrainMessage { msg: Some(grpc::brain_message::Msg::DeleteVm(grpc::DeleteVmReq { uuid: uuid.to_string(), + admin_pubkey: String::new(), })), }; let daemon_tx = daemon_tx.clone(); @@ -282,13 +278,13 @@ impl BrainData { pub fn extend_contract_time( &self, uuid: &str, - account: &str, + wallet: &str, nano_lp: u64, ) -> Result<(), Error> { if nano_lp > 100_000_000_000_000 { return Err(Error::TxTooBig); } - let mut account = match self.accounts.get_mut(account) { + let mut account = match self.accounts.get_mut(wallet) { Some(account) => account, None => return Err(Error::InsufficientFunds), }; @@ -300,6 +296,9 @@ impl BrainData { .find(|c| c.uuid == uuid) { Some(contract) => { + if contract.admin_pubkey != wallet { + return Err(Error::ContractNotFound(uuid.to_string())); + } if account.balance + contract.locked_nano < nano_lp { return Err(Error::InsufficientFunds); } @@ -344,31 +343,41 @@ impl BrainData { self.daemon_tx.remove(node_pubkey); } - pub async fn delete_vm(&self, delete_vm: grpc::DeleteVmReq) { - if let Some(contract) = self.find_contract_by_uuid(&delete_vm.uuid) { - info!("Found vm {}. Deleting...", delete_vm.uuid); - if let Some(daemon_tx) = self.daemon_tx.get(&contract.node_pubkey) { - debug!( - "TX for daemon {} found. Informing daemon about deletion of {}.", - contract.node_pubkey, delete_vm.uuid - ); - let msg = grpc::BrainMessage { - msg: Some(grpc::brain_message::Msg::DeleteVm(delete_vm.clone())), - }; - if let Err(e) = daemon_tx.send(msg).await { - warn!( - "Failed to send deletion request to {} due to error: {e:?}", - contract.node_pubkey - ); - info!("Deleting daemon TX for {}", contract.node_pubkey); - self.del_daemon_tx(&contract.node_pubkey); + pub async fn delete_vm(&self, delete_vm: grpc::DeleteVmReq) -> Result<(), Error> { + let contract = match self.find_contract_by_uuid(&delete_vm.uuid) { + Some(contract) => { + if contract.admin_pubkey != delete_vm.admin_pubkey { + return Err(Error::ContractNotFound(delete_vm.uuid)); } + contract + } + None => { + return Err(Error::ContractNotFound(delete_vm.uuid)); + } + }; + info!("Found vm {}. Deleting...", delete_vm.uuid); + if let Some(daemon_tx) = self.daemon_tx.get(&contract.node_pubkey) { + debug!( + "TX for daemon {} found. Informing daemon about deletion of {}.", + contract.node_pubkey, delete_vm.uuid + ); + let msg = grpc::BrainMessage { + msg: Some(grpc::brain_message::Msg::DeleteVm(delete_vm.clone())), + }; + if let Err(e) = daemon_tx.send(msg).await { + warn!( + "Failed to send deletion request to {} due to error: {e:?}", + contract.node_pubkey + ); + info!("Deleting daemon TX for {}", contract.node_pubkey); + self.del_daemon_tx(&contract.node_pubkey); } - - self.add_nano_to_wallet(&contract.admin_pubkey, contract.locked_nano); - let mut contracts = self.contracts.write().unwrap(); - contracts.retain(|c| c.uuid != delete_vm.uuid); } + + self.add_nano_to_wallet(&contract.admin_pubkey, contract.locked_nano); + let mut contracts = self.contracts.write().unwrap(); + contracts.retain(|c| c.uuid != delete_vm.uuid); + Ok(()) } pub async fn submit_newvm_resp(&self, new_vm_resp: grpc::NewVmResp) { @@ -546,7 +555,17 @@ impl BrainData { let uuid = req.uuid.clone(); info!("Inserting new vm update request in memory: {req:?}"); let node_pubkey = match self.find_contract_by_uuid(&req.uuid) { - Some(contract) => contract.node_pubkey, + Some(contract) => { + if contract.admin_pubkey != req.admin_pubkey { + let _ = tx.send(grpc::UpdateVmResp { + uuid, + error: "Contract does not exist.".to_string(), + args: None, + }); + return; + } + contract.node_pubkey + } None => { log::warn!( "Received UpdateVMReq for a contract that does not exist: {}", diff --git a/src/grpc.rs b/src/grpc.rs index e2e9055..6d3d102 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -42,7 +42,7 @@ impl BrainDaemon for BrainDaemonMock { &self, req: Request, ) -> Result, Status> { - let req = req.into_inner(); + let req = check_sig_from_req(req)?; info!("Starting registration process for {:?}", req); let node = crate::data::Node { public_key: req.node_pubkey.clone(), @@ -73,12 +73,19 @@ impl BrainDaemon for BrainDaemonMock { type BrainMessagesStream = Pin> + Send>>; async fn brain_messages( &self, - req: Request, + req: Request, ) -> Result, Status> { - let req = req.into_inner(); - info!("Daemon {} connected to receive brain messages", req.pubkey); + let auth = req.into_inner(); + let pubkey = auth.pubkey.clone(); + check_sig_from_parts( + &pubkey, + &auth.timestamp, + &format!("{:?}", auth.contracts), + &auth.signature, + )?; + info!("Daemon {} connected to receive brain messages", pubkey); let (tx, rx) = mpsc::channel(6); - self.data.add_daemon_tx(&req.pubkey, tx); + self.data.add_daemon_tx(&pubkey, tx); let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg)); Ok(Response::new( Box::pin(output_stream) as Self::BrainMessagesStream @@ -90,14 +97,30 @@ impl BrainDaemon for BrainDaemonMock { req: Request>, ) -> Result, Status> { let mut req_stream = req.into_inner(); - let mut pubkey = String::new(); + let pubkey: String; + if let Some(Ok(msg)) = req_stream.next().await { + log::debug!("demon_messages received the following auth message: {:?}", msg.msg); + if let Some(daemon_message::Msg::Auth(auth)) = msg.msg { + pubkey = auth.pubkey.clone(); + check_sig_from_parts( + &pubkey, + &auth.timestamp, + &format!("{:?}", auth.contracts), + &auth.signature, + )?; + } else { + return Err(Status::unauthenticated( + "Could not authenticate the daemon: could not extract auth signature", + )); + } + } else { + return Err(Status::unauthenticated("Could not authenticate the daemon")); + } + + // info!("Received a message from daemon {pubkey}: {daemon_message:?}"); while let Some(daemon_message) = req_stream.next().await { - info!("Received a message from daemon {pubkey}: {daemon_message:?}"); match daemon_message { Ok(msg) => match msg.msg { - Some(daemon_message::Msg::Pubkey(p)) => { - pubkey = p.pubkey; - } Some(daemon_message::Msg::NewVmResp(new_vm_resp)) => { self.data.submit_newvm_resp(new_vm_resp).await; } @@ -107,7 +130,7 @@ impl BrainDaemon for BrainDaemonMock { Some(daemon_message::Msg::NodeResources(node_resources)) => { self.data.submit_node_resources(node_resources); } - None => {} + _ => {} }, Err(e) => { log::warn!("Daemon disconnected: {e:?}"); @@ -122,18 +145,18 @@ impl BrainDaemon for BrainDaemonMock { #[tonic::async_trait] impl BrainCli for BrainCliMock { async fn get_balance(&self, req: Request) -> Result, Status> { - Ok(Response::new( - self.data.get_balance(&req.into_inner().pubkey).into(), - )) + let req = check_sig_from_req(req)?; + Ok(Response::new(self.data.get_balance(&req.pubkey).into())) } async fn get_airdrop(&self, req: Request) -> Result, Status> { - self.data.get_airdrop(&req.into_inner().pubkey); + let req = check_sig_from_req(req)?; + self.data.get_airdrop(&req.pubkey); Ok(Response::new(Empty {})) } async fn new_vm(&self, req: Request) -> Result, Status> { - let req = req.into_inner(); + let req = check_sig_from_req(req)?; info!("New VM requested via CLI: {req:?}"); let admin_pubkey = req.admin_pubkey.clone(); let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); @@ -153,7 +176,7 @@ impl BrainCli for BrainCliMock { } async fn update_vm(&self, req: Request) -> Result, Status> { - let req = req.into_inner(); + let req = check_sig_from_req(req)?; info!("Update VM requested via CLI: {req:?}"); let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); self.data.submit_updatevm_req(req, oneshot_tx).await; @@ -169,7 +192,7 @@ impl BrainCli for BrainCliMock { } async fn extend_vm(&self, req: Request) -> Result, Status> { - let req = req.into_inner(); + let req = check_sig_from_req(req)?; match self .data .extend_contract_time(&req.uuid, &req.admin_pubkey, req.locked_nano) @@ -184,7 +207,7 @@ impl BrainCli for BrainCliMock { &self, req: Request, ) -> Result, Status> { - let req = req.into_inner(); + let req = check_sig_from_req(req)?; info!("CLI {} requested ListVMContractsStream", req.admin_pubkey); let contracts = match req.uuid.is_empty() { false => match self.data.find_contract_by_uuid(&req.uuid) { @@ -210,7 +233,7 @@ impl BrainCli for BrainCliMock { &self, req: Request, ) -> Result, tonic::Status> { - let req = req.into_inner(); + let req = check_sig_from_req(req)?; info!("Unknown CLI requested ListNodesStream: {req:?}"); let nodes = self.data.find_nodes_by_filters(&req); let (tx, rx) = mpsc::channel(6); @@ -229,7 +252,7 @@ impl BrainCli for BrainCliMock { &self, req: Request, ) -> Result, Status> { - let req = req.into_inner(); + let req = check_sig_from_req(req)?; info!("Unknown CLI requested ListNodesStream: {req:?}"); match self.data.get_one_node_by_filters(&req) { Some(node) => Ok(Response::new(node.into())), @@ -240,9 +263,166 @@ impl BrainCli for BrainCliMock { } async fn delete_vm(&self, req: Request) -> Result, Status> { - let req = req.into_inner(); + let req = check_sig_from_req(req)?; info!("Unknown CLI requested to delete vm {}", req.uuid); - self.data.delete_vm(req).await; - Ok(Response::new(Empty {})) + match self.data.delete_vm(req).await { + Ok(()) => Ok(Response::new(Empty {})), + Err(e) => Err(Status::not_found(e.to_string())), + } } } + +trait PubkeyGetter { + fn get_pubkey(&self) -> Option; +} + +impl PubkeyGetter for Pubkey { + fn get_pubkey(&self) -> Option { + Some(self.pubkey.clone()) + } +} + +impl PubkeyGetter for NewVmReq { + fn get_pubkey(&self) -> Option { + Some(self.admin_pubkey.clone()) + } +} + +impl PubkeyGetter for DeleteVmReq { + fn get_pubkey(&self) -> Option { + Some(self.admin_pubkey.clone()) + } +} + +impl PubkeyGetter for UpdateVmReq { + fn get_pubkey(&self) -> Option { + Some(self.admin_pubkey.clone()) + } +} + +impl PubkeyGetter for ExtendVmReq { + fn get_pubkey(&self) -> Option { + Some(self.admin_pubkey.clone()) + } +} + +impl PubkeyGetter for ListContractsReq { + fn get_pubkey(&self) -> Option { + Some(self.admin_pubkey.clone()) + } +} + +impl PubkeyGetter for NodeFilters { + fn get_pubkey(&self) -> Option { + None + } +} + +impl PubkeyGetter for RegisterNodeReq { + fn get_pubkey(&self) -> Option { + Some(self.node_pubkey.clone()) + } +} + +fn check_sig_from_req(req: Request) -> Result { + let time = match req.metadata().get("timestamp") { + Some(t) => t.clone(), + None => return Err(Status::unauthenticated("Timestamp not found in metadata.")), + }; + let time = time + .to_str() + .map_err(|_| Status::unauthenticated("Timestamp in metadata is not a string"))?; + + let now = chrono::Utc::now(); + let parsed_time = chrono::DateTime::parse_from_rfc3339(time) + .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; + let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); + if seconds_elapsed > 1 || seconds_elapsed < -1 { + return Err(Status::unauthenticated(format!( + "Date is not within 1 sec of the time of the server: CLI {} vs Server {}", + parsed_time, now + ))); + } + + let signature = match req.metadata().get("request-signature") { + Some(t) => t, + None => return Err(Status::unauthenticated("signature not found in metadata.")), + }; + let signature = bs58::decode(signature) + .into_vec() + .map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?; + let signature = ed25519_dalek::Signature::from_bytes( + signature + .as_slice() + .try_into() + .map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?, + ); + + let pubkey_value = match req.metadata().get("pubkey") { + Some(p) => p.clone(), + None => return Err(Status::unauthenticated("Signature not found in metadata.")), + }; + let pubkey = ed25519_dalek::VerifyingKey::from_bytes( + &bs58::decode(&pubkey_value) + .into_vec() + .map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))? + .try_into() + .map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?, + ) + .map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?; + + let req = req.into_inner(); + let message = format!("{time}{req:?}"); + use ed25519_dalek::Verifier; + pubkey + .verify(message.as_bytes(), &signature) + .map_err(|_| Status::unauthenticated("the signature is not valid"))?; + if let Some(req_pubkey) = req.get_pubkey() { + if pubkey_value.to_str().unwrap().to_string() != req_pubkey { + return Err(Status::unauthenticated( + "pubkey of signature does not match pubkey of request", + )); + } + } + Ok(req) +} + +fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Result<(), Status> { + let now = chrono::Utc::now(); + let parsed_time = chrono::DateTime::parse_from_rfc3339(time) + .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; + let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); + if seconds_elapsed > 1 || seconds_elapsed < -1 { + return Err(Status::unauthenticated(format!( + "Date is not within 1 sec of the time of the server: CLI {} vs Server {}", + parsed_time, now + ))); + } + + let signature = bs58::decode(sig) + .into_vec() + .map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?; + let signature = ed25519_dalek::Signature::from_bytes( + signature + .as_slice() + .try_into() + .map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?, + ); + + let pubkey = ed25519_dalek::VerifyingKey::from_bytes( + &bs58::decode(&pubkey) + .into_vec() + .map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))? + .try_into() + .map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?, + ) + .map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?; + + let msg = time.to_string() + msg; + use ed25519_dalek::Verifier; + pubkey + .verify(msg.as_bytes(), &signature) + .map_err(|_| Status::unauthenticated("the signature is not valid"))?; + + Ok(()) +} From 64f892c174b12554ef7d735a746f68bd7c22d868 Mon Sep 17 00:00:00 2001 From: ghe0 Date: Wed, 5 Feb 2025 02:44:42 +0200 Subject: [PATCH 05/11] added admin functionality --- snp.proto | 16 ++++++++- src/data.rs | 20 +++++++++-- src/grpc.rs | 96 ++++++++++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 121 insertions(+), 11 deletions(-) diff --git a/snp.proto b/snp.proto index fa9654e..c54e4f6 100644 --- a/snp.proto +++ b/snp.proto @@ -191,8 +191,18 @@ message ExtendVmReq { uint64 locked_nano = 3; } +message AirdropReq { + string pubkey = 1; + uint64 tokens = 2; +} + +message Account { + string pubkey = 1; + uint64 balance = 2; + uint64 tmp_locked = 3; +} + service BrainCli { - rpc GetAirdrop (Pubkey) returns (Empty); rpc GetBalance (Pubkey) returns (AccountBalance); rpc NewVm (NewVmReq) returns (NewVmResp); rpc ListContracts (ListContractsReq) returns (stream Contract); @@ -201,4 +211,8 @@ service BrainCli { rpc DeleteVm (DeleteVmReq) returns (Empty); rpc UpdateVm (UpdateVmReq) returns (UpdateVmResp); rpc ExtendVm (ExtendVmReq) returns (Empty); + // admin commands + rpc Airdrop (AirdropReq) returns (Empty); + rpc ListAllContracts (Empty) returns (stream Contract); + rpc ListAccounts (Empty) returns (stream Account); } diff --git a/src/data.rs b/src/data.rs index d286c52..dc60bc2 100644 --- a/src/data.rs +++ b/src/data.rs @@ -176,8 +176,8 @@ impl BrainData { } } - pub fn get_airdrop(&self, account: &str) { - self.add_nano_to_wallet(account, 1000_000000000); + pub fn give_airdrop(&self, account: &str, tokens: u64) { + self.add_nano_to_wallet(account, tokens.saturating_mul(1_000_000_000)); } fn add_nano_to_wallet(&self, account: &str, nano_lp: u64) { @@ -684,6 +684,22 @@ impl BrainData { contracts.iter().cloned().find(|c| c.uuid == uuid) } + pub fn list_all_contracts(&self) -> Vec { + let contracts = self.contracts.read().unwrap(); + contracts.iter().cloned().collect() + } + + pub fn list_accounts(&self) -> Vec { + self.accounts + .iter() + .map(|a| grpc::Account { + pubkey: a.key().to_string(), + balance: a.balance, + tmp_locked: a.tmp_locked, + }) + .collect() + } + pub fn find_contracts_by_admin_pubkey(&self, admin_pubkey: &str) -> Vec { debug!("Searching contracts for admin pubkey {admin_pubkey}"); let contracts: Vec = self diff --git a/src/grpc.rs b/src/grpc.rs index 6d3d102..8de7995 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -15,6 +15,11 @@ use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; +const ADMIN_ACCOUNTS: &[&str] = &[ + "x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK", + "FHuecMbeC1PfjkW2JKyoicJAuiU7khgQT16QUB3Q1XdL", +]; + pub struct BrainDaemonMock { data: Arc, } @@ -99,7 +104,10 @@ impl BrainDaemon for BrainDaemonMock { let mut req_stream = req.into_inner(); let pubkey: String; if let Some(Ok(msg)) = req_stream.next().await { - log::debug!("demon_messages received the following auth message: {:?}", msg.msg); + log::debug!( + "demon_messages received the following auth message: {:?}", + msg.msg + ); if let Some(daemon_message::Msg::Auth(auth)) = msg.msg { pubkey = auth.pubkey.clone(); check_sig_from_parts( @@ -149,12 +157,6 @@ impl BrainCli for BrainCliMock { Ok(Response::new(self.data.get_balance(&req.pubkey).into())) } - async fn get_airdrop(&self, req: Request) -> Result, Status> { - let req = check_sig_from_req(req)?; - self.data.get_airdrop(&req.pubkey); - Ok(Response::new(Empty {})) - } - async fn new_vm(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; info!("New VM requested via CLI: {req:?}"); @@ -270,6 +272,54 @@ impl BrainCli for BrainCliMock { Err(e) => Err(Status::not_found(e.to_string())), } } + + async fn airdrop(&self, req: Request) -> Result, Status> { + check_admin_key(&req)?; + let req = check_sig_from_req(req)?; + self.data.give_airdrop(&req.pubkey, req.tokens); + Ok(Response::new(Empty{})) + } + + type ListAllContractsStream = Pin> + Send>>; + async fn list_all_contracts( + &self, + req: Request, + ) -> Result, Status> { + check_admin_key(&req)?; + let _ = check_sig_from_req(req)?; + let contracts = self.data.list_all_contracts(); + let (tx, rx) = mpsc::channel(6); + tokio::spawn(async move { + for contract in contracts { + let _ = tx.send(Ok(contract.into())).await; + } + }); + let output_stream = ReceiverStream::new(rx); + Ok(Response::new( + Box::pin(output_stream) as Self::ListContractsStream + )) + } + + type ListAccountsStream = Pin> + Send>>; + async fn list_accounts( + &self, + req: Request, + ) -> Result, Status> { + check_admin_key(&req)?; + let _ = check_sig_from_req(req)?; + let accounts = self.data.list_accounts(); + let (tx, rx) = mpsc::channel(6); + tokio::spawn(async move { + for account in accounts { + let _ = tx.send(Ok(account.into())).await; + } + }); + let output_stream = ReceiverStream::new(rx); + Ok(Response::new( + Box::pin(output_stream) as Self::ListAccountsStream + )) + } + } trait PubkeyGetter { @@ -324,6 +374,18 @@ impl PubkeyGetter for RegisterNodeReq { } } +impl PubkeyGetter for Empty { + fn get_pubkey(&self) -> Option { + None + } +} + +impl PubkeyGetter for AirdropReq { + fn get_pubkey(&self) -> Option { + None + } +} + fn check_sig_from_req(req: Request) -> Result { let time = match req.metadata().get("timestamp") { Some(t) => t.clone(), @@ -360,7 +422,7 @@ fn check_sig_from_req(req: Request) -> Res let pubkey_value = match req.metadata().get("pubkey") { Some(p) => p.clone(), - None => return Err(Status::unauthenticated("Signature not found in metadata.")), + None => return Err(Status::unauthenticated("pubkey not found in metadata.")), }; let pubkey = ed25519_dalek::VerifyingKey::from_bytes( &bs58::decode(&pubkey_value) @@ -426,3 +488,21 @@ fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Resul Ok(()) } + +fn check_admin_key(req: &Request) -> Result<(), Status> { + let pubkey = match req.metadata().get("pubkey") { + Some(p) => p.clone(), + None => return Err(Status::unauthenticated("pubkey not found in metadata.")), + }; + let pubkey = pubkey + .to_str() + .map_err(|_| Status::unauthenticated("could not parse pubkey metadata to str"))?; + + if !ADMIN_ACCOUNTS.contains(&pubkey) { + return Err(Status::unauthenticated( + "This operation is reserved to admin accounts", + )); + } + + Ok(()) +} From c98db7f8c3452719ff7be13ae2b335e92521178b Mon Sep 17 00:00:00 2001 From: ghe0 Date: Sat, 8 Feb 2025 19:50:44 +0200 Subject: [PATCH 06/11] rename structs so that they say "VM" --- build.rs | 2 +- src/data.rs | 169 ++++++++++++++++++------------------------ src/grpc.rs | 79 ++++++++++---------- src/main.rs | 6 +- snp.proto => vm.proto | 36 ++++----- 5 files changed, 134 insertions(+), 158 deletions(-) rename snp.proto => vm.proto (84%) diff --git a/build.rs b/build.rs index d1b9140..469b7d1 100644 --- a/build.rs +++ b/build.rs @@ -1,6 +1,6 @@ fn main() { tonic_build::configure() .build_server(true) - .compile_protos(&["snp.proto"], &["proto"]) + .compile_protos(&["vm.proto"], &["proto"]) .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); } diff --git a/src/data.rs b/src/data.rs index dc60bc2..d9c6459 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] use crate::grpc::snp_proto::{self as grpc}; use chrono::Utc; use dashmap::DashMap; @@ -14,10 +13,8 @@ pub enum Error { TxTooBig, #[error("Account has insufficient funds for this operation")] InsufficientFunds, - #[error("I have no idea how this happened. Please report this bug.")] - ImpossibleError, #[error("Could not find contract {0}")] - ContractNotFound(String), + VmContractNotFound(String), } #[derive(Clone)] @@ -36,7 +33,7 @@ impl From for grpc::AccountBalance { } #[derive(Eq, Hash, PartialEq, Clone, Debug, Default)] -pub struct Node { +pub struct VmNode { pub public_key: String, pub owner_key: String, pub country: String, @@ -54,9 +51,9 @@ pub struct Node { pub price: u64, } -impl Into for Node { - fn into(self) -> grpc::NodeListResp { - grpc::NodeListResp { +impl Into for VmNode { + fn into(self) -> grpc::VmNodeListResp { + grpc::VmNodeListResp { node_pubkey: self.public_key, country: self.country, region: self.region, @@ -70,7 +67,7 @@ impl Into for Node { } #[derive(Clone, Debug)] -pub struct Contract { +pub struct VmContract { pub uuid: String, pub hostname: String, pub admin_pubkey: String, @@ -92,7 +89,7 @@ pub struct Contract { pub collected_at: chrono::DateTime, } -impl Contract { +impl VmContract { fn total_units(&self) -> u64 { // TODO: Optimize this based on price of hardware. // I tried, but this can be done better. @@ -109,10 +106,10 @@ impl Contract { } } -impl Into for Contract { - fn into(self) -> grpc::Contract { +impl Into for VmContract { + fn into(self) -> grpc::VmContract { let nano_per_minute = self.price_per_minute(); - grpc::Contract { + grpc::VmContract { uuid: self.uuid, hostname: self.hostname, admin_pubkey: self.admin_pubkey, @@ -138,26 +135,19 @@ impl Into for Contract { pub struct BrainData { // amount of nanoLP in each account accounts: DashMap, - nodes: RwLock>, - contracts: RwLock>, + vm_nodes: RwLock>, + vm_contracts: RwLock>, tmp_newvm_reqs: DashMap)>, tmp_updatevm_reqs: DashMap)>, - daemon_tx: DashMap>, -} - -#[derive(Debug)] -enum TxType { - CliContract, - DaemonDeleteVm, - DaemonNewVm, + daemon_tx: DashMap>, } impl BrainData { pub fn new() -> Self { Self { accounts: DashMap::new(), - nodes: RwLock::new(Vec::new()), - contracts: RwLock::new(Vec::new()), + vm_nodes: RwLock::new(Vec::new()), + vm_contracts: RwLock::new(Vec::new()), tmp_newvm_reqs: DashMap::new(), tmp_updatevm_reqs: DashMap::new(), daemon_tx: DashMap::new(), @@ -191,11 +181,11 @@ impl BrainData { }); } - pub async fn contracts_cron(&self) { + pub async fn vm_contracts_cron(&self) { let mut deleted_contracts: Vec<(String, String)> = Vec::new(); log::debug!("Running contracts cron..."); { - let mut contracts = self.contracts.write().unwrap(); + let mut contracts = self.vm_contracts.write().unwrap(); contracts.retain_mut(|c| { let owner_key = self .find_nodes_by_pubkey(&c.node_pubkey) @@ -220,8 +210,8 @@ impl BrainData { // inform daemons of the deletion of the contracts for (uuid, node_pubkey) in deleted_contracts.iter() { if let Some(daemon_tx) = self.daemon_tx.get(&node_pubkey.clone()) { - let msg = grpc::BrainMessage { - msg: Some(grpc::brain_message::Msg::DeleteVm(grpc::DeleteVmReq { + let msg = grpc::BrainVmMessage { + msg: Some(grpc::brain_vm_message::Msg::DeleteVm(grpc::DeleteVmReq { uuid: uuid.to_string(), admin_pubkey: String::new(), })), @@ -232,13 +222,13 @@ impl BrainData { } } - pub fn insert_node(&self, node: Node) { + pub fn insert_node(&self, node: VmNode) { info!("Registering node {node:?}"); - let mut nodes = self.nodes.write().unwrap(); + let mut nodes = self.vm_nodes.write().unwrap(); for n in nodes.iter_mut() { if n.public_key == node.public_key { // TODO: figure what to do in this case. - warn!("Node {} already exists. Updating data.", n.public_key); + warn!("VM Node {} already exists. Updating data.", n.public_key); *n = node; return; } @@ -262,20 +252,7 @@ impl BrainData { } } - pub fn unlock_nanotockens(&self, account: &str, nano_lp: u64) -> Result<(), Error> { - if let Some(mut account) = self.accounts.get_mut(account) { - if nano_lp > account.tmp_locked { - return Err(Error::ImpossibleError); - } - account.balance = account.balance.saturating_add(nano_lp); - account.tmp_locked = account.tmp_locked.saturating_sub(nano_lp); - Ok(()) - } else { - Err(Error::ImpossibleError) - } - } - - pub fn extend_contract_time( + pub fn extend_vm_contract_time( &self, uuid: &str, wallet: &str, @@ -289,7 +266,7 @@ impl BrainData { None => return Err(Error::InsufficientFunds), }; match self - .contracts + .vm_contracts .write() .unwrap() .iter_mut() @@ -297,7 +274,7 @@ impl BrainData { { Some(contract) => { if contract.admin_pubkey != wallet { - return Err(Error::ContractNotFound(uuid.to_string())); + return Err(Error::VmContractNotFound(uuid.to_string())); } if account.balance + contract.locked_nano < nano_lp { return Err(Error::InsufficientFunds); @@ -306,12 +283,12 @@ impl BrainData { contract.locked_nano = nano_lp; Ok(()) } - None => Err(Error::ContractNotFound(uuid.to_string())), + None => Err(Error::VmContractNotFound(uuid.to_string())), } } - pub fn submit_node_resources(&self, res: grpc::NodeResources) { - let mut nodes = self.nodes.write().unwrap(); + pub fn submit_node_resources(&self, res: grpc::VmNodeResources) { + let mut nodes = self.vm_nodes.write().unwrap(); for n in nodes.iter_mut() { if n.public_key == res.node_pubkey { debug!( @@ -329,13 +306,13 @@ impl BrainData { } } debug!( - "Node {} not found when trying to update resources.", + "VM Node {} not found when trying to update resources.", res.node_pubkey ); - debug!("Node list:\n{:?}", nodes); + debug!("VM Node list:\n{:?}", nodes); } - pub fn add_daemon_tx(&self, node_pubkey: &str, tx: Sender) { + pub fn add_daemon_tx(&self, node_pubkey: &str, tx: Sender) { self.daemon_tx.insert(node_pubkey.to_string(), tx); } @@ -347,12 +324,12 @@ impl BrainData { let contract = match self.find_contract_by_uuid(&delete_vm.uuid) { Some(contract) => { if contract.admin_pubkey != delete_vm.admin_pubkey { - return Err(Error::ContractNotFound(delete_vm.uuid)); + return Err(Error::VmContractNotFound(delete_vm.uuid)); } contract } None => { - return Err(Error::ContractNotFound(delete_vm.uuid)); + return Err(Error::VmContractNotFound(delete_vm.uuid)); } }; info!("Found vm {}. Deleting...", delete_vm.uuid); @@ -361,8 +338,8 @@ impl BrainData { "TX for daemon {} found. Informing daemon about deletion of {}.", contract.node_pubkey, delete_vm.uuid ); - let msg = grpc::BrainMessage { - msg: Some(grpc::brain_message::Msg::DeleteVm(delete_vm.clone())), + let msg = grpc::BrainVmMessage { + msg: Some(grpc::brain_vm_message::Msg::DeleteVm(delete_vm.clone())), }; if let Err(e) = daemon_tx.send(msg).await { warn!( @@ -375,11 +352,12 @@ impl BrainData { } self.add_nano_to_wallet(&contract.admin_pubkey, contract.locked_nano); - let mut contracts = self.contracts.write().unwrap(); + let mut contracts = self.vm_contracts.write().unwrap(); contracts.retain(|c| c.uuid != delete_vm.uuid); Ok(()) } + // also unlocks nanotokens in case VM creation failed pub async fn submit_newvm_resp(&self, new_vm_resp: grpc::NewVmResp) { let new_vm_req = match self.tmp_newvm_reqs.remove(&new_vm_resp.uuid) { Some((_, r)) => r, @@ -426,7 +404,7 @@ impl BrainData { admin_wallet.tmp_locked -= new_vm_req.0.locked_nano; } - let contract = Contract { + let contract = VmContract { uuid: new_vm_resp.uuid, exposed_ports: args.exposed_ports.clone(), public_ipv4, @@ -446,7 +424,7 @@ impl BrainData { collected_at: Utc::now(), }; info!("Created new contract: {contract:?}"); - self.contracts.write().unwrap().push(contract); + self.vm_contracts.write().unwrap().push(contract); } pub async fn submit_updatevm_resp(&self, mut update_vm_resp: grpc::UpdateVmResp) { @@ -465,7 +443,7 @@ impl BrainData { if update_vm_resp.error != "" { return; } - let mut contracts = self.contracts.write().unwrap(); + let mut contracts = self.vm_contracts.write().unwrap(); match contracts.iter_mut().find(|c| c.uuid == update_vm_resp.uuid) { Some(contract) => { if update_vm_req.0.vcpus != 0 { @@ -494,8 +472,8 @@ impl BrainData { contract.updated_at = Utc::now(); } None => { - log::error!("Contract not found for {}.", update_vm_req.0.uuid); - update_vm_resp.error = "Contract not found.".to_string(); + log::error!("VM Contract not found for {}.", update_vm_req.0.uuid); + update_vm_resp.error = "VM Contract not found.".to_string(); } } if let Err(e) = update_vm_req.1.send(update_vm_resp.clone()) { @@ -527,8 +505,8 @@ impl BrainData { "Found daemon TX for {}. Sending newVMReq {}", req.node_pubkey, req.uuid ); - let msg = grpc::BrainMessage { - msg: Some(grpc::brain_message::Msg::NewVmReq(req.clone())), + let msg = grpc::BrainVmMessage { + msg: Some(grpc::brain_vm_message::Msg::NewVmReq(req.clone())), }; if let Err(e) = daemon_tx.send(msg).await { warn!( @@ -544,6 +522,13 @@ impl BrainData { }) .await; } + } else { + self.submit_newvm_resp(grpc::NewVmResp { + error: "Daemon is offline.".to_string(), + uuid: req.uuid, + args: None, + }) + .await; } } @@ -559,7 +544,7 @@ impl BrainData { if contract.admin_pubkey != req.admin_pubkey { let _ = tx.send(grpc::UpdateVmResp { uuid, - error: "Contract does not exist.".to_string(), + error: "VM Contract does not exist.".to_string(), args: None, }); return; @@ -573,7 +558,7 @@ impl BrainData { ); let _ = tx.send(grpc::UpdateVmResp { uuid, - error: "Contract does not exist.".to_string(), + error: "VM Contract does not exist.".to_string(), args: None, }); return; @@ -586,8 +571,8 @@ impl BrainData { "Found daemon TX for {}. Sending updateVMReq {}", node_pubkey, req.uuid ); - let msg = grpc::BrainMessage { - msg: Some(grpc::brain_message::Msg::UpdateVmReq(req.clone())), + let msg = grpc::BrainVmMessage { + msg: Some(grpc::brain_vm_message::Msg::UpdateVmReq(req.clone())), }; match server_tx.send(msg).await { Ok(_) => { @@ -617,26 +602,16 @@ impl BrainData { } } - pub fn insert_contract(&self, contract: Contract) { - let mut contracts = self.contracts.write().unwrap(); - contracts.push(contract); - } - - pub fn find_nodes_by_pubkey(&self, public_key: &str) -> Option { - let nodes = self.nodes.read().unwrap(); + pub fn find_nodes_by_pubkey(&self, public_key: &str) -> Option { + let nodes = self.vm_nodes.read().unwrap(); nodes.iter().cloned().find(|n| n.public_key == public_key) } - pub fn find_ns_by_owner_key(&self, owner_key: &str) -> Option { - let nodes = self.nodes.read().unwrap(); - nodes.iter().cloned().find(|n| n.owner_key == owner_key) - } - pub fn find_nodes_by_filters( &self, - filters: &crate::grpc::snp_proto::NodeFilters, - ) -> Vec { - let nodes = self.nodes.read().unwrap(); + filters: &crate::grpc::snp_proto::VmNodeFilters, + ) -> Vec { + let nodes = self.vm_nodes.read().unwrap(); nodes .iter() .filter(|n| { @@ -658,9 +633,9 @@ impl BrainData { // TODO: sort by rating pub fn get_one_node_by_filters( &self, - filters: &crate::grpc::snp_proto::NodeFilters, - ) -> Option { - let nodes = self.nodes.read().unwrap(); + filters: &crate::grpc::snp_proto::VmNodeFilters, + ) -> Option { + let nodes = self.vm_nodes.read().unwrap(); nodes .iter() .find(|n| { @@ -679,13 +654,13 @@ impl BrainData { .cloned() } - pub fn find_contract_by_uuid(&self, uuid: &str) -> Option { - let contracts = self.contracts.read().unwrap(); + pub fn find_contract_by_uuid(&self, uuid: &str) -> Option { + let contracts = self.vm_contracts.read().unwrap(); contracts.iter().cloned().find(|c| c.uuid == uuid) } - pub fn list_all_contracts(&self) -> Vec { - let contracts = self.contracts.read().unwrap(); + pub fn list_all_contracts(&self) -> Vec { + let contracts = self.vm_contracts.read().unwrap(); contracts.iter().cloned().collect() } @@ -700,10 +675,10 @@ impl BrainData { .collect() } - pub fn find_contracts_by_admin_pubkey(&self, admin_pubkey: &str) -> Vec { + pub fn find_contracts_by_admin_pubkey(&self, admin_pubkey: &str) -> Vec { debug!("Searching contracts for admin pubkey {admin_pubkey}"); - let contracts: Vec = self - .contracts + let contracts: Vec = self + .vm_contracts .read() .unwrap() .iter() @@ -714,8 +689,8 @@ impl BrainData { contracts } - pub fn find_contracts_by_node_pubkey(&self, node_pubkey: &str) -> Vec { - let contracts = self.contracts.read().unwrap(); + pub fn find_contracts_by_node_pubkey(&self, node_pubkey: &str) -> Vec { + let contracts = self.vm_contracts.read().unwrap(); contracts .iter() .filter(|c| c.node_pubkey == node_pubkey) diff --git a/src/grpc.rs b/src/grpc.rs index 8de7995..6f1f1b1 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -1,13 +1,14 @@ #![allow(dead_code)] pub mod snp_proto { - tonic::include_proto!("snp_proto"); + tonic::include_proto!("vm_proto"); } +use crate::grpc::vm_daemon_message; use crate::data::BrainData; use log::info; use snp_proto::brain_cli_server::BrainCli; -use snp_proto::brain_daemon_server::BrainDaemon; +use snp_proto::brain_vm_daemon_server::BrainVmDaemon; use snp_proto::*; use std::pin::Pin; use std::sync::Arc; @@ -41,15 +42,15 @@ impl BrainCliMock { } #[tonic::async_trait] -impl BrainDaemon for BrainDaemonMock { - type RegisterNodeStream = Pin> + Send>>; - async fn register_node( +impl BrainVmDaemon for BrainDaemonMock { + type RegisterVmNodeStream = Pin> + Send>>; + async fn register_vm_node( &self, - req: Request, - ) -> Result, Status> { + req: Request, + ) -> Result, Status> { let req = check_sig_from_req(req)?; info!("Starting registration process for {:?}", req); - let node = crate::data::Node { + let node = crate::data::VmNode { public_key: req.node_pubkey.clone(), owner_key: req.owner_pubkey, country: req.country, @@ -71,11 +72,11 @@ impl BrainDaemon for BrainDaemonMock { }); let output_stream = ReceiverStream::new(rx); Ok(Response::new( - Box::pin(output_stream) as Self::RegisterNodeStream + Box::pin(output_stream) as Self::RegisterVmNodeStream )) } - type BrainMessagesStream = Pin> + Send>>; + type BrainMessagesStream = Pin> + Send>>; async fn brain_messages( &self, req: Request, @@ -99,7 +100,7 @@ impl BrainDaemon for BrainDaemonMock { async fn daemon_messages( &self, - req: Request>, + req: Request>, ) -> Result, Status> { let mut req_stream = req.into_inner(); let pubkey: String; @@ -108,7 +109,7 @@ impl BrainDaemon for BrainDaemonMock { "demon_messages received the following auth message: {:?}", msg.msg ); - if let Some(daemon_message::Msg::Auth(auth)) = msg.msg { + if let Some(vm_daemon_message::Msg::Auth(auth)) = msg.msg { pubkey = auth.pubkey.clone(); check_sig_from_parts( &pubkey, @@ -129,13 +130,13 @@ impl BrainDaemon for BrainDaemonMock { while let Some(daemon_message) = req_stream.next().await { match daemon_message { Ok(msg) => match msg.msg { - Some(daemon_message::Msg::NewVmResp(new_vm_resp)) => { + Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => { self.data.submit_newvm_resp(new_vm_resp).await; } - Some(daemon_message::Msg::UpdateVmResp(update_vm_resp)) => { + Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => { self.data.submit_updatevm_resp(update_vm_resp).await; } - Some(daemon_message::Msg::NodeResources(node_resources)) => { + Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => { self.data.submit_node_resources(node_resources); } _ => {} @@ -197,20 +198,20 @@ impl BrainCli for BrainCliMock { let req = check_sig_from_req(req)?; match self .data - .extend_contract_time(&req.uuid, &req.admin_pubkey, req.locked_nano) + .extend_vm_contract_time(&req.uuid, &req.admin_pubkey, req.locked_nano) { Ok(()) => Ok(Response::new(Empty {})), Err(e) => Err(Status::unknown(format!("Could not extend contract: {e}"))), } } - type ListContractsStream = Pin> + Send>>; - async fn list_contracts( + type ListVmContractsStream = Pin> + Send>>; + async fn list_vm_contracts( &self, - req: Request, - ) -> Result, Status> { + req: Request, + ) -> Result, Status> { let req = check_sig_from_req(req)?; - info!("CLI {} requested ListVMContractsStream", req.admin_pubkey); + info!("CLI {} requested ListVMVmContractsStream", req.admin_pubkey); let contracts = match req.uuid.is_empty() { false => match self.data.find_contract_by_uuid(&req.uuid) { Some(contract) => vec![contract], @@ -226,17 +227,17 @@ impl BrainCli for BrainCliMock { }); let output_stream = ReceiverStream::new(rx); Ok(Response::new( - Box::pin(output_stream) as Self::ListContractsStream + Box::pin(output_stream) as Self::ListVmContractsStream )) } - type ListNodesStream = Pin> + Send>>; - async fn list_nodes( + type ListVmNodesStream = Pin> + Send>>; + async fn list_vm_nodes( &self, - req: Request, - ) -> Result, tonic::Status> { + req: Request, + ) -> Result, tonic::Status> { let req = check_sig_from_req(req)?; - info!("Unknown CLI requested ListNodesStream: {req:?}"); + info!("CLI requested ListVmNodesStream: {req:?}"); let nodes = self.data.find_nodes_by_filters(&req); let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { @@ -246,16 +247,16 @@ impl BrainCli for BrainCliMock { }); let output_stream = ReceiverStream::new(rx); Ok(Response::new( - Box::pin(output_stream) as Self::ListNodesStream + Box::pin(output_stream) as Self::ListVmNodesStream )) } - async fn get_one_node( + async fn get_one_vm_node( &self, - req: Request, - ) -> Result, Status> { + req: Request, + ) -> Result, Status> { let req = check_sig_from_req(req)?; - info!("Unknown CLI requested ListNodesStream: {req:?}"); + info!("Unknown CLI requested ListVmNodesStream: {req:?}"); match self.data.get_one_node_by_filters(&req) { Some(node) => Ok(Response::new(node.into())), None => Err(Status::not_found( @@ -280,11 +281,11 @@ impl BrainCli for BrainCliMock { Ok(Response::new(Empty{})) } - type ListAllContractsStream = Pin> + Send>>; - async fn list_all_contracts( + type ListAllVmContractsStream = Pin> + Send>>; + async fn list_all_vm_contracts( &self, req: Request, - ) -> Result, Status> { + ) -> Result, Status> { check_admin_key(&req)?; let _ = check_sig_from_req(req)?; let contracts = self.data.list_all_contracts(); @@ -296,7 +297,7 @@ impl BrainCli for BrainCliMock { }); let output_stream = ReceiverStream::new(rx); Ok(Response::new( - Box::pin(output_stream) as Self::ListContractsStream + Box::pin(output_stream) as Self::ListVmContractsStream )) } @@ -356,19 +357,19 @@ impl PubkeyGetter for ExtendVmReq { } } -impl PubkeyGetter for ListContractsReq { +impl PubkeyGetter for ListVmContractsReq { fn get_pubkey(&self) -> Option { Some(self.admin_pubkey.clone()) } } -impl PubkeyGetter for NodeFilters { +impl PubkeyGetter for VmNodeFilters { fn get_pubkey(&self) -> Option { None } } -impl PubkeyGetter for RegisterNodeReq { +impl PubkeyGetter for RegisterVmNodeReq { fn get_pubkey(&self) -> Option { Some(self.node_pubkey.clone()) } diff --git a/src/main.rs b/src/main.rs index ae04f30..ffc72f3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@ mod grpc; use data::BrainData; use grpc::snp_proto::brain_cli_server::BrainCliServer; -use grpc::snp_proto::brain_daemon_server::BrainDaemonServer; +use grpc::snp_proto::brain_vm_daemon_server::BrainVmDaemonServer; use grpc::BrainCliMock; use grpc::BrainDaemonMock; use std::sync::Arc; @@ -19,12 +19,12 @@ async fn main() { tokio::spawn(async move { loop { tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; - data_clone.contracts_cron().await; + data_clone.vm_contracts_cron().await; } }); let addr = "0.0.0.0:31337".parse().unwrap(); - let daemon_server = BrainDaemonServer::new(BrainDaemonMock::new(data.clone())); + let daemon_server = BrainVmDaemonServer::new(BrainDaemonMock::new(data.clone())); let cli_server = BrainCliServer::new(BrainCliMock::new(data.clone())); Server::builder() diff --git a/snp.proto b/vm.proto similarity index 84% rename from snp.proto rename to vm.proto index c54e4f6..e755772 100644 --- a/snp.proto +++ b/vm.proto @@ -1,5 +1,5 @@ syntax = "proto3"; -package snp_proto; +package vm_proto; message Empty { } @@ -13,7 +13,7 @@ message AccountBalance { uint64 tmp_locked = 2; } -message Contract { +message VmContract { string uuid = 1; string hostname = 2; string admin_pubkey = 3; @@ -53,7 +53,7 @@ message MeasurementIP { } // This should also include a block hash or similar, for auth -message RegisterNodeReq { +message RegisterVmNodeReq { string node_pubkey = 1; string owner_pubkey = 2; string main_ip = 3; @@ -64,7 +64,7 @@ message RegisterNodeReq { uint64 price = 7; } -message NodeResources { +message VmNodeResources { string node_pubkey = 1; uint32 avail_ports = 2; uint32 avail_ipv4 = 3; @@ -123,7 +123,7 @@ message DeleteVmReq { string admin_pubkey = 2; } -message BrainMessage { +message BrainVmMessage { oneof Msg { NewVmReq new_vm_req = 1; UpdateVmReq update_vm_req = 2; @@ -138,28 +138,28 @@ message DaemonStreamAuth { string signature = 4; } -message DaemonMessage { +message VmDaemonMessage { oneof Msg { DaemonStreamAuth auth = 1; NewVmResp new_vm_resp = 2; UpdateVmResp update_vm_resp = 3; - NodeResources node_resources = 4; + VmNodeResources vm_node_resources = 4; } } -service BrainDaemon { - rpc RegisterNode (RegisterNodeReq) returns (stream Contract); - rpc BrainMessages (DaemonStreamAuth) returns (stream BrainMessage); - rpc DaemonMessages (stream DaemonMessage) returns (Empty); +service BrainVmDaemon { + rpc RegisterVmNode (RegisterVmNodeReq) returns (stream VmContract); + rpc BrainMessages (DaemonStreamAuth) returns (stream BrainVmMessage); + rpc DaemonMessages (stream VmDaemonMessage) returns (Empty); } -message ListContractsReq { +message ListVmContractsReq { string admin_pubkey = 1; string node_pubkey = 2; string uuid = 3; } -message NodeFilters { +message VmNodeFilters { uint32 free_ports = 1; bool offers_ipv4 = 2; bool offers_ipv6 = 3; @@ -173,7 +173,7 @@ message NodeFilters { string node_pubkey = 11; } -message NodeListResp { +message VmNodeListResp { string node_pubkey = 1; string country = 2; string region = 3; @@ -205,14 +205,14 @@ message Account { service BrainCli { rpc GetBalance (Pubkey) returns (AccountBalance); rpc NewVm (NewVmReq) returns (NewVmResp); - rpc ListContracts (ListContractsReq) returns (stream Contract); - rpc ListNodes (NodeFilters) returns (stream NodeListResp); - rpc GetOneNode (NodeFilters) returns (NodeListResp); + rpc ListVmContracts (ListVmContractsReq) returns (stream VmContract); + rpc ListVmNodes (VmNodeFilters) returns (stream VmNodeListResp); + rpc GetOneVmNode (VmNodeFilters) returns (VmNodeListResp); rpc DeleteVm (DeleteVmReq) returns (Empty); rpc UpdateVm (UpdateVmReq) returns (UpdateVmResp); rpc ExtendVm (ExtendVmReq) returns (Empty); // admin commands rpc Airdrop (AirdropReq) returns (Empty); - rpc ListAllContracts (Empty) returns (stream Contract); + rpc ListAllVmContracts (Empty) returns (stream VmContract); rpc ListAccounts (Empty) returns (stream Account); } From df805ea291f1065548ae3b69d84dc4087513ed14 Mon Sep 17 00:00:00 2001 From: Noor Date: Tue, 11 Feb 2025 21:03:05 +0530 Subject: [PATCH 07/11] add admin key --- src/grpc.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/grpc.rs b/src/grpc.rs index 6f1f1b1..2d222f2 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -19,6 +19,7 @@ use tonic::{Request, Response, Status, Streaming}; const ADMIN_ACCOUNTS: &[&str] = &[ "x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK", "FHuecMbeC1PfjkW2JKyoicJAuiU7khgQT16QUB3Q1XdL", + "H21Shi4iE7vgfjWEQNvzmpmBMJSaiZ17PYUcdNoAoKNc", ]; pub struct BrainDaemonMock { From 5c213f2eb4a734fde34cec8c99c0d62ca375ae40 Mon Sep 17 00:00:00 2001 From: ghe0 Date: Wed, 12 Feb 2025 02:12:11 +0200 Subject: [PATCH 08/11] small refactoring on var names and impls --- src/data.rs | 4 +-- src/grpc.rs | 86 +++++++++++++++++------------------------------------ 2 files changed, 29 insertions(+), 61 deletions(-) diff --git a/src/data.rs b/src/data.rs index d9c6459..13d3d6a 100644 --- a/src/data.rs +++ b/src/data.rs @@ -675,7 +675,7 @@ impl BrainData { .collect() } - pub fn find_contracts_by_admin_pubkey(&self, admin_pubkey: &str) -> Vec { + pub fn find_vm_contracts_by_admin(&self, admin_pubkey: &str) -> Vec { debug!("Searching contracts for admin pubkey {admin_pubkey}"); let contracts: Vec = self .vm_contracts @@ -689,7 +689,7 @@ impl BrainData { contracts } - pub fn find_contracts_by_node_pubkey(&self, node_pubkey: &str) -> Vec { + pub fn find_vm_contracts_by_node(&self, node_pubkey: &str) -> Vec { let contracts = self.vm_contracts.read().unwrap(); contracts .iter() diff --git a/src/grpc.rs b/src/grpc.rs index 2d222f2..a80f74b 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] pub mod snp_proto { tonic::include_proto!("vm_proto"); @@ -64,7 +63,7 @@ impl BrainVmDaemon for BrainDaemonMock { self.data.insert_node(node); info!("Sending existing contracts to {}", req.node_pubkey); - let contracts = self.data.find_contracts_by_node_pubkey(&req.node_pubkey); + let contracts = self.data.find_vm_contracts_by_node(&req.node_pubkey); let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for contract in contracts { @@ -218,7 +217,7 @@ impl BrainCli for BrainCliMock { Some(contract) => vec![contract], None => Vec::new(), }, - true => self.data.find_contracts_by_admin_pubkey(&req.admin_pubkey), + true => self.data.find_vm_contracts_by_admin(&req.admin_pubkey), }; let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { @@ -328,65 +327,34 @@ trait PubkeyGetter { fn get_pubkey(&self) -> Option; } -impl PubkeyGetter for Pubkey { - fn get_pubkey(&self) -> Option { - Some(self.pubkey.clone()) - } +macro_rules! impl_pubkey_getter { + ($t:ty, $field:ident) => { + impl PubkeyGetter for $t { + fn get_pubkey(&self) -> Option { + Some(self.$field.clone()) + } + } + }; + ($t:ty) => { + impl PubkeyGetter for $t { + fn get_pubkey(&self) -> Option { + None + } + } + }; } -impl PubkeyGetter for NewVmReq { - fn get_pubkey(&self) -> Option { - Some(self.admin_pubkey.clone()) - } -} +impl_pubkey_getter!(Pubkey, pubkey); +impl_pubkey_getter!(NewVmReq, admin_pubkey); +impl_pubkey_getter!(DeleteVmReq, admin_pubkey); +impl_pubkey_getter!(UpdateVmReq, admin_pubkey); +impl_pubkey_getter!(ExtendVmReq, admin_pubkey); +impl_pubkey_getter!(ListVmContractsReq, admin_pubkey); +impl_pubkey_getter!(RegisterVmNodeReq, node_pubkey); -impl PubkeyGetter for DeleteVmReq { - fn get_pubkey(&self) -> Option { - Some(self.admin_pubkey.clone()) - } -} - -impl PubkeyGetter for UpdateVmReq { - fn get_pubkey(&self) -> Option { - Some(self.admin_pubkey.clone()) - } -} - -impl PubkeyGetter for ExtendVmReq { - fn get_pubkey(&self) -> Option { - Some(self.admin_pubkey.clone()) - } -} - -impl PubkeyGetter for ListVmContractsReq { - fn get_pubkey(&self) -> Option { - Some(self.admin_pubkey.clone()) - } -} - -impl PubkeyGetter for VmNodeFilters { - fn get_pubkey(&self) -> Option { - None - } -} - -impl PubkeyGetter for RegisterVmNodeReq { - fn get_pubkey(&self) -> Option { - Some(self.node_pubkey.clone()) - } -} - -impl PubkeyGetter for Empty { - fn get_pubkey(&self) -> Option { - None - } -} - -impl PubkeyGetter for AirdropReq { - fn get_pubkey(&self) -> Option { - None - } -} +impl_pubkey_getter!(VmNodeFilters); +impl_pubkey_getter!(Empty); +impl_pubkey_getter!(AirdropReq); fn check_sig_from_req(req: Request) -> Result { let time = match req.metadata().get("timestamp") { From 02be48fd96fff98fcdce3f1f6945063bd4402962 Mon Sep 17 00:00:00 2001 From: ghe0 Date: Thu, 13 Feb 2025 01:47:56 +0200 Subject: [PATCH 09/11] add support for operators --- Cargo.lock | 31 +++- Cargo.toml | 8 +- src/data.rs | 469 +++++++++++++++++++++++++++++++++++++++++++++------- src/grpc.rs | 146 +++++++++++++--- src/main.rs | 4 + vm.proto | 77 +++++++-- 6 files changed, 632 insertions(+), 103 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ad4f555..14d3b2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -244,7 +244,7 @@ dependencies = [ "prost-types", "reqwest", "serde", - "serde_json", + "serde_yaml", "thiserror", "tokio", "tokio-stream", @@ -305,6 +305,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-targets", ] @@ -401,6 +402,7 @@ dependencies = [ "lock_api", "once_cell", "parking_lot_core", + "serde", ] [[package]] @@ -1607,18 +1609,18 @@ checksum = "f79dfe2d285b0488816f30e700a7438c5a73d816b5b7d3ac72fbc48b0d185e03" [[package]] name = "serde" -version = "1.0.216" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b9781016e935a97e8beecf0c933758c97a5520d32930e460142b4cd80c6338e" +checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.216" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46f859dbbf73865c6627ed570e78961cd3ac92407a2d117204c49232485da55e" +checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" dependencies = [ "proc-macro2", "quote", @@ -1649,6 +1651,19 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.9.34+deprecated" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" +dependencies = [ + "indexmap 2.7.0", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "sha2" version = "0.10.8" @@ -2049,6 +2064,12 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" +[[package]] +name = "unsafe-libyaml" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" + [[package]] name = "untrusted" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index 00e4298..3ae3262 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,16 +5,16 @@ edition = "2021" [dependencies] bs58 = "0.5.1" -chrono = "0.4.39" -dashmap = "6.1.0" +chrono = { version = "0.4.39", features = ["serde"] } +dashmap = { version = "6.1.0", features = ["serde"] } ed25519-dalek = "2.1.1" env_logger = "0.11.6" log = "0.4.22" prost = "0.13.4" prost-types = "0.13.4" reqwest = "0.12.10" -serde = { version = "1.0.216", features = ["derive"] } -serde_json = "1.0.134" +serde = { version = "1.0.217", features = ["derive"] } +serde_yaml = "0.9.34" thiserror = "2.0.11" tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] } tokio-stream = "0.1.17" diff --git a/src/data.rs b/src/data.rs index 13d3d6a..9f7182e 100644 --- a/src/data.rs +++ b/src/data.rs @@ -2,29 +2,56 @@ use crate::grpc::snp_proto::{self as grpc}; use chrono::Utc; use dashmap::DashMap; use log::{debug, info, warn}; +use serde::{Deserialize, Serialize}; use std::str::FromStr; use std::sync::RwLock; +use std::{ + collections::{HashMap, HashSet}, + fs::File, + io::Write, +}; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot::Sender as OneshotSender; +const DATA_PATH: &str = "/etc/detee/brain-mock/saved_data.yaml"; + #[derive(thiserror::Error, Debug)] pub enum Error { #[error("We do not allow locking of more than 100000 LP.")] TxTooBig, + #[error("Escrow must be at least 5000 LP.")] + MinimalEscrow, #[error("Account has insufficient funds for this operation")] InsufficientFunds, #[error("Could not find contract {0}")] VmContractNotFound(String), + #[error("This error should never happen.")] + ImpossibleError, + #[error("You don't have the required permissions for this operation.")] + AccessDenied, } -#[derive(Clone)] -pub struct AccountNanoLP { +#[derive(Clone, Default, Serialize, Deserialize)] +pub struct AccountData { pub balance: u64, pub tmp_locked: u64, + // holds reasons why VMs of this account got kicked + pub kicked_for: Vec, + pub last_kick: chrono::DateTime, + // holds accounts that banned this account + pub banned_by: HashSet, } -impl From for grpc::AccountBalance { - fn from(value: AccountNanoLP) -> Self { +#[derive(Clone, Default, Serialize, Deserialize)] +pub struct OperatorData { + pub escrow: u64, + pub email: String, + pub banned_users: HashSet, + pub vm_nodes: HashSet, +} + +impl From for grpc::AccountBalance { + fn from(value: AccountData) -> Self { grpc::AccountBalance { balance: value.balance, tmp_locked: value.tmp_locked, @@ -32,10 +59,10 @@ impl From for grpc::AccountBalance { } } -#[derive(Eq, Hash, PartialEq, Clone, Debug, Default)] +#[derive(Eq, PartialEq, Clone, Debug, Default, Serialize, Deserialize)] pub struct VmNode { pub public_key: String, - pub owner_key: String, + pub operator_wallet: String, pub country: String, pub region: String, pub city: String, @@ -49,24 +76,27 @@ pub struct VmNode { pub max_ports_per_vm: u32, // nanoLP per unit per minute pub price: u64, + // 1st String is user wallet and 2nd String is report message + pub reports: HashMap, + pub offline_minutes: u64, } impl Into for VmNode { fn into(self) -> grpc::VmNodeListResp { grpc::VmNodeListResp { + operator: self.operator_wallet, node_pubkey: self.public_key, country: self.country, region: self.region, city: self.city, ip: self.ip, - server_rating: 0, - provider_rating: 0, price: self.price, + reports: self.reports.into_values().collect(), } } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct VmContract { pub uuid: String, pub hostname: String, @@ -82,14 +112,15 @@ pub struct VmContract { pub dtrfs_sha: String, pub created_at: chrono::DateTime, pub updated_at: chrono::DateTime, - // price per unit per minute // recommended value is 20000 + /// price per unit per minute pub price_per_unit: u64, pub locked_nano: u64, pub collected_at: chrono::DateTime, } impl VmContract { + /// total hardware units of this VM fn total_units(&self) -> u64 { // TODO: Optimize this based on price of hardware. // I tried, but this can be done better. @@ -100,7 +131,7 @@ impl VmContract { + (!self.public_ipv4.is_empty() as u64 * 10) } - // Returns price per minute in nanoLP + /// Returns price per minute in nanoLP fn price_per_minute(&self) -> u64 { self.total_units() * self.price_per_unit } @@ -131,78 +162,178 @@ impl Into for VmContract { } } -#[derive(Default)] +#[derive(Default, Serialize, Deserialize)] pub struct BrainData { // amount of nanoLP in each account - accounts: DashMap, + accounts: DashMap, + operators: DashMap, vm_nodes: RwLock>, vm_contracts: RwLock>, + #[serde(skip_serializing, skip_deserializing)] tmp_newvm_reqs: DashMap)>, + #[serde(skip_serializing, skip_deserializing)] tmp_updatevm_reqs: DashMap)>, + #[serde(skip_serializing, skip_deserializing)] daemon_tx: DashMap>, } impl BrainData { + pub fn save_to_disk(&self) -> Result<(), Box> { + let mut file = File::create(DATA_PATH)?; + file.write_all(serde_yaml::to_string(self)?.as_bytes())?; + Ok(()) + } + + fn load_from_disk() -> Result> { + let content = std::fs::read_to_string(DATA_PATH)?; + let data: Self = serde_yaml::from_str(&content)?; + Ok(data) + } + pub fn new() -> Self { - Self { - accounts: DashMap::new(), - vm_nodes: RwLock::new(Vec::new()), - vm_contracts: RwLock::new(Vec::new()), - tmp_newvm_reqs: DashMap::new(), - tmp_updatevm_reqs: DashMap::new(), - daemon_tx: DashMap::new(), + match Self::load_from_disk() { + Ok(data) => data, + Err(e) => { + warn!("Could not data {DATA_PATH} due to error: {e:?}"); + info!("Creating new instance of brain."); + Self { + accounts: DashMap::new(), + operators: DashMap::new(), + vm_nodes: RwLock::new(Vec::new()), + vm_contracts: RwLock::new(Vec::new()), + tmp_newvm_reqs: DashMap::new(), + tmp_updatevm_reqs: DashMap::new(), + daemon_tx: DashMap::new(), + } + } } } - pub fn get_balance(&self, account: &str) -> AccountNanoLP { + pub fn get_balance(&self, account: &str) -> AccountData { if let Some(account) = self.accounts.get(account) { return account.value().clone(); } else { - let balance = AccountNanoLP { + let balance = AccountData { balance: 0, tmp_locked: 0, + kicked_for: Vec::new(), + banned_by: HashSet::new(), + last_kick: chrono::Utc::now(), }; return balance; } } pub fn give_airdrop(&self, account: &str, tokens: u64) { + warn!("Airdropping {tokens} to {account}."); self.add_nano_to_wallet(account, tokens.saturating_mul(1_000_000_000)); } + pub fn slash_account(&self, account: &str, tokens: u64) { + warn!("Slashing {tokens} from {account}."); + self.rm_nano_from_wallet(account, tokens.saturating_mul(1_000_000_000)); + } + fn add_nano_to_wallet(&self, account: &str, nano_lp: u64) { log::debug!("Adding {nano_lp} nanoLP to {account}"); self.accounts .entry(account.to_string()) .and_modify(|d| d.balance += nano_lp) - .or_insert(AccountNanoLP { + .or_insert(AccountData { balance: nano_lp, - tmp_locked: 0, + ..Default::default() }); } + fn rm_nano_from_wallet(&self, account: &str, nano_lp: u64) { + log::debug!("Slashing {nano_lp} nanoLP to {account}"); + self.accounts.entry(account.to_string()).and_modify(|d| { + d.balance = d.balance.saturating_sub(nano_lp); + }); + } + + /// This is written to run every minute + pub async fn vm_nodes_cron(&self) { + log::debug!("Running vm nodes cron..."); + let mut nodes = self.vm_nodes.write().unwrap(); + let mut vm_contracts = self.vm_contracts.write().unwrap(); + for node in nodes.iter_mut() { + if self.daemon_tx.contains_key(&node.public_key) { + node.offline_minutes = 0; + continue; + } + let mut operator = match self + .operators + .iter_mut() + .find(|o| o.vm_nodes.contains(&node.public_key)) + { + Some(op) => op, + None => continue, + }; + node.offline_minutes += 1; + // compensate contract admin if the node is offline more then 5 minutes + if node.offline_minutes > 5 { + for c in vm_contracts + .iter() + .filter(|c| c.node_pubkey == node.public_key) + { + let compensation = c.price_per_minute() * 24; + if compensation < operator.escrow { + operator.escrow -= compensation; + self.add_nano_to_wallet(&c.admin_pubkey, compensation); + } + } + } + } + // delete nodes that are offline more than 3 hours, and clean contracts + nodes.retain(|n| { + if n.offline_minutes > 180 { + vm_contracts.retain_mut(|c| { + if c.node_pubkey == n.public_key { + self.add_nano_to_wallet(&c.admin_pubkey, c.locked_nano); + } + c.node_pubkey != n.public_key + }); + for mut op in self.operators.iter_mut() { + op.vm_nodes.remove(&n.public_key); + } + } + n.offline_minutes <= 180 + }); + } + pub async fn vm_contracts_cron(&self) { let mut deleted_contracts: Vec<(String, String)> = Vec::new(); log::debug!("Running contracts cron..."); { let mut contracts = self.vm_contracts.write().unwrap(); contracts.retain_mut(|c| { - let owner_key = self - .find_nodes_by_pubkey(&c.node_pubkey) - .unwrap() - .owner_key - .clone(); - let minutes_to_collect = (Utc::now() - c.collected_at).num_minutes() as u64; - c.collected_at = Utc::now(); - let mut nanolp_to_collect = c.price_per_minute().saturating_mul(minutes_to_collect); - if nanolp_to_collect > c.locked_nano { - nanolp_to_collect = c.locked_nano; - } - log::debug!("Removing {nanolp_to_collect} nanoLP from {}", c.uuid); - c.locked_nano -= nanolp_to_collect; - self.add_nano_to_wallet(&owner_key, nanolp_to_collect); - if c.locked_nano == 0 { - deleted_contracts.push((c.uuid.clone(), c.node_pubkey.clone())); + let node = self.find_node_by_pubkey(&c.node_pubkey).unwrap(); + if node.offline_minutes == 0 { + let operator_wallet = node.operator_wallet.clone(); + let minutes_to_collect = (Utc::now() - c.collected_at).num_minutes() as u64; + c.collected_at = Utc::now(); + let mut nanolp_to_collect = + c.price_per_minute().saturating_mul(minutes_to_collect); + if nanolp_to_collect > c.locked_nano { + nanolp_to_collect = c.locked_nano; + } + log::debug!("Removing {nanolp_to_collect} nanoLP from {}", c.uuid); + c.locked_nano -= nanolp_to_collect; + let escrow_multiplier = match self.operators.get(&operator_wallet) { + Some(op) if op.escrow > 5000 => match self.operators.get(&c.admin_pubkey) { + Some(user_is_op) if user_is_op.escrow > 5000 => 1, + _ => 5, + }, + _ => 1, + }; + self.add_nano_to_wallet( + &operator_wallet, + nanolp_to_collect * escrow_multiplier, + ); + if c.locked_nano == 0 { + deleted_contracts.push((c.uuid.clone(), c.node_pubkey.clone())); + } } c.locked_nano > 0 }); @@ -222,8 +353,9 @@ impl BrainData { } } - pub fn insert_node(&self, node: VmNode) { + pub fn register_node(&self, node: VmNode) { info!("Registering node {node:?}"); + self.add_vmnode_to_operator(&node.operator_wallet, &node.public_key); let mut nodes = self.vm_nodes.write().unwrap(); for n in nodes.iter_mut() { if n.public_key == node.public_key { @@ -236,6 +368,101 @@ impl BrainData { nodes.push(node); } + // todo: this should also support Apps + /// Receives: operator, contract uuid, reason of kick + pub async fn kick_contract( + &self, + operator: &str, + uuid: &str, + reason: &str, + ) -> Result { + log::debug!("Operator {operator} requested a kick of {uuid} for reason: {reason}"); + let contract = self.find_contract_by_uuid(uuid)?; + let mut operator_data = self + .operators + .get_mut(operator) + .ok_or(Error::AccessDenied)?; + if !operator_data.vm_nodes.contains(&contract.node_pubkey) { + return Err(Error::AccessDenied); + } + + let mut minutes_to_refund = chrono::Utc::now() + .signed_duration_since(contract.updated_at) + .num_minutes() + .abs() as u64; + // cap refund at 1 week + if minutes_to_refund > 10080 { + minutes_to_refund = 10080; + } + + let mut refund_amount = minutes_to_refund * contract.price_per_minute(); + let mut admin_account = self + .accounts + .get_mut(&contract.admin_pubkey) + .ok_or(Error::ImpossibleError)?; + + // check if he got kicked within the last day + if !chrono::Utc::now() + .signed_duration_since(admin_account.last_kick) + .gt(&chrono::Duration::days(1)) + { + refund_amount = 0; + } + + if operator_data.escrow < refund_amount { + refund_amount = operator_data.escrow; + } + + log::debug!( + "Removing {refund_amount} escrow from {} and giving it to {}", + operator_data.key(), + admin_account.key() + ); + admin_account.balance += refund_amount; + admin_account.kicked_for.push(reason.to_string()); + operator_data.escrow -= refund_amount; + + let admin_pubkey = contract.admin_pubkey.clone(); + drop(admin_account); + drop(contract); + + self.delete_vm(grpc::DeleteVmReq { + uuid: uuid.to_string(), + admin_pubkey, + }) + .await?; + + Ok(refund_amount) + } + + pub fn ban_user(&self, operator: &str, user: &str) { + self.accounts + .entry(user.to_string()) + .and_modify(|a| { + a.banned_by.insert(operator.to_string()); + }) + .or_insert(AccountData { + banned_by: HashSet::from([operator.to_string()]), + ..Default::default() + }); + self.operators + .entry(operator.to_string()) + .and_modify(|o| { + o.banned_users.insert(user.to_string()); + }) + .or_insert(OperatorData { + banned_users: HashSet::from([user.to_string()]), + ..Default::default() + }); + } + + pub fn report_node(&self, admin_pubkey: String, node: &str, report: String) { + let mut nodes = self.vm_nodes.write().unwrap(); + if let Some(node) = nodes.iter_mut().find(|n| n.public_key == node) { + node.reports.insert(admin_pubkey, report); + } + } + pub fn lock_nanotockens(&self, account: &str, nano_lp: u64) -> Result<(), Error> { if nano_lp > 100_000_000_000_000 { return Err(Error::TxTooBig); @@ -321,17 +548,11 @@ impl BrainData { } pub async fn delete_vm(&self, delete_vm: grpc::DeleteVmReq) -> Result<(), Error> { - let contract = match self.find_contract_by_uuid(&delete_vm.uuid) { - Some(contract) => { - if contract.admin_pubkey != delete_vm.admin_pubkey { - return Err(Error::VmContractNotFound(delete_vm.uuid)); - } - contract - } - None => { - return Err(Error::VmContractNotFound(delete_vm.uuid)); - } - }; + log::debug!("Starting deletion of VM {}", delete_vm.uuid); + let contract = self.find_contract_by_uuid(&delete_vm.uuid)?; + if contract.admin_pubkey != delete_vm.admin_pubkey { + return Err(Error::AccessDenied); + } info!("Found vm {}. Deleting...", delete_vm.uuid); if let Some(daemon_tx) = self.daemon_tx.get(&contract.node_pubkey) { debug!( @@ -540,7 +761,7 @@ impl BrainData { let uuid = req.uuid.clone(); info!("Inserting new vm update request in memory: {req:?}"); let node_pubkey = match self.find_contract_by_uuid(&req.uuid) { - Some(contract) => { + Ok(contract) => { if contract.admin_pubkey != req.admin_pubkey { let _ = tx.send(grpc::UpdateVmResp { uuid, @@ -551,7 +772,7 @@ impl BrainData { } contract.node_pubkey } - None => { + Err(_) => { log::warn!( "Received UpdateVMReq for a contract that does not exist: {}", req.uuid @@ -602,12 +823,81 @@ impl BrainData { } } - pub fn find_nodes_by_pubkey(&self, public_key: &str) -> Option { + pub fn find_node_by_pubkey(&self, public_key: &str) -> Option { let nodes = self.vm_nodes.read().unwrap(); nodes.iter().cloned().find(|n| n.public_key == public_key) } - pub fn find_nodes_by_filters( + pub fn is_user_banned_by_node(&self, user_wallet: &str, node_pubkey: &str) -> bool { + if let Some(node) = self.find_node_by_pubkey(&node_pubkey) { + if let Some(account) = self.accounts.get(user_wallet) { + if account.banned_by.contains(&node.operator_wallet) { + return true; + } + } + } + false + } + + pub fn add_vmnode_to_operator(&self, operator_wallet: &str, node_pubkey: &str) { + self.operators + .entry(operator_wallet.to_string()) + .and_modify(|op| { + op.vm_nodes.insert(node_pubkey.to_string()); + }) + .or_insert(OperatorData { + escrow: 0, + email: String::new(), + banned_users: HashSet::new(), + vm_nodes: HashSet::from([node_pubkey.to_string()]), + }); + } + + pub fn register_operator(&self, req: grpc::RegOperatorReq) -> Result<(), Error> { + let mut operator = match self.operators.get(&req.pubkey) { + Some(o) => (*(o.value())).clone(), + None => OperatorData { + ..Default::default() + }, + }; + if req.escrow < 5000 { + return Err(Error::MinimalEscrow); + } + let escrow = req.escrow * 1_000_000_000; + if let Some(mut account) = self.accounts.get_mut(&req.pubkey) { + if (account.balance + operator.escrow) < escrow { + return Err(Error::InsufficientFunds); + } + account.balance = account.balance + operator.escrow - escrow; + operator.escrow = escrow; + } else { + return Err(Error::InsufficientFunds); + } + operator.email = req.email; + self.operators.insert(req.pubkey, operator); + Ok(()) + } + + pub fn find_vm_nodes_by_operator(&self, operator_wallet: &str) -> Vec { + let nodes = self.vm_nodes.read().unwrap(); + nodes + .iter() + .filter(|node| node.operator_wallet == operator_wallet) + .cloned() + .collect() + } + + pub fn total_operator_reports(&self, operator_wallet: &str) -> usize { + let nodes = self.vm_nodes.read().unwrap(); + nodes + .iter() + .cloned() + .filter(|n| n.operator_wallet == operator_wallet) + .map(|node| node.reports.len()) + .sum() + } + + pub fn find_vm_nodes_by_filters( &self, filters: &crate::grpc::snp_proto::VmNodeFilters, ) -> Vec { @@ -654,9 +944,13 @@ impl BrainData { .cloned() } - pub fn find_contract_by_uuid(&self, uuid: &str) -> Option { + pub fn find_contract_by_uuid(&self, uuid: &str) -> Result { let contracts = self.vm_contracts.read().unwrap(); - contracts.iter().cloned().find(|c| c.uuid == uuid) + contracts + .iter() + .cloned() + .find(|c| c.uuid == uuid) + .ok_or(Error::VmContractNotFound(uuid.to_string())) } pub fn list_all_contracts(&self) -> Vec { @@ -675,17 +969,68 @@ impl BrainData { .collect() } - pub fn find_vm_contracts_by_admin(&self, admin_pubkey: &str) -> Vec { - debug!("Searching contracts for admin pubkey {admin_pubkey}"); + pub fn list_operators(&self) -> Vec { + self.operators + .iter() + .map(|op| grpc::ListOperatorsResp { + pubkey: op.key().to_string(), + escrow: op.escrow / 1_000_000_000, + email: op.email.clone(), + app_nodes: 0, + vm_nodes: op.vm_nodes.len() as u64, + reports: self.total_operator_reports(op.key()) as u64, + }) + .collect() + } + + pub fn inspect_operator(&self, wallet: &str) -> Option { + self.operators.get(wallet).map(|op| { + let nodes = self + .find_vm_nodes_by_operator(wallet) + .into_iter() + .map(|n| n.into()) + .collect(); + grpc::InspectOperatorResp { + operator: Some(grpc::ListOperatorsResp { + pubkey: op.key().to_string(), + escrow: op.escrow, + email: op.email.clone(), + app_nodes: 0, + vm_nodes: op.vm_nodes.len() as u64, + reports: self.total_operator_reports(op.key()) as u64, + }), + nodes, + } + }) + } + + pub fn find_vm_contracts_by_operator(&self, wallet: &str) -> Vec { + debug!("Searching contracts for operator {wallet}"); + let nodes = match self.operators.get(wallet) { + Some(op) => op.vm_nodes.clone(), + None => return Vec::new(), + }; let contracts: Vec = self .vm_contracts .read() .unwrap() .iter() - .filter(|c| c.admin_pubkey == admin_pubkey) + .filter(|c| nodes.contains(&c.node_pubkey)) + .cloned() + .collect(); + contracts + } + + pub fn find_vm_contracts_by_admin(&self, admin_wallet: &str) -> Vec { + debug!("Searching contracts for admin pubkey {admin_wallet}"); + let contracts: Vec = self + .vm_contracts + .read() + .unwrap() + .iter() + .filter(|c| c.admin_pubkey == admin_wallet) .cloned() .collect(); - debug!("Found {} contracts or {admin_pubkey}.", contracts.len()); contracts } diff --git a/src/grpc.rs b/src/grpc.rs index a80f74b..f3a6327 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -1,10 +1,9 @@ - pub mod snp_proto { tonic::include_proto!("vm_proto"); } -use crate::grpc::vm_daemon_message; use crate::data::BrainData; +use crate::grpc::vm_daemon_message; use log::info; use snp_proto::brain_cli_server::BrainCli; use snp_proto::brain_vm_daemon_server::BrainVmDaemon; @@ -52,7 +51,7 @@ impl BrainVmDaemon for BrainDaemonMock { info!("Starting registration process for {:?}", req); let node = crate::data::VmNode { public_key: req.node_pubkey.clone(), - owner_key: req.owner_pubkey, + operator_wallet: req.operator_wallet, country: req.country, region: req.region, city: req.city, @@ -60,7 +59,7 @@ impl BrainVmDaemon for BrainDaemonMock { price: req.price, ..Default::default() }; - self.data.insert_node(node); + self.data.register_node(node); info!("Sending existing contracts to {}", req.node_pubkey); let contracts = self.data.find_vm_contracts_by_node(&req.node_pubkey); @@ -161,6 +160,14 @@ impl BrainCli for BrainCliMock { async fn new_vm(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; info!("New VM requested via CLI: {req:?}"); + if self + .data + .is_user_banned_by_node(&req.admin_pubkey, &req.node_pubkey) + { + return Err(Status::permission_denied( + "This operator banned you. What did you do?", + )); + } let admin_pubkey = req.admin_pubkey.clone(); let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); self.data.submit_newvm_req(req, oneshot_tx).await; @@ -205,20 +212,55 @@ impl BrainCli for BrainCliMock { } } + async fn delete_vm(&self, req: Request) -> Result, Status> { + let req = check_sig_from_req(req)?; + match self.data.delete_vm(req).await { + Ok(()) => Ok(Response::new(Empty {})), + Err(e) => Err(Status::not_found(e.to_string())), + } + } + + async fn report_node(&self, req: Request) -> Result, Status> { + let req = check_sig_from_req(req)?; + match self.data.find_contract_by_uuid(&req.contract) { + Ok(contract) + if contract.admin_pubkey == req.admin_pubkey + && contract.node_pubkey == req.node_pubkey => + { + () + } + _ => return Err(Status::unauthenticated("No contract found by this ID.")), + }; + self.data + .report_node(req.admin_pubkey, &req.node_pubkey, req.reason); + Ok(Response::new(Empty {})) + } + type ListVmContractsStream = Pin> + Send>>; async fn list_vm_contracts( &self, req: Request, ) -> Result, Status> { let req = check_sig_from_req(req)?; - info!("CLI {} requested ListVMVmContractsStream", req.admin_pubkey); - let contracts = match req.uuid.is_empty() { - false => match self.data.find_contract_by_uuid(&req.uuid) { - Some(contract) => vec![contract], - None => Vec::new(), - }, - true => self.data.find_vm_contracts_by_admin(&req.admin_pubkey), - }; + info!( + "CLI {} requested ListVMVmContractsStream. As operator: {}", + req.wallet, req.as_operator + ); + let mut contracts = Vec::new(); + if !req.uuid.is_empty() { + if let Ok(specific_contract) = self.data.find_contract_by_uuid(&req.uuid) { + if specific_contract.admin_pubkey == req.wallet { + contracts.push(specific_contract); + } + // TODO: allow operator to inspect contracts + } + } else { + if req.as_operator { + contracts.append(&mut self.data.find_vm_contracts_by_operator(&req.wallet)); + } else { + contracts.append(&mut self.data.find_vm_contracts_by_admin(&req.wallet)); + } + } let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for contract in contracts { @@ -238,7 +280,7 @@ impl BrainCli for BrainCliMock { ) -> Result, tonic::Status> { let req = check_sig_from_req(req)?; info!("CLI requested ListVmNodesStream: {req:?}"); - let nodes = self.data.find_nodes_by_filters(&req); + let nodes = self.data.find_vm_nodes_by_filters(&req); let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for node in nodes { @@ -265,12 +307,65 @@ impl BrainCli for BrainCliMock { } } - async fn delete_vm(&self, req: Request) -> Result, Status> { + async fn register_operator( + &self, + req: Request, + ) -> Result, Status> { let req = check_sig_from_req(req)?; - info!("Unknown CLI requested to delete vm {}", req.uuid); - match self.data.delete_vm(req).await { + info!("Regitering new operator: {req:?}"); + match self.data.register_operator(req) { Ok(()) => Ok(Response::new(Empty {})), - Err(e) => Err(Status::not_found(e.to_string())), + Err(e) => Err(Status::failed_precondition(e.to_string())), + } + } + + async fn kick_contract(&self, req: Request) -> Result, Status> { + let req = check_sig_from_req(req)?; + match self + .data + .kick_contract(&req.operator_wallet, &req.contract_uuid, &req.reason) + .await + { + Ok(nano_lp) => Ok(Response::new(KickResp { nano_lp })), + Err(e) => Err(Status::permission_denied(e.to_string())), + } + } + + async fn ban_user(&self, req: Request) -> Result, Status> { + let req = check_sig_from_req(req)?; + self.data.ban_user(&req.operator_wallet, &req.user_wallet); + Ok(Response::new(Empty {})) + } + + type ListOperatorsStream = + Pin> + Send>>; + async fn list_operators( + &self, + req: Request, + ) -> Result, Status> { + let _ = check_sig_from_req(req)?; + let operators = self.data.list_operators(); + let (tx, rx) = mpsc::channel(6); + tokio::spawn(async move { + for op in operators { + let _ = tx.send(Ok(op.into())).await; + } + }); + let output_stream = ReceiverStream::new(rx); + Ok(Response::new( + Box::pin(output_stream) as Self::ListOperatorsStream + )) + } + + async fn inspect_operator( + &self, + req: Request, + ) -> Result, Status> { + match self.data.inspect_operator(&req.into_inner().pubkey) { + Some(op) => Ok(Response::new(op.into())), + None => Err(Status::not_found( + "The wallet you specified is not an operator", + )), } } @@ -278,7 +373,14 @@ impl BrainCli for BrainCliMock { check_admin_key(&req)?; let req = check_sig_from_req(req)?; self.data.give_airdrop(&req.pubkey, req.tokens); - Ok(Response::new(Empty{})) + Ok(Response::new(Empty {})) + } + + async fn slash(&self, req: Request) -> Result, Status> { + check_admin_key(&req)?; + let req = check_sig_from_req(req)?; + self.data.slash_account(&req.pubkey, req.tokens); + Ok(Response::new(Empty {})) } type ListAllVmContractsStream = Pin> + Send>>; @@ -320,7 +422,6 @@ impl BrainCli for BrainCliMock { Box::pin(output_stream) as Self::ListAccountsStream )) } - } trait PubkeyGetter { @@ -349,12 +450,17 @@ impl_pubkey_getter!(NewVmReq, admin_pubkey); impl_pubkey_getter!(DeleteVmReq, admin_pubkey); impl_pubkey_getter!(UpdateVmReq, admin_pubkey); impl_pubkey_getter!(ExtendVmReq, admin_pubkey); -impl_pubkey_getter!(ListVmContractsReq, admin_pubkey); +impl_pubkey_getter!(ReportNodeReq, admin_pubkey); +impl_pubkey_getter!(ListVmContractsReq, wallet); impl_pubkey_getter!(RegisterVmNodeReq, node_pubkey); +impl_pubkey_getter!(RegOperatorReq, pubkey); +impl_pubkey_getter!(KickReq, operator_wallet); +impl_pubkey_getter!(BanUserReq, operator_wallet); impl_pubkey_getter!(VmNodeFilters); impl_pubkey_getter!(Empty); impl_pubkey_getter!(AirdropReq); +impl_pubkey_getter!(SlashReq); fn check_sig_from_req(req: Request) -> Result { let time = match req.metadata().get("timestamp") { diff --git a/src/main.rs b/src/main.rs index ffc72f3..64936a3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,7 +19,11 @@ async fn main() { tokio::spawn(async move { loop { tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + data_clone.vm_nodes_cron().await; data_clone.vm_contracts_cron().await; + if let Err(e) = data_clone.save_to_disk() { + log::error!("Could not save data to disk due to error: {e}") + } } }); let addr = "0.0.0.0:31337".parse().unwrap(); diff --git a/vm.proto b/vm.proto index e755772..f0f4d2e 100644 --- a/vm.proto +++ b/vm.proto @@ -55,7 +55,7 @@ message MeasurementIP { // This should also include a block hash or similar, for auth message RegisterVmNodeReq { string node_pubkey = 1; - string owner_pubkey = 2; + string operator_wallet = 2; string main_ip = 3; string country = 4; string region = 5; @@ -154,8 +154,8 @@ service BrainVmDaemon { } message ListVmContractsReq { - string admin_pubkey = 1; - string node_pubkey = 2; + string wallet = 1; + bool as_operator = 2; string uuid = 3; } @@ -174,15 +174,14 @@ message VmNodeFilters { } message VmNodeListResp { - string node_pubkey = 1; - string country = 2; - string region = 3; - string city = 4; - string ip = 5; // required for latency test - uint32 server_rating = 6; - uint32 provider_rating = 7; - // nanoLP per unit per minute - uint64 price = 8; + string operator = 1; + string node_pubkey = 2; + string country = 3; + string region = 4; + string city = 5; + string ip = 6; // required for latency test + repeated string reports = 7; // TODO: this will become an enum + uint64 price = 8; // nanoLP per unit per minute } message ExtendVmReq { @@ -196,12 +195,59 @@ message AirdropReq { uint64 tokens = 2; } +message SlashReq { + string pubkey = 1; + uint64 tokens = 2; +} + message Account { string pubkey = 1; uint64 balance = 2; uint64 tmp_locked = 3; } +message RegOperatorReq { + string pubkey = 1; + uint64 escrow = 2; + string email = 3; +} + +message ListOperatorsResp { + string pubkey = 1; + uint64 escrow = 2; + string email = 3; + uint64 app_nodes = 4; + uint64 vm_nodes = 5; + uint64 reports = 6; +} + +message InspectOperatorResp { + ListOperatorsResp operator = 1; + repeated VmNodeListResp nodes = 2; +} + +message ReportNodeReq { + string admin_pubkey = 1; + string node_pubkey = 2; + string contract = 3; + string reason = 4; +} + +message KickReq { + string operator_wallet = 1; + string contract_uuid = 2; + string reason = 3; +} + +message BanUserReq { + string operator_wallet = 1; + string user_wallet = 2; +} + +message KickResp { + uint64 nano_lp = 1; +} + service BrainCli { rpc GetBalance (Pubkey) returns (AccountBalance); rpc NewVm (NewVmReq) returns (NewVmResp); @@ -211,8 +257,15 @@ service BrainCli { rpc DeleteVm (DeleteVmReq) returns (Empty); rpc UpdateVm (UpdateVmReq) returns (UpdateVmResp); rpc ExtendVm (ExtendVmReq) returns (Empty); + rpc ReportNode (ReportNodeReq) returns (Empty); + rpc ListOperators (Empty) returns (stream ListOperatorsResp); + rpc InspectOperator (Pubkey) returns (InspectOperatorResp); + rpc RegisterOperator (RegOperatorReq) returns (Empty); + rpc KickContract (KickReq) returns (KickResp); + rpc BanUser (BanUserReq) returns (Empty); // admin commands rpc Airdrop (AirdropReq) returns (Empty); + rpc Slash (SlashReq) returns (Empty); rpc ListAllVmContracts (Empty) returns (stream VmContract); rpc ListAccounts (Empty) returns (stream Account); } From 64b65d7ecdf020e69263b4cb091ed740f804276c Mon Sep 17 00:00:00 2001 From: ghe0 Date: Fri, 28 Feb 2025 00:37:34 +0200 Subject: [PATCH 10/11] don't delete VMs if node is offline for < 1day --- src/data.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/data.rs b/src/data.rs index 9f7182e..13e02e0 100644 --- a/src/data.rs +++ b/src/data.rs @@ -277,7 +277,7 @@ impl BrainData { .iter() .filter(|c| c.node_pubkey == node.public_key) { - let compensation = c.price_per_minute() * 24; + let compensation = c.price_per_minute() * 10; if compensation < operator.escrow { operator.escrow -= compensation; self.add_nano_to_wallet(&c.admin_pubkey, compensation); @@ -287,7 +287,7 @@ impl BrainData { } // delete nodes that are offline more than 3 hours, and clean contracts nodes.retain(|n| { - if n.offline_minutes > 180 { + if n.offline_minutes > 1600 { vm_contracts.retain_mut(|c| { if c.node_pubkey == n.public_key { self.add_nano_to_wallet(&c.admin_pubkey, c.locked_nano); From 70bf3b5e73c113fb95161ae9ddb4a66d26238689 Mon Sep 17 00:00:00 2001 From: ghe0 Date: Thu, 6 Mar 2025 01:30:51 +0200 Subject: [PATCH 11/11] pass update error from daemon to CLI --- src/data.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/data.rs b/src/data.rs index 13e02e0..17add83 100644 --- a/src/data.rs +++ b/src/data.rs @@ -661,6 +661,11 @@ impl BrainData { return; } }; + if let Err(e) = update_vm_req.1.send(update_vm_resp.clone()) { + log::warn!( + "CLI RX dropped before receiving UpdateVMResp {update_vm_resp:?}. Error: {e:?}" + ); + } if update_vm_resp.error != "" { return; } @@ -697,11 +702,6 @@ impl BrainData { update_vm_resp.error = "VM Contract not found.".to_string(); } } - if let Err(e) = update_vm_req.1.send(update_vm_resp.clone()) { - log::warn!( - "CLI RX dropped before receiving UpdateVMResp {update_vm_resp:?}. Error: {e:?}" - ); - } } pub async fn submit_newvm_req(