brain-mock/src/data.rs

632 lines
22 KiB
Rust

#![allow(dead_code)]
use crate::grpc::snp_proto::{self as grpc};
use chrono::Utc;
use dashmap::DashMap;
use log::{debug, info, warn};
use std::str::FromStr;
use std::sync::RwLock;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot::Sender as OneshotSender;
/// Errors produced by the token-accounting operations of `BrainData`.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Returned by `lock_nanotockens` when the requested amount exceeds its hard cap.
#[error("We do not allow locking of more than 100000 tokens.")]
TxTooBig,
/// Returned by `lock_nanotockens` when the account is unknown or its balance is
/// smaller than the requested amount.
#[error("Account has insufficient funds for this operation")]
InsufficientFunds,
/// Returned by `unlock_nanotockens` when the account is unknown or has fewer
/// locked nanotokens than the amount to release.
#[error("I have no idea how this happened. Please report this bug.")]
ImpossibleError,
}
/// Token holdings of a single account, expressed in nanotokens.
///
/// `Default` is the empty wallet (both fields zero), which is also what
/// `BrainData::get_balance` reports for unknown accounts.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct AccountNanoTokens {
    /// Spendable nanotokens.
    pub balance: u64,
    /// Nanotokens reserved by `lock_nanotockens` for a request that is still pending.
    pub tmp_locked: u64,
}
impl From<AccountNanoTokens> for grpc::AccountBalance {
fn from(value: AccountNanoTokens) -> Self {
grpc::AccountBalance {
balance: value.balance,
tmp_locked: value.tmp_locked,
}
}
}
/// A compute node registered with the brain, together with its advertised capacity.
#[derive(Eq, Hash, PartialEq, Clone, Debug, Default)]
pub struct Node {
/// Key identifying the node; `insert_node` replaces an existing entry with the same key.
pub public_key: String,
/// Account that `contracts_cron` credits for contracts running on this node.
pub owner_key: String,
// Location and address; the node filters compare these for exact equality.
pub country: String,
pub region: String,
pub city: String,
pub ip: String,
// Available capacity; overwritten by `submit_node_resources`.
pub avail_mem_mb: u32,
pub avail_vcpus: u32,
pub avail_storage_gbs: u32,
pub avail_ipv4: u32,
pub avail_ipv6: u32,
pub avail_ports: u32,
pub max_ports_per_vm: u32,
// nanotokens per unit per minute
pub price: u64,
}
/// Builds the node-list entry returned over gRPC from an in-memory [`Node`].
///
/// Implemented as `From` rather than a hand-written `Into`: the standard
/// blanket impl still provides `Node: Into<grpc::NodeListResp>`, so existing
/// `.into()` call sites keep working, and `grpc::NodeListResp::from(node)`
/// becomes available as well.
impl From<Node> for grpc::NodeListResp {
    fn from(node: Node) -> Self {
        Self {
            node_pubkey: node.public_key,
            country: node.country,
            region: node.region,
            city: node.city,
            ip: node.ip,
            // `Node` stores no rating information, so both ratings are reported as 0.
            server_rating: 0,
            provider_rating: 0,
            price: node.price,
        }
    }
}
/// A running VM contract between an admin account and a node.
#[derive(Clone, Debug)]
pub struct Contract {
/// Identifier of the contract; lookups and deletions match on it.
pub uuid: String,
pub hostname: String,
/// Account that funded the contract; `delete_vm` refunds the remaining funds to it.
pub admin_pubkey: String,
/// Public key of the node hosting the VM.
pub node_pubkey: String,
pub exposed_ports: Vec<u32>,
/// Public IPv4 address of the VM, or empty when it has none (an address adds to the price).
pub public_ipv4: String,
/// IPv6 address of the VM, or empty when it has none.
pub public_ipv6: String,
pub disk_size_gb: u32,
pub vcpus: u32,
pub memory_mb: u32,
pub kernel_sha: String,
pub dtrfs_sha: String,
pub created_at: chrono::DateTime<Utc>,
/// Set when the contract is created and refreshed by `submit_updatevm_resp`.
pub updated_at: chrono::DateTime<Utc>,
// price per unit per minute
// recommended value is 20000
pub price_per_unit: u64,
/// Nanotokens still held by the contract; `contracts_cron` moves them to the node owner.
pub locked_nano: u64,
/// Time up to which `contracts_cron` has collected payment.
pub collected_at: chrono::DateTime<Utc>,
}
impl Contract {
    /// Number of billing units consumed by the VM.
    ///
    /// Units are the sum of: 10 per vCPU, 4 per 100 MB of memory (after adding
    /// 256 MB), 1 per 10 GB of disk, and 10 if the VM has a public IPv4 address.
    fn total_units(&self) -> u64 {
        // TODO: Optimize this based on price of hardware.
        // I tried, but this can be done better.
        // Storage cost should also be based on tier
        //
        // Every operand is widened to u64 *before* any arithmetic so that
        // `memory_mb + 256` cannot overflow u32 for very large requests.
        let vcpu_units = u64::from(self.vcpus) * 10;
        let memory_units = (u64::from(self.memory_mb) + 256) * 4 / 100;
        let disk_units = u64::from(self.disk_size_gb) / 10;
        let ipv4_units: u64 = if self.public_ipv4.is_empty() { 0 } else { 10 };
        vcpu_units + memory_units + disk_units + ipv4_units
    }
    /// Returns price per minute in nanotokens.
    ///
    /// Saturates at `u64::MAX` instead of overflowing, because
    /// `price_per_unit` is copied from the VM request.
    fn price_per_minute(&self) -> u64 {
        self.total_units().saturating_mul(self.price_per_unit)
    }
}
/// Builds the gRPC contract message from an in-memory [`Contract`].
///
/// Implemented as `From` rather than a hand-written `Into`: the standard
/// blanket impl still provides `Contract: Into<grpc::Contract>`, so existing
/// `.into()` call sites keep working.
impl From<Contract> for grpc::Contract {
    fn from(contract: Contract) -> Self {
        // Computed first, while `contract` is still whole; the fields are moved out below.
        let nano_per_minute = contract.price_per_minute();
        Self {
            uuid: contract.uuid,
            hostname: contract.hostname,
            admin_pubkey: contract.admin_pubkey,
            node_pubkey: contract.node_pubkey,
            exposed_ports: contract.exposed_ports,
            public_ipv4: contract.public_ipv4,
            public_ipv6: contract.public_ipv6,
            disk_size_gb: contract.disk_size_gb,
            vcpus: contract.vcpus,
            memory_mb: contract.memory_mb,
            kernel_sha: contract.kernel_sha,
            dtrfs_sha: contract.dtrfs_sha,
            // Timestamps travel as RFC 3339 strings.
            created_at: contract.created_at.to_rfc3339(),
            updated_at: contract.updated_at.to_rfc3339(),
            nano_per_minute,
            locked_nano: contract.locked_nano,
            collected_at: contract.collected_at.to_rfc3339(),
        }
    }
}
/// In-memory state of the mock brain: wallets, nodes, contracts, pending
/// requests and the channels used to reach connected daemons.
#[derive(Default)]
pub struct BrainData {
// amount of nanotokens in each account
accounts: DashMap<String, AccountNanoTokens>,
// Registered nodes; `insert_node` keeps one entry per public key.
nodes: RwLock<Vec<Node>>,
// Active contracts.
contracts: RwLock<Vec<Contract>>,
// New-VM requests awaiting a daemon response, keyed by request uuid, together
// with the oneshot sender used to hand that response back to the requester.
tmp_newvm_reqs: DashMap<String, (grpc::NewVmReq, OneshotSender<grpc::NewVmResp>)>,
// Update-VM requests awaiting a daemon response, keyed by contract uuid.
tmp_updatevm_reqs: DashMap<String, (grpc::UpdateVmReq, OneshotSender<grpc::UpdateVmResp>)>,
// Channel to each connected daemon, keyed by node public key.
daemon_tx: DashMap<String, Sender<grpc::BrainMessage>>,
}
/// Kinds of transaction.
// NOTE(review): not referenced anywhere in the visible part of this file;
// presumably kept for planned work (the file allows dead code) - confirm.
#[derive(Debug)]
enum TxType {
CliContract,
DaemonDeleteVm,
DaemonNewVm,
}
impl BrainData {
pub fn new() -> Self {
Self {
accounts: DashMap::new(),
nodes: RwLock::new(Vec::new()),
contracts: RwLock::new(Vec::new()),
tmp_newvm_reqs: DashMap::new(),
tmp_updatevm_reqs: DashMap::new(),
daemon_tx: DashMap::new(),
}
}
/// Returns a copy of the wallet of `account`.
///
/// Unknown accounts are reported as an empty wallet; no entry is created.
pub fn get_balance(&self, account: &str) -> AccountNanoTokens {
    match self.accounts.get(account) {
        Some(entry) => entry.value().clone(),
        None => AccountNanoTokens {
            balance: 0,
            tmp_locked: 0,
        },
    }
}
/// Credits `account` with a fixed airdrop, creating the wallet if needed.
pub fn get_airdrop(&self, account: &str) {
    // Fixed airdrop amount, expressed in nanotokens.
    const AIRDROP_NANOTOKENS: u64 = 1_000_000_000_000;
    self.add_nano_to_wallet(account, AIRDROP_NANOTOKENS);
}
/// Adds `nanotokens` to the spendable balance of `account`.
///
/// The wallet is created (with nothing locked) if it does not exist yet.
/// The addition saturates at `u64::MAX`, matching `lock_nanotockens` and
/// `unlock_nanotockens`, so an oversized credit can neither panic (debug
/// builds) nor wrap the balance around to a small value (release builds).
fn add_nano_to_wallet(&self, account: &str, nanotokens: u64) {
    log::debug!("Adding {nanotokens} nanotokens to {account}");
    self.accounts
        .entry(account.to_string())
        .and_modify(|tokens| tokens.balance = tokens.balance.saturating_add(nanotokens))
        .or_insert(AccountNanoTokens {
            balance: nanotokens,
            tmp_locked: 0,
        });
}
pub fn contracts_cron(&self) {
log::debug!("Running contracts cron...");
let mut contracts = self.contracts.write().unwrap();
contracts.retain_mut(|c| {
let owner_key = self
.find_nodes_by_pubkey(&c.node_pubkey)
.unwrap()
.owner_key
.clone();
let minutes_to_collect = (Utc::now() - c.collected_at).num_minutes() as u64;
c.collected_at = Utc::now();
log::debug!("{minutes_to_collect}");
let mut nanotokens_to_collect = c.price_per_minute().saturating_mul(minutes_to_collect);
if nanotokens_to_collect > c.locked_nano {
nanotokens_to_collect = c.locked_nano;
}
log::debug!(
"Removing {nanotokens_to_collect} nanotokens from {}",
c.uuid
);
c.locked_nano -= nanotokens_to_collect;
self.add_nano_to_wallet(&owner_key, nanotokens_to_collect);
c.locked_nano > 0
});
}
/// Registers `node`, replacing any existing entry with the same public key.
///
/// # Panics
///
/// Panics if the nodes lock is poisoned.
pub fn insert_node(&self, node: Node) {
    info!("Registering node {node:?}");
    let mut nodes = self.nodes.write().unwrap();
    match nodes
        .iter_mut()
        .find(|known| known.public_key == node.public_key)
    {
        Some(known) => {
            // TODO: figure what to do in this case.
            warn!("Node {} already exists. Updating data.", known.public_key);
            *known = node;
        }
        None => nodes.push(node),
    }
}
pub fn lock_nanotockens(&self, account: &str, nanotokens: u64) -> Result<(), Error> {
if nanotokens > 100_000_000_000_000 {
return Err(Error::TxTooBig);
}
if let Some(mut account) = self.accounts.get_mut(account) {
if nanotokens > account.balance {
return Err(Error::InsufficientFunds);
}
account.balance = account.balance.saturating_sub(nanotokens);
account.tmp_locked = account.tmp_locked.saturating_add(nanotokens);
Ok(())
} else {
Err(Error::InsufficientFunds)
}
}
pub fn unlock_nanotockens(&self, account: &str, nanotokens: u64) -> Result<(), Error> {
if let Some(mut account) = self.accounts.get_mut(account) {
if nanotokens > account.tmp_locked {
return Err(Error::ImpossibleError);
}
account.balance = account.balance.saturating_add(nanotokens);
account.tmp_locked = account.tmp_locked.saturating_sub(nanotokens);
Ok(())
} else {
Err(Error::ImpossibleError)
}
}
/// Overwrites the available resources of the node named in `res`.
///
/// Reports for unknown nodes are only logged, together with the current
/// node list.
///
/// # Panics
///
/// Panics if the nodes lock is poisoned.
pub fn submit_node_resources(&self, res: grpc::NodeResources) {
    let mut nodes = self.nodes.write().unwrap();
    if let Some(node) = nodes
        .iter_mut()
        .find(|known| known.public_key == res.node_pubkey)
    {
        debug!(
            "Found node {}. Updating resources to {:?}",
            node.public_key, res
        );
        node.avail_vcpus = res.avail_vcpus;
        node.avail_mem_mb = res.avail_memory_mb;
        node.avail_storage_gbs = res.avail_storage_gb;
        node.avail_ipv4 = res.avail_ipv4;
        node.avail_ipv6 = res.avail_ipv6;
        node.avail_ports = res.avail_ports;
        node.max_ports_per_vm = res.max_ports_per_vm;
        return;
    }
    debug!(
        "Node {} not found when trying to update resources.",
        res.node_pubkey
    );
    debug!("Node list:\n{:?}", nodes);
}
/// Stores the channel used to reach the daemon of `node_pubkey`, replacing
/// any channel previously stored for that node.
pub fn add_daemon_tx(&self, node_pubkey: &str, tx: Sender<grpc::BrainMessage>) {
    let key = node_pubkey.to_owned();
    self.daemon_tx.insert(key, tx);
}
/// Forgets the channel of the daemon of `node_pubkey`, if one is stored.
pub fn del_daemon_tx(&self, node_pubkey: &str) {
    // The removed entry (if any) is not needed by the caller.
    let _ = self.daemon_tx.remove(node_pubkey);
}
/// Deletes the contract named in `delete_vm`, refunds its remaining funds to
/// the admin account and asks the hosting daemon to delete the VM.
///
/// Requests for unknown contracts are ignored. If the daemon cannot be
/// reached, its channel is forgotten; the contract is removed either way.
///
/// # Panics
///
/// Panics if the contracts lock is poisoned.
pub async fn delete_vm(&self, delete_vm: grpc::DeleteVmReq) {
    // Take the contract out of the list first. The refund is then based on
    // the authoritative `locked_nano`, not on a snapshot that
    // `contracts_cron` could drain while this task awaits the daemon. The
    // guard lives in its own scope so it is released before any `.await`.
    let contract = {
        let mut contracts = self.contracts.write().unwrap();
        match contracts.iter().position(|c| c.uuid == delete_vm.uuid) {
            Some(index) => contracts.remove(index),
            None => return,
        }
    };
    info!("Found vm {}. Deleting...", delete_vm.uuid);
    // Refund before awaiting so a cancelled task cannot lose the funds.
    self.add_nano_to_wallet(&contract.admin_pubkey, contract.locked_nano);

    // Clone the sender out of the map: holding the DashMap reference across
    // `.await`, or while calling `del_daemon_tx` (which removes from the
    // same map), can deadlock.
    let daemon_tx = self
        .daemon_tx
        .get(&contract.node_pubkey)
        .map(|entry| entry.value().clone());
    if let Some(daemon_tx) = daemon_tx {
        debug!(
            "TX for daemon {} found. Informing daemon about deletion of {}.",
            contract.node_pubkey, delete_vm.uuid
        );
        let msg = grpc::BrainMessage {
            msg: Some(grpc::brain_message::Msg::DeleteVm(delete_vm)),
        };
        if let Err(e) = daemon_tx.send(msg).await {
            warn!(
                "Failed to send deletion request to {} due to error: {e:?}",
                contract.node_pubkey
            );
            info!("Deleting daemon TX for {}", contract.node_pubkey);
            self.del_daemon_tx(&contract.node_pubkey);
        }
    }
}
/// Handles the daemon's answer to a pending new-VM request.
///
/// The answer is forwarded to the waiting requester. On failure the funds
/// locked for the request are released back to the admin's balance; on
/// success they leave the temporary lock and a [`Contract`] holding them is
/// recorded. Answers for unknown requests are logged and dropped.
///
/// # Panics
///
/// Panics if the contracts lock is poisoned.
pub async fn submit_newvm_resp(&self, new_vm_resp: grpc::NewVmResp) {
    let mut new_vm_resp = new_vm_resp;
    let (new_vm_req, cli_tx) = match self.tmp_newvm_reqs.remove(&new_vm_resp.uuid) {
        Some((_, pending)) => pending,
        None => {
            log::error!(
                "Received confirmation for ghost NewVMReq {}",
                new_vm_resp.uuid
            );
            return;
        }
    };
    // A "successful" answer without VM details cannot become a contract.
    // Turn it into a failure before it reaches the requester, rather than
    // panicking on the missing details later on.
    if new_vm_resp.error.is_empty() && new_vm_resp.args.is_none() {
        log::error!(
            "Daemon confirmed NewVMReq {} without VM details. Treating it as failed.",
            new_vm_resp.uuid
        );
        new_vm_resp.error = "Daemon did not return the details of the new VM.".to_string();
    }
    if cli_tx.send(new_vm_resp.clone()).is_err() {
        log::error!(
            "CLI RX for {} dropped before receiving confirmation {:?}.",
            &new_vm_req.admin_pubkey,
            new_vm_resp,
        );
    }
    let args = match new_vm_resp.args.as_ref() {
        Some(args) if new_vm_resp.error.is_empty() => args,
        _ => {
            // The VM was not created: give the locked funds back using the
            // checked helper instead of raw `+=` / `-=` on the wallet.
            if let Err(e) =
                self.unlock_nanotockens(&new_vm_req.admin_pubkey, new_vm_req.locked_nano)
            {
                log::error!(
                    "Failed to unlock {} nanotokens for {}: {e}",
                    new_vm_req.locked_nano,
                    new_vm_req.admin_pubkey
                );
            }
            return;
        }
    };
    // Of the reported addresses, keep a public IPv4 (neither private nor
    // link-local) and an IPv6; when several qualify, the last one wins.
    let mut public_ipv4 = String::new();
    let mut public_ipv6 = String::new();
    for ip in args.ips.iter() {
        if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) {
            if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() {
                public_ipv4 = ipv4_addr.to_string();
            }
            continue;
        }
        if let Ok(ipv6_addr) = std::net::Ipv6Addr::from_str(&ip.address) {
            public_ipv6 = ipv6_addr.to_string();
        }
    }
    // The funds move from the admin's temporary lock into the contract.
    if let Some(mut admin_wallet) = self.accounts.get_mut(&new_vm_req.admin_pubkey) {
        admin_wallet.tmp_locked = admin_wallet
            .tmp_locked
            .saturating_sub(new_vm_req.locked_nano);
    }
    let contract = Contract {
        uuid: new_vm_resp.uuid,
        exposed_ports: args.exposed_ports.clone(),
        public_ipv4,
        public_ipv6,
        created_at: Utc::now(),
        updated_at: Utc::now(),
        hostname: new_vm_req.hostname,
        admin_pubkey: new_vm_req.admin_pubkey,
        node_pubkey: new_vm_req.node_pubkey.clone(),
        disk_size_gb: new_vm_req.disk_size_gb,
        vcpus: new_vm_req.vcpus,
        memory_mb: new_vm_req.memory_mb,
        kernel_sha: new_vm_req.kernel_sha,
        dtrfs_sha: new_vm_req.dtrfs_sha,
        price_per_unit: new_vm_req.price_per_unit,
        locked_nano: new_vm_req.locked_nano,
        collected_at: Utc::now(),
    };
    info!("Created new contract: {contract:?}");
    self.contracts.write().unwrap().push(contract);
}
pub async fn submit_updatevm_resp(&self, mut update_vm_resp: grpc::UpdateVmResp) {
let update_vm_req = match self.tmp_updatevm_reqs.remove(&update_vm_resp.uuid) {
Some((_, r)) => r,
None => {
log::error!(
"Received confirmation for ghost UpdateVMRequest {}",
update_vm_resp.uuid
);
update_vm_resp.error =
"Received confirmation for ghost UpdateVMRequest.".to_string();
return;
}
};
if update_vm_resp.error != "" {
return;
}
let mut contracts = self.contracts.write().unwrap();
match contracts.iter_mut().find(|c| c.uuid == update_vm_resp.uuid) {
Some(contract) => {
contract.disk_size_gb = update_vm_req.0.disk_size_gb;
contract.vcpus = update_vm_req.0.vcpus;
contract.memory_mb = update_vm_req.0.memory_mb;
if !update_vm_req.0.kernel_sha.is_empty() {
debug!(
"Updating kernel sha for {} to {}",
contract.uuid, update_vm_req.0.kernel_sha
);
contract.kernel_sha = update_vm_req.0.kernel_sha;
}
if !update_vm_req.0.dtrfs_sha.is_empty() {
debug!(
"Updating dtrfs sha for {} to {}",
contract.uuid, update_vm_req.0.dtrfs_sha
);
contract.dtrfs_sha = update_vm_req.0.dtrfs_sha;
}
contract.updated_at = Utc::now();
}
None => {
log::error!("Contract not found for {}.", update_vm_req.0.uuid);
update_vm_resp.error = "Contract not found.".to_string();
}
}
if let Err(e) = update_vm_req.1.send(update_vm_resp.clone()) {
log::warn!(
"CLI RX dropped before receiving UpdateVMResp {update_vm_resp:?}. Error: {e:?}"
);
}
}
/// Accepts a new-VM request: locks the requester's funds, assigns the
/// request a fresh uuid, remembers it as pending and forwards it to the
/// daemon of the requested node.
///
/// `tx` receives the final answer. It is answered immediately when the funds
/// cannot be locked, and with "Daemon is offline." (funds released) when the
/// daemon has no channel registered or the channel is closed.
pub async fn submit_newvm_req(
    &self,
    mut req: grpc::NewVmReq,
    tx: OneshotSender<grpc::NewVmResp>,
) {
    if let Err(e) = self.lock_nanotockens(&req.admin_pubkey, req.locked_nano) {
        let _ = tx.send(grpc::NewVmResp {
            uuid: String::new(),
            error: e.to_string(),
            args: None,
        });
        return;
    }
    req.uuid = uuid::Uuid::new_v4().to_string();
    info!("Inserting new vm request in memory: {req:?}");
    self.tmp_newvm_reqs
        .insert(req.uuid.clone(), (req.clone(), tx));

    // Clone the sender out of the map: holding the DashMap reference across
    // `.await`, or while calling `del_daemon_tx` (which removes from the
    // same map), can deadlock.
    let daemon_tx = self
        .daemon_tx
        .get(&req.node_pubkey)
        .map(|entry| entry.value().clone());
    let delivered = match daemon_tx {
        Some(daemon_tx) => {
            debug!(
                "Found daemon TX for {}. Sending newVMReq {}",
                req.node_pubkey, req.uuid
            );
            let msg = grpc::BrainMessage {
                msg: Some(grpc::brain_message::Msg::NewVmReq(req.clone())),
            };
            match daemon_tx.send(msg).await {
                Ok(()) => true,
                Err(e) => {
                    warn!(
                        "Failed to send new VM request to {} due to error: {e:?}",
                        req.node_pubkey
                    );
                    info!("Deleting daemon TX for {}", req.node_pubkey);
                    self.del_daemon_tx(&req.node_pubkey);
                    false
                }
            }
        }
        None => {
            warn!("No daemon TX found for {}", req.node_pubkey);
            false
        }
    };
    if !delivered {
        // Fail the pending request right away. Without this, a request for a
        // node with no connected daemon would stay pending forever, with the
        // requester waiting and the funds locked.
        self.submit_newvm_resp(grpc::NewVmResp {
            error: "Daemon is offline.".to_string(),
            uuid: req.uuid,
            args: None,
        })
        .await;
    }
}
/// Accepts an update-VM request: remembers it as pending and forwards it to
/// the daemon hosting the contract.
///
/// `tx` is answered immediately with "Contract does not exist." when no
/// contract has the requested uuid. When the daemon has no channel
/// registered or the channel is closed, the pending request is completed
/// through `submit_updatevm_resp` with the error "Daemon is offline.".
pub async fn submit_updatevm_req(
    &self,
    req: grpc::UpdateVmReq,
    tx: OneshotSender<grpc::UpdateVmResp>,
) {
    let uuid = req.uuid.clone();
    info!("Inserting new vm update request in memory: {req:?}");
    let node_pubkey = match self.find_contract_by_uuid(&req.uuid) {
        Some(contract) => contract.node_pubkey,
        None => {
            log::warn!(
                "Received UpdateVMReq for a contract that does not exist: {}",
                req.uuid
            );
            let _ = tx.send(grpc::UpdateVmResp {
                uuid,
                error: "Contract does not exist.".to_string(),
                args: None,
            });
            return;
        }
    };
    self.tmp_updatevm_reqs
        .insert(req.uuid.clone(), (req.clone(), tx));

    // Clone the sender out of the map: holding the DashMap reference across
    // `.await`, or while calling `del_daemon_tx` (which removes from the
    // same map), can deadlock.
    let daemon_tx = self
        .daemon_tx
        .get(&node_pubkey)
        .map(|entry| entry.value().clone());
    let delivered = match daemon_tx {
        Some(daemon_tx) => {
            debug!(
                "Found daemon TX for {}. Sending updateVMReq {}",
                node_pubkey, req.uuid
            );
            let msg = grpc::BrainMessage {
                msg: Some(grpc::brain_message::Msg::UpdateVmReq(req)),
            };
            match daemon_tx.send(msg).await {
                Ok(()) => {
                    debug!("Successfully sent updateVMReq to {}", node_pubkey);
                    true
                }
                Err(e) => {
                    warn!("Failed to send update VM request to {node_pubkey} due to error {e}");
                    info!("Deleting daemon TX for {}", node_pubkey);
                    self.del_daemon_tx(&node_pubkey);
                    false
                }
            }
        }
        None => {
            warn!("No daemon TX found for {}", node_pubkey);
            false
        }
    };
    if !delivered {
        self.submit_updatevm_resp(grpc::UpdateVmResp {
            uuid,
            error: "Daemon is offline.".to_string(),
            args: None,
        })
        .await;
    }
}
/// Appends `contract` to the list of active contracts.
///
/// # Panics
///
/// Panics if the contracts lock is poisoned.
pub fn insert_contract(&self, contract: Contract) {
    self.contracts.write().unwrap().push(contract);
}
/// Returns a copy of the node whose public key equals `public_key`, if any.
///
/// # Panics
///
/// Panics if the nodes lock is poisoned.
pub fn find_nodes_by_pubkey(&self, public_key: &str) -> Option<Node> {
    let nodes = self.nodes.read().unwrap();
    // Search by reference and clone only the match; cloning before `find`
    // would copy every node scanned on the way.
    nodes.iter().find(|n| n.public_key == public_key).cloned()
}
/// Returns a copy of the first node whose owner key equals `owner_key`, if any.
///
/// # Panics
///
/// Panics if the nodes lock is poisoned.
pub fn find_ns_by_owner_key(&self, owner_key: &str) -> Option<Node> {
    let nodes = self.nodes.read().unwrap();
    // Search by reference and clone only the match; cloning before `find`
    // would copy every node scanned on the way.
    nodes.iter().find(|n| n.owner_key == owner_key).cloned()
}
/// Returns copies of all nodes that satisfy `filters`, in registration order.
///
/// A node matches when it offers at least the requested ports, vCPUs, memory
/// and storage, has a free IPv4/IPv6 address if one is requested, and equals
/// every non-empty location/IP filter. `filters.node_pubkey` is not
/// consulted here.
///
/// # Panics
///
/// Panics if the nodes lock is poisoned.
pub fn find_nodes_by_filters(
    &self,
    filters: &crate::grpc::snp_proto::NodeFilters,
) -> Vec<Node> {
    // An empty text filter accepts every value.
    let wanted = |filter: &str, actual: &str| filter.is_empty() || filter == actual;
    let satisfies = |n: &Node| -> bool {
        let enough_capacity = n.avail_ports >= filters.free_ports
            && n.avail_vcpus >= filters.vcpus
            && n.avail_mem_mb >= filters.memory_mb
            && n.avail_storage_gbs >= filters.storage_gb;
        let enough_addresses = (!filters.offers_ipv4 || n.avail_ipv4 > 0)
            && (!filters.offers_ipv6 || n.avail_ipv6 > 0);
        let right_place = wanted(filters.country.as_str(), n.country.as_str())
            && wanted(filters.city.as_str(), n.city.as_str())
            && wanted(filters.region.as_str(), n.region.as_str())
            && wanted(filters.ip.as_str(), n.ip.as_str());
        enough_capacity && enough_addresses && right_place
    };
    let nodes = self.nodes.read().unwrap();
    let mut matching = Vec::new();
    for node in nodes.iter() {
        if satisfies(node) {
            matching.push(node.clone());
        }
    }
    matching
}
// TODO: sort by rating
/// Returns a copy of the first registered node that satisfies `filters`.
///
/// Uses the same capacity, address and location checks as
/// `find_nodes_by_filters`, plus an exact match on `filters.node_pubkey`
/// when that filter is non-empty.
///
/// # Panics
///
/// Panics if the nodes lock is poisoned.
pub fn get_one_node_by_filters(
    &self,
    filters: &crate::grpc::snp_proto::NodeFilters,
) -> Option<Node> {
    // An empty text filter accepts every value.
    let wanted = |filter: &str, actual: &str| filter.is_empty() || filter == actual;
    let nodes = self.nodes.read().unwrap();
    for node in nodes.iter() {
        let enough_capacity = node.avail_ports >= filters.free_ports
            && node.avail_vcpus >= filters.vcpus
            && node.avail_mem_mb >= filters.memory_mb
            && node.avail_storage_gbs >= filters.storage_gb;
        let enough_addresses = (!filters.offers_ipv4 || node.avail_ipv4 > 0)
            && (!filters.offers_ipv6 || node.avail_ipv6 > 0);
        let right_place = wanted(filters.country.as_str(), node.country.as_str())
            && wanted(filters.city.as_str(), node.city.as_str())
            && wanted(filters.region.as_str(), node.region.as_str())
            && wanted(filters.ip.as_str(), node.ip.as_str());
        let right_node = wanted(filters.node_pubkey.as_str(), node.public_key.as_str());
        if enough_capacity && enough_addresses && right_place && right_node {
            return Some(node.clone());
        }
    }
    None
}
/// Returns a copy of the contract whose uuid equals `uuid`, if any.
///
/// # Panics
///
/// Panics if the contracts lock is poisoned.
pub fn find_contract_by_uuid(&self, uuid: &str) -> Option<Contract> {
    let contracts = self.contracts.read().unwrap();
    // Search by reference and clone only the match; cloning before `find`
    // would copy every contract scanned on the way, strings and port list
    // included.
    contracts.iter().find(|c| c.uuid == uuid).cloned()
}
/// Returns copies of all contracts whose admin key equals `admin_pubkey`.
///
/// # Panics
///
/// Panics if the contracts lock is poisoned.
pub fn find_contracts_by_admin_pubkey(&self, admin_pubkey: &str) -> Vec<Contract> {
    debug!("Searching contracts for admin pubkey {admin_pubkey}");
    let contracts: Vec<Contract> = self
        .contracts
        .read()
        .unwrap()
        .iter()
        .filter(|c| c.admin_pubkey == admin_pubkey)
        .cloned()
        .collect();
    // Log message typo fixed: "contracts or" -> "contracts for".
    debug!("Found {} contracts for {admin_pubkey}.", contracts.len());
    contracts
}
/// Returns copies of all contracts hosted on the node `node_pubkey`.
///
/// # Panics
///
/// Panics if the contracts lock is poisoned.
pub fn find_contracts_by_node_pubkey(&self, node_pubkey: &str) -> Vec<Contract> {
    let contracts = self.contracts.read().unwrap();
    let mut hosted = Vec::new();
    for contract in contracts.iter() {
        if contract.node_pubkey == node_pubkey {
            hosted.push(contract.clone());
        }
    }
    hosted
}
}