diff --git a/Cargo.lock b/Cargo.lock index 8bbe91b..af7229c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -219,7 +219,6 @@ checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" name = "brain-mock" version = "0.1.0" dependencies = [ - "anyhow", "chrono", "dashmap", "env_logger", @@ -229,6 +228,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "thiserror", "tokio", "tokio-stream", "tonic", @@ -1600,6 +1600,26 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "thiserror" +version = "2.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d452f284b73e6d76dd36758a0c8684b1d5be31f92b89d07fd5822175732206fc" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tinystr" version = "0.7.6" diff --git a/Cargo.toml b/Cargo.toml index 604a4af..5ef1cd6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,6 @@ version = "0.1.0" edition = "2021" [dependencies] -anyhow = "1.0.95" chrono = "0.4.39" dashmap = "6.1.0" env_logger = "0.11.6" @@ -14,6 +13,7 @@ prost-types = "0.13.4" reqwest = "0.12.10" serde = { version = "1.0.216", features = ["derive"] } serde_json = "1.0.134" +thiserror = "2.0.11" tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] } tokio-stream = "0.1.17" tonic = "0.12" diff --git a/snp.proto b/snp.proto index 89f637b..4addaf3 100644 --- a/snp.proto +++ b/snp.proto @@ -8,6 +8,11 @@ message Pubkey { string pubkey = 1; } +message AccountBalance { + uint64 balance = 1; + uint64 tmp_locked = 2; +} + message Contract { string uuid = 1; string hostname = 2; @@ -23,6 +28,10 @@ message Contract { string dtrfs_sha = 12; string created_at = 13; string updated_at = 14; + // total nanotoken cost per minute (for all units) + uint64 nano_per_minute = 15; + uint64 locked_nano = 16; + string collected_at = 17; } message MeasurementArgs { @@ -47,9 +56,11 @@ message RegisterNodeReq { string node_pubkey = 1; string owner_pubkey = 2; string main_ip = 3; - string country = 7; - string region = 8; - string city = 9; + string country = 4; + string region = 5; + string city = 6; + // nanotokens per unit per minute + uint64 price = 7; } message NodeResources { @@ -78,6 +89,8 @@ message NewVmReq { string kernel_sha = 12; string dtrfs_url = 13; string dtrfs_sha = 14; + uint64 price_per_unit = 15; + uint64 locked_nano = 16; } message NewVmResp { @@ -147,6 +160,7 @@ message NodeFilters { string region = 8; string city = 9; string ip = 10; + string node_pubkey = 11; } message NodeListResp { @@ -157,9 +171,13 @@ message NodeListResp { string ip = 5; // required for latency test uint32 server_rating = 6; uint32 provider_rating = 7; + // nanotokens per unit per minute + uint64 price = 8; } service BrainCli { + rpc GetAirdrop (Pubkey) returns (Empty); + rpc GetBalance (Pubkey) returns (AccountBalance); rpc NewVm (NewVmReq) returns (NewVmResp); rpc ListContracts (ListContractsReq) returns (stream Contract); rpc ListNodes (NodeFilters) returns (stream NodeListResp); diff --git a/src/data.rs b/src/data.rs index ac021aa..4736b60 100644 --- a/src/data.rs +++ b/src/data.rs @@ -8,6 +8,31 @@ use std::sync::RwLock; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot::Sender as OneshotSender; +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("We do not allow locking of more than 100000 tokens.")] + TxTooBig, + #[error("Account has insufficient funds for this operation")] + InsufficientFunds, + #[error("I have no idea how this happened. Please report this bug.")] + ImpossibleError, +} + +#[derive(Clone)] +pub struct AccountNanoTokens { + pub balance: u64, + pub tmp_locked: u64, +} + +impl From for grpc::AccountBalance { + fn from(value: AccountNanoTokens) -> Self { + grpc::AccountBalance { + balance: value.balance, + tmp_locked: value.tmp_locked, + } + } +} + #[derive(Eq, Hash, PartialEq, Clone, Debug, Default)] pub struct Node { pub public_key: String, @@ -23,6 +48,8 @@ pub struct Node { pub avail_ipv6: u32, pub avail_ports: u32, pub max_ports_per_vm: u32, + // nanotokens per unit per minute + pub price: u64, } impl Into for Node { @@ -35,6 +62,7 @@ impl Into for Node { ip: self.ip, server_rating: 0, provider_rating: 0, + price: self.price, } } } @@ -53,12 +81,35 @@ pub struct Contract { pub memory_mb: u32, pub kernel_sha: String, pub dtrfs_sha: String, - pub created_at: String, - pub updated_at: String, + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, + // price per unit per minute + // recommended value is 20000 + pub price_per_unit: u64, + pub locked_nano: u64, + pub collected_at: chrono::DateTime, +} + +impl Contract { + fn total_units(&self) -> u64 { + // TODO: Optimize this based on price of hardware. + // I tried, but this can be done better. + // Storage cost should also be based on tier + (self.vcpus as u64 * 10) + + ((self.memory_mb + 256) as u64 * 4 / 100) + + (self.disk_size_gb as u64 / 10) + + (!self.public_ipv4.is_empty() as u64 * 10) + } + + // Returns price per minute in nanotokens + fn price_per_minute(&self) -> u64 { + self.total_units() * self.price_per_unit + } } impl Into for Contract { fn into(self) -> grpc::Contract { + let nano_per_minute = self.price_per_minute(); grpc::Contract { uuid: self.uuid, hostname: self.hostname, @@ -72,14 +123,19 @@ impl Into for Contract { memory_mb: self.memory_mb, kernel_sha: self.kernel_sha, dtrfs_sha: self.dtrfs_sha, - created_at: self.created_at, - updated_at: self.updated_at, + created_at: self.created_at.to_rfc3339(), + updated_at: self.updated_at.to_rfc3339(), + nano_per_minute, + locked_nano: self.locked_nano, + collected_at: self.collected_at.to_rfc3339(), } } } #[derive(Default)] pub struct BrainData { + // amount of nanotokens in each account + accounts: DashMap, nodes: RwLock>, contracts: RwLock>, tmp_newvm_reqs: DashMap)>, @@ -97,6 +153,7 @@ enum TxType { impl BrainData { pub fn new() -> Self { Self { + accounts: DashMap::new(), nodes: RwLock::new(Vec::new()), contracts: RwLock::new(Vec::new()), tmp_newvm_reqs: DashMap::new(), @@ -105,6 +162,59 @@ impl BrainData { } } + pub fn get_balance(&self, account: &str) -> AccountNanoTokens { + if let Some(account) = self.accounts.get(account) { + return account.value().clone(); + } else { + let balance = AccountNanoTokens { + balance: 0, + tmp_locked: 0, + }; + return balance; + } + } + + pub fn get_airdrop(&self, account: &str) { + self.add_nano_to_wallet(account, 1000_000000000); + } + + fn add_nano_to_wallet(&self, account: &str, nanotokens: u64) { + log::debug!("Adding {nanotokens} nanotokens to {account}"); + self.accounts + .entry(account.to_string()) + .and_modify(|tokens| tokens.balance += nanotokens) + .or_insert(AccountNanoTokens { + balance: nanotokens, + tmp_locked: 0, + }); + } + + pub fn contracts_cron(&self) { + log::debug!("Running contracts cron..."); + let mut contracts = self.contracts.write().unwrap(); + contracts.retain_mut(|c| { + let owner_key = self + .find_nodes_by_pubkey(&c.node_pubkey) + .unwrap() + .owner_key + .clone(); + let minutes_to_collect = (Utc::now() - c.collected_at).num_minutes() as u64; + c.collected_at = Utc::now(); + log::debug!("{minutes_to_collect}"); + let mut nanotokens_to_collect = c.price_per_minute().saturating_mul(minutes_to_collect); + if nanotokens_to_collect > c.locked_nano { + nanotokens_to_collect = c.locked_nano; + } + log::debug!( + "Removing {nanotokens_to_collect} nanotokens from {}", + c.uuid + ); + c.locked_nano -= nanotokens_to_collect; + self.add_nano_to_wallet(&owner_key, nanotokens_to_collect); + c.locked_nano > 0 + }); + } + pub fn insert_node(&self, node: Node) { info!("Registering node {node:?}"); let mut nodes = self.nodes.write().unwrap(); @@ -119,6 +229,35 @@ impl BrainData { nodes.push(node); } + pub fn lock_nanotockens(&self, account: &str, nanotokens: u64) -> Result<(), Error> { + if nanotokens > 100_000_000_000_000 { + return Err(Error::TxTooBig); + } + if let Some(mut account) = self.accounts.get_mut(account) { + if nanotokens > account.balance { + return Err(Error::InsufficientFunds); + } + account.balance = account.balance.saturating_sub(nanotokens); + account.tmp_locked = account.tmp_locked.saturating_add(nanotokens); + Ok(()) + } else { + Err(Error::InsufficientFunds) + } + } + + pub fn unlock_nanotockens(&self, account: &str, nanotokens: u64) -> Result<(), Error> { + if let Some(mut account) = self.accounts.get_mut(account) { + if nanotokens > account.tmp_locked { + return Err(Error::ImpossibleError); + } + account.balance = account.balance.saturating_add(nanotokens); + account.tmp_locked = account.tmp_locked.saturating_sub(nanotokens); + Ok(()) + } else { + Err(Error::ImpossibleError) + } + } + pub fn submit_node_resources(&self, res: grpc::NodeResources) { let mut nodes = self.nodes.write().unwrap(); for n in nodes.iter_mut() { @@ -172,6 +311,8 @@ impl BrainData { self.del_daemon_tx(&contract.node_pubkey); } } + + self.add_nano_to_wallet(&contract.admin_pubkey, contract.locked_nano); let mut contracts = self.contracts.write().unwrap(); contracts.retain(|c| c.uuid != delete_vm.uuid); } @@ -196,6 +337,10 @@ impl BrainData { ); } if new_vm_resp.error != "" { + if let Some(mut admin_wallet) = self.accounts.get_mut(&new_vm_req.0.admin_pubkey) { + admin_wallet.balance += new_vm_req.0.locked_nano; + admin_wallet.tmp_locked -= new_vm_req.0.locked_nano; + } return; } @@ -215,13 +360,17 @@ impl BrainData { } } + if let Some(mut admin_wallet) = self.accounts.get_mut(&new_vm_req.0.admin_pubkey) { + admin_wallet.tmp_locked -= new_vm_req.0.locked_nano; + } + let contract = Contract { uuid: new_vm_resp.uuid, exposed_ports: args.exposed_ports.clone(), public_ipv4, public_ipv6, - created_at: Utc::now().to_rfc3339(), - updated_at: String::new(), + created_at: Utc::now(), + updated_at: Utc::now(), hostname: new_vm_req.0.hostname, admin_pubkey: new_vm_req.0.admin_pubkey, node_pubkey: new_vm_req.0.node_pubkey.clone(), @@ -230,6 +379,9 @@ impl BrainData { memory_mb: new_vm_req.0.memory_mb, kernel_sha: new_vm_req.0.kernel_sha, dtrfs_sha: new_vm_req.0.dtrfs_sha, + price_per_unit: new_vm_req.0.price_per_unit, + locked_nano: new_vm_req.0.locked_nano, + collected_at: Utc::now(), }; info!("Created new contract: {contract:?}"); self.contracts.write().unwrap().push(contract); @@ -271,7 +423,7 @@ impl BrainData { ); contract.dtrfs_sha = update_vm_req.0.dtrfs_sha; } - contract.updated_at = Utc::now().to_rfc3339(); + contract.updated_at = Utc::now(); } None => { log::error!("Contract not found for {}.", update_vm_req.0.uuid); @@ -290,6 +442,14 @@ impl BrainData { mut req: grpc::NewVmReq, tx: OneshotSender, ) { + if let Err(e) = self.lock_nanotockens(&req.admin_pubkey, req.locked_nano) { + let _ = tx.send(grpc::NewVmResp { + uuid: String::new(), + error: e.to_string(), + args: None, + }); + return; + } req.uuid = uuid::Uuid::new_v4().to_string(); info!("Inserting new vm request in memory: {req:?}"); self.tmp_newvm_reqs @@ -394,7 +554,10 @@ impl BrainData { nodes.iter().cloned().find(|n| n.owner_key == owner_key) } - pub fn find_nodes_by_filters(&self, filters: &crate::grpc::snp_proto::NodeFilters) -> Vec { + pub fn find_nodes_by_filters( + &self, + filters: &crate::grpc::snp_proto::NodeFilters, + ) -> Vec { let nodes = self.nodes.read().unwrap(); nodes .iter() @@ -433,6 +596,7 @@ impl BrainData { && (filters.city.is_empty() || (n.city == filters.city)) && (filters.region.is_empty() || (n.region == filters.region)) && (filters.ip.is_empty() || (n.ip == filters.ip)) + && (filters.node_pubkey.is_empty() || (n.public_key == filters.node_pubkey)) }) .cloned() } diff --git a/src/grpc.rs b/src/grpc.rs index 127652c..de08ad0 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -5,10 +5,10 @@ pub mod snp_proto { } use crate::data::BrainData; +use log::info; use snp_proto::brain_cli_server::BrainCli; use snp_proto::brain_daemon_server::BrainDaemon; use snp_proto::*; -use log::info; use std::pin::Pin; use std::sync::Arc; use tokio::sync::mpsc; @@ -51,6 +51,7 @@ impl BrainDaemon for BrainDaemonMock { region: req.region, city: req.city, ip: req.main_ip, + price: req.price, ..Default::default() }; self.data.insert_node(node); @@ -120,6 +121,17 @@ impl BrainDaemon for BrainDaemonMock { #[tonic::async_trait] impl BrainCli for BrainCliMock { + async fn get_balance(&self, req: Request) -> Result, Status> { + Ok(Response::new( + self.data.get_balance(&req.into_inner().pubkey).into(), + )) + } + + async fn get_airdrop(&self, req: Request) -> Result, Status> { + self.data.get_airdrop(&req.into_inner().pubkey); + Ok(Response::new(Empty {})) + } + async fn new_vm(&self, req: Request) -> Result, Status> { let req = req.into_inner(); info!("New VM requested via CLI: {req:?}"); diff --git a/src/main.rs b/src/main.rs index 5ea9aff..4bf30f7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,6 +15,13 @@ async fn main() { .filter_level(log::LevelFilter::Debug) .init(); let data = Arc::new(BrainData::new()); + let data_clone = data.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + data_clone.contracts_cron(); + } + }); let addr = "0.0.0.0:31337".parse().unwrap(); let daemon_server = BrainDaemonServer::new(BrainDaemonMock::new(data.clone()));