945 lines
33 KiB
Rust
945 lines
33 KiB
Rust
use crate::grpc::snp_proto::{self as grpc};
|
|
use chrono::Utc;
|
|
use dashmap::DashMap;
|
|
use log::{debug, info, warn};
|
|
use std::collections::{HashMap, HashSet};
|
|
use std::str::FromStr;
|
|
use std::sync::RwLock;
|
|
use tokio::sync::mpsc::Sender;
|
|
use tokio::sync::oneshot::Sender as OneshotSender;
|
|
|
|
#[derive(thiserror::Error, Debug)]
|
|
pub enum Error {
|
|
#[error("We do not allow locking of more than 100000 LP.")]
|
|
TxTooBig,
|
|
#[error("Escrow must be at least 5000 LP.")]
|
|
MinimalEscrow,
|
|
#[error("Account has insufficient funds for this operation")]
|
|
InsufficientFunds,
|
|
#[error("Could not find contract {0}")]
|
|
VmContractNotFound(String),
|
|
#[error("This error should never happen.")]
|
|
ImpossibleError,
|
|
#[error("You don't have the required permissions for this operation.")]
|
|
AccessDenied,
|
|
}
|
|
|
|
#[derive(Clone, Default)]
|
|
pub struct AccountData {
|
|
pub balance: u64,
|
|
pub tmp_locked: u64,
|
|
// holds reasons why VMs of this account got kicked
|
|
pub kicked_for: Vec<String>,
|
|
pub last_kick: chrono::DateTime<Utc>,
|
|
// holds accounts that banned this account
|
|
pub banned_by: HashSet<String>,
|
|
}
|
|
|
|
/// Data stored for every operator wallet.
#[derive(Clone, Default)]
pub struct OperatorData {
    /// Escrow held for this operator, in nanoLP (`register_operator`
    /// multiplies the requested LP amount by 1_000_000_000 before storing).
    pub escrow: u64,
    /// Contact e-mail supplied at registration.
    pub email: String,
    /// Users banned by this operator.
    pub banned_users: HashSet<String>,
    /// Public keys of the VM nodes registered under this operator.
    pub vm_nodes: HashSet<String>,
}
|
|
|
|
impl From<AccountData> for grpc::AccountBalance {
|
|
fn from(value: AccountData) -> Self {
|
|
grpc::AccountBalance {
|
|
balance: value.balance,
|
|
tmp_locked: value.tmp_locked,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// A VM node registered with the brain, together with its advertised
/// free resources.
#[derive(Eq, PartialEq, Clone, Debug, Default)]
pub struct VmNode {
    /// Public key identifying the node.
    pub public_key: String,
    /// Wallet of the operator that owns the node.
    pub operator_wallet: String,
    pub country: String,
    pub region: String,
    pub city: String,
    pub ip: String,
    /// Free memory, in MB.
    pub avail_mem_mb: u32,
    /// Free virtual CPUs.
    pub avail_vcpus: u32,
    /// Free storage, in GB.
    pub avail_storage_gbs: u32,
    /// Free public IPv4 addresses.
    pub avail_ipv4: u32,
    /// Free public IPv6 addresses.
    pub avail_ipv6: u32,
    /// Free ports.
    pub avail_ports: u32,
    /// Maximum number of ports a single VM may use.
    pub max_ports_per_vm: u32,
    /// Price in nanoLP per unit per minute.
    pub price: u64,
    /// Reports filed against this node: the key is the reporting user's
    /// wallet and the value is the report message.
    pub reports: HashMap<String, String>,
}
|
|
|
|
/// Converts a node into its gRPC listing representation.
///
/// Implemented as `From` rather than a hand-written `Into`: the standard
/// blanket impl still provides `VmNode: Into<grpc::VmNodeListResp>`, so
/// existing `.into()` call sites keep working.
impl From<VmNode> for grpc::VmNodeListResp {
    fn from(node: VmNode) -> Self {
        grpc::VmNodeListResp {
            operator: node.operator_wallet,
            node_pubkey: node.public_key,
            country: node.country,
            region: node.region,
            city: node.city,
            ip: node.ip,
            price: node.price,
            // Only the report messages are exposed; the reporters' wallets
            // (the map keys) are dropped.
            reports: node.reports.into_values().collect(),
        }
    }
}
|
|
|
|
#[derive(Clone, Debug)]
|
|
pub struct VmContract {
|
|
pub uuid: String,
|
|
pub hostname: String,
|
|
pub admin_pubkey: String,
|
|
pub node_pubkey: String,
|
|
pub exposed_ports: Vec<u32>,
|
|
pub public_ipv4: String,
|
|
pub public_ipv6: String,
|
|
pub disk_size_gb: u32,
|
|
pub vcpus: u32,
|
|
pub memory_mb: u32,
|
|
pub kernel_sha: String,
|
|
pub dtrfs_sha: String,
|
|
pub created_at: chrono::DateTime<Utc>,
|
|
pub updated_at: chrono::DateTime<Utc>,
|
|
// recommended value is 20000
|
|
/// price per unit per minute
|
|
pub price_per_unit: u64,
|
|
pub locked_nano: u64,
|
|
pub collected_at: chrono::DateTime<Utc>,
|
|
}
|
|
|
|
impl VmContract {
|
|
/// total hardware units of this VM
|
|
fn total_units(&self) -> u64 {
|
|
// TODO: Optimize this based on price of hardware.
|
|
// I tried, but this can be done better.
|
|
// Storage cost should also be based on tier
|
|
(self.vcpus as u64 * 10)
|
|
+ ((self.memory_mb + 256) as u64 / 200)
|
|
+ (self.disk_size_gb as u64 / 10)
|
|
+ (!self.public_ipv4.is_empty() as u64 * 10)
|
|
}
|
|
|
|
/// Returns price per minute in nanoLP
|
|
fn price_per_minute(&self) -> u64 {
|
|
self.total_units() * self.price_per_unit
|
|
}
|
|
}
|
|
|
|
/// Converts a contract into its gRPC representation.
///
/// Implemented as `From` rather than a hand-written `Into`: the standard
/// blanket impl still provides `VmContract: Into<grpc::VmContract>`, so
/// existing `.into()` call sites keep working.
impl From<VmContract> for grpc::VmContract {
    fn from(contract: VmContract) -> Self {
        // Computed first because the fields below are moved out of `contract`.
        let nano_per_minute = contract.price_per_minute();
        grpc::VmContract {
            uuid: contract.uuid,
            hostname: contract.hostname,
            admin_pubkey: contract.admin_pubkey,
            node_pubkey: contract.node_pubkey,
            exposed_ports: contract.exposed_ports,
            public_ipv4: contract.public_ipv4,
            public_ipv6: contract.public_ipv6,
            disk_size_gb: contract.disk_size_gb,
            vcpus: contract.vcpus,
            memory_mb: contract.memory_mb,
            kernel_sha: contract.kernel_sha,
            dtrfs_sha: contract.dtrfs_sha,
            // Timestamps are sent as RFC 3339 strings.
            created_at: contract.created_at.to_rfc3339(),
            updated_at: contract.updated_at.to_rfc3339(),
            nano_per_minute,
            locked_nano: contract.locked_nano,
            collected_at: contract.collected_at.to_rfc3339(),
        }
    }
}
|
|
|
|
#[derive(Default)]
|
|
pub struct BrainData {
|
|
// amount of nanoLP in each account
|
|
accounts: DashMap<String, AccountData>,
|
|
operators: DashMap<String, OperatorData>,
|
|
vm_nodes: RwLock<Vec<VmNode>>,
|
|
vm_contracts: RwLock<Vec<VmContract>>,
|
|
tmp_newvm_reqs: DashMap<String, (grpc::NewVmReq, OneshotSender<grpc::NewVmResp>)>,
|
|
tmp_updatevm_reqs: DashMap<String, (grpc::UpdateVmReq, OneshotSender<grpc::UpdateVmResp>)>,
|
|
daemon_tx: DashMap<String, Sender<grpc::BrainVmMessage>>,
|
|
}
|
|
|
|
impl BrainData {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
accounts: DashMap::new(),
|
|
operators: DashMap::new(),
|
|
vm_nodes: RwLock::new(Vec::new()),
|
|
vm_contracts: RwLock::new(Vec::new()),
|
|
tmp_newvm_reqs: DashMap::new(),
|
|
tmp_updatevm_reqs: DashMap::new(),
|
|
daemon_tx: DashMap::new(),
|
|
}
|
|
}
|
|
|
|
pub fn get_balance(&self, account: &str) -> AccountData {
|
|
if let Some(account) = self.accounts.get(account) {
|
|
return account.value().clone();
|
|
} else {
|
|
let balance = AccountData {
|
|
balance: 0,
|
|
tmp_locked: 0,
|
|
kicked_for: Vec::new(),
|
|
banned_by: HashSet::new(),
|
|
last_kick: chrono::Utc::now(),
|
|
};
|
|
return balance;
|
|
}
|
|
}
|
|
|
|
/// Credits `tokens` LP to `account`, creating the account if needed.
pub fn give_airdrop(&self, account: &str, tokens: u64) {
    warn!("Airdropping {tokens} to {account}.");
    // Wallets are kept in nanoLP; saturate instead of overflowing.
    let nano_lp = tokens.saturating_mul(1_000_000_000);
    self.add_nano_to_wallet(account, nano_lp);
}
|
|
|
|
/// Requests the removal of `tokens` LP from `account` via
/// `rm_nano_from_wallet`.
pub fn slash_account(&self, account: &str, tokens: u64) {
    warn!("Slashing {tokens} from {account}.");
    // Wallets are kept in nanoLP; saturate instead of overflowing.
    let nano_lp = tokens.saturating_mul(1_000_000_000);
    self.rm_nano_from_wallet(account, nano_lp);
}
|
|
|
|
/// Adds `nano_lp` nanoLP to the balance of `account`, creating the account
/// with default data when it does not exist yet.
///
/// The addition saturates at `u64::MAX` so that a very large credit can
/// never wrap the balance around (or panic in debug builds).
fn add_nano_to_wallet(&self, account: &str, nano_lp: u64) {
    log::debug!("Adding {nano_lp} nanoLP to {account}");
    let mut wallet = self.accounts.entry(account.to_string()).or_default();
    wallet.balance = wallet.balance.saturating_add(nano_lp);
}
|
|
|
|
/// Removes up to `nano_lp` nanoLP from the balance of `account`.
///
/// The balance bottoms out at zero. Unknown accounts are left untouched
/// (no entry is created).
fn rm_nano_from_wallet(&self, account: &str, nano_lp: u64) {
    log::debug!("Removing {nano_lp} nanoLP from {account}");
    if let Some(mut wallet) = self.accounts.get_mut(account) {
        // The result must be stored back: `saturating_sub` returns the new
        // value and does not modify the balance in place.
        wallet.balance = wallet.balance.saturating_sub(nano_lp);
    }
}
|
|
|
|
pub async fn vm_contracts_cron(&self) {
|
|
let mut deleted_contracts: Vec<(String, String)> = Vec::new();
|
|
log::debug!("Running contracts cron...");
|
|
{
|
|
let mut contracts = self.vm_contracts.write().unwrap();
|
|
contracts.retain_mut(|c| {
|
|
let operator_wallet = self
|
|
.find_node_by_pubkey(&c.node_pubkey)
|
|
.unwrap()
|
|
.operator_wallet
|
|
.clone();
|
|
let minutes_to_collect = (Utc::now() - c.collected_at).num_minutes() as u64;
|
|
c.collected_at = Utc::now();
|
|
let mut nanolp_to_collect = c.price_per_minute().saturating_mul(minutes_to_collect);
|
|
if nanolp_to_collect > c.locked_nano {
|
|
nanolp_to_collect = c.locked_nano;
|
|
}
|
|
log::debug!("Removing {nanolp_to_collect} nanoLP from {}", c.uuid);
|
|
c.locked_nano -= nanolp_to_collect;
|
|
self.add_nano_to_wallet(&operator_wallet, nanolp_to_collect);
|
|
if c.locked_nano == 0 {
|
|
deleted_contracts.push((c.uuid.clone(), c.node_pubkey.clone()));
|
|
}
|
|
c.locked_nano > 0
|
|
});
|
|
}
|
|
// inform daemons of the deletion of the contracts
|
|
for (uuid, node_pubkey) in deleted_contracts.iter() {
|
|
if let Some(daemon_tx) = self.daemon_tx.get(&node_pubkey.clone()) {
|
|
let msg = grpc::BrainVmMessage {
|
|
msg: Some(grpc::brain_vm_message::Msg::DeleteVm(grpc::DeleteVmReq {
|
|
uuid: uuid.to_string(),
|
|
admin_pubkey: String::new(),
|
|
})),
|
|
};
|
|
let daemon_tx = daemon_tx.clone();
|
|
let _ = daemon_tx.send(msg).await;
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn register_node(&self, node: VmNode) {
|
|
info!("Registering node {node:?}");
|
|
self.add_vmnode_to_operator(&node.operator_wallet, &node.public_key);
|
|
let mut nodes = self.vm_nodes.write().unwrap();
|
|
for n in nodes.iter_mut() {
|
|
if n.public_key == node.public_key {
|
|
// TODO: figure what to do in this case.
|
|
warn!("VM Node {} already exists. Updating data.", n.public_key);
|
|
*n = node;
|
|
return;
|
|
}
|
|
}
|
|
nodes.push(node);
|
|
}
|
|
|
|
// todo: this should also support Apps
|
|
/// Receives: operator, contract uuid, reason of kick
|
|
pub async fn kick_contract(
|
|
&self,
|
|
operator: &str,
|
|
uuid: &str,
|
|
reason: &str,
|
|
) -> Result<u64, Error> {
|
|
log::debug!("Operator {operator} requested a kick of {uuid} for reason: {reason}");
|
|
let contract = self.find_contract_by_uuid(uuid)?;
|
|
let mut operator_data = self
|
|
.operators
|
|
.get_mut(operator)
|
|
.ok_or(Error::AccessDenied)?;
|
|
if !operator_data.vm_nodes.contains(&contract.node_pubkey) {
|
|
return Err(Error::AccessDenied);
|
|
}
|
|
|
|
let mut minutes_to_refund = chrono::Utc::now()
|
|
.signed_duration_since(contract.updated_at)
|
|
.num_minutes()
|
|
.abs() as u64;
|
|
// cap refund at 1 week
|
|
if minutes_to_refund > 10080 {
|
|
minutes_to_refund = 10080;
|
|
}
|
|
|
|
let mut refund_ammount = minutes_to_refund * contract.price_per_minute();
|
|
let mut admin_account = self
|
|
.accounts
|
|
.get_mut(&contract.admin_pubkey)
|
|
.ok_or(Error::ImpossibleError)?;
|
|
|
|
// check if he got kicked within the last day
|
|
if !chrono::Utc::now()
|
|
.signed_duration_since(admin_account.last_kick)
|
|
.gt(&chrono::Duration::days(1))
|
|
{
|
|
refund_ammount = 0;
|
|
}
|
|
|
|
if operator_data.escrow < refund_ammount {
|
|
refund_ammount = operator_data.escrow;
|
|
}
|
|
|
|
log::debug!(
|
|
"Removing {refund_ammount} escrow from {} and giving it to {}",
|
|
operator_data.key(),
|
|
admin_account.key()
|
|
);
|
|
admin_account.balance += refund_ammount;
|
|
admin_account.kicked_for.push(reason.to_string());
|
|
operator_data.escrow -= refund_ammount;
|
|
|
|
let admin_pubkey = contract.admin_pubkey.clone();
|
|
drop(admin_account);
|
|
drop(contract);
|
|
|
|
self.delete_vm(grpc::DeleteVmReq {
|
|
uuid: uuid.to_string(),
|
|
admin_pubkey,
|
|
})
|
|
.await?;
|
|
|
|
Ok(refund_ammount)
|
|
}
|
|
|
|
pub fn ban_user(&self, operator: &str, user: &str) {
|
|
self.accounts
|
|
.entry(user.to_string())
|
|
.and_modify(|a| {
|
|
a.banned_by.insert(operator.to_string());
|
|
})
|
|
.or_insert(AccountData {
|
|
banned_by: HashSet::from([operator.to_string()]),
|
|
..Default::default()
|
|
});
|
|
self.operators
|
|
.entry(operator.to_string())
|
|
.and_modify(|o| {
|
|
o.banned_users.insert(user.to_string());
|
|
})
|
|
.or_insert(OperatorData {
|
|
banned_users: HashSet::from([user.to_string()]),
|
|
..Default::default()
|
|
});
|
|
}
|
|
|
|
pub fn report_node(&self, admin_pubkey: String, node: &str, report: String) {
|
|
let mut nodes = self.vm_nodes.write().unwrap();
|
|
if let Some(node) = nodes.iter_mut().find(|n| n.public_key == node) {
|
|
node.reports.insert(admin_pubkey, report);
|
|
}
|
|
}
|
|
|
|
pub fn lock_nanotockens(&self, account: &str, nano_lp: u64) -> Result<(), Error> {
|
|
if nano_lp > 100_000_000_000_000 {
|
|
return Err(Error::TxTooBig);
|
|
}
|
|
if let Some(mut account) = self.accounts.get_mut(account) {
|
|
if nano_lp > account.balance {
|
|
return Err(Error::InsufficientFunds);
|
|
}
|
|
account.balance = account.balance.saturating_sub(nano_lp);
|
|
account.tmp_locked = account.tmp_locked.saturating_add(nano_lp);
|
|
Ok(())
|
|
} else {
|
|
Err(Error::InsufficientFunds)
|
|
}
|
|
}
|
|
|
|
pub fn extend_vm_contract_time(
|
|
&self,
|
|
uuid: &str,
|
|
wallet: &str,
|
|
nano_lp: u64,
|
|
) -> Result<(), Error> {
|
|
if nano_lp > 100_000_000_000_000 {
|
|
return Err(Error::TxTooBig);
|
|
}
|
|
let mut account = match self.accounts.get_mut(wallet) {
|
|
Some(account) => account,
|
|
None => return Err(Error::InsufficientFunds),
|
|
};
|
|
match self
|
|
.vm_contracts
|
|
.write()
|
|
.unwrap()
|
|
.iter_mut()
|
|
.find(|c| c.uuid == uuid)
|
|
{
|
|
Some(contract) => {
|
|
if contract.admin_pubkey != wallet {
|
|
return Err(Error::VmContractNotFound(uuid.to_string()));
|
|
}
|
|
if account.balance + contract.locked_nano < nano_lp {
|
|
return Err(Error::InsufficientFunds);
|
|
}
|
|
account.balance = account.balance + contract.locked_nano - nano_lp;
|
|
contract.locked_nano = nano_lp;
|
|
Ok(())
|
|
}
|
|
None => Err(Error::VmContractNotFound(uuid.to_string())),
|
|
}
|
|
}
|
|
|
|
pub fn submit_node_resources(&self, res: grpc::VmNodeResources) {
|
|
let mut nodes = self.vm_nodes.write().unwrap();
|
|
for n in nodes.iter_mut() {
|
|
if n.public_key == res.node_pubkey {
|
|
debug!(
|
|
"Found node {}. Updating resources to {:?}",
|
|
n.public_key, res
|
|
);
|
|
n.avail_ipv4 = res.avail_ipv4;
|
|
n.avail_ipv6 = res.avail_ipv6;
|
|
n.avail_vcpus = res.avail_vcpus;
|
|
n.avail_mem_mb = res.avail_memory_mb;
|
|
n.avail_storage_gbs = res.avail_storage_gb;
|
|
n.max_ports_per_vm = res.max_ports_per_vm;
|
|
n.avail_ports = res.avail_ports;
|
|
return;
|
|
}
|
|
}
|
|
debug!(
|
|
"VM Node {} not found when trying to update resources.",
|
|
res.node_pubkey
|
|
);
|
|
debug!("VM Node list:\n{:?}", nodes);
|
|
}
|
|
|
|
/// Stores the channel used to reach the daemon of `node_pubkey`,
/// replacing any channel stored for it before.
pub fn add_daemon_tx(&self, node_pubkey: &str, tx: Sender<grpc::BrainVmMessage>) {
    let key = node_pubkey.to_string();
    self.daemon_tx.insert(key, tx);
}
|
|
|
|
pub fn del_daemon_tx(&self, node_pubkey: &str) {
|
|
self.daemon_tx.remove(node_pubkey);
|
|
}
|
|
|
|
pub async fn delete_vm(&self, delete_vm: grpc::DeleteVmReq) -> Result<(), Error> {
|
|
log::debug!("Starting deletion of VM {}", delete_vm.uuid);
|
|
let contract = self.find_contract_by_uuid(&delete_vm.uuid)?;
|
|
if contract.admin_pubkey != delete_vm.admin_pubkey {
|
|
return Err(Error::AccessDenied);
|
|
}
|
|
info!("Found vm {}. Deleting...", delete_vm.uuid);
|
|
if let Some(daemon_tx) = self.daemon_tx.get(&contract.node_pubkey) {
|
|
debug!(
|
|
"TX for daemon {} found. Informing daemon about deletion of {}.",
|
|
contract.node_pubkey, delete_vm.uuid
|
|
);
|
|
let msg = grpc::BrainVmMessage {
|
|
msg: Some(grpc::brain_vm_message::Msg::DeleteVm(delete_vm.clone())),
|
|
};
|
|
if let Err(e) = daemon_tx.send(msg).await {
|
|
warn!(
|
|
"Failed to send deletion request to {} due to error: {e:?}",
|
|
contract.node_pubkey
|
|
);
|
|
info!("Deleting daemon TX for {}", contract.node_pubkey);
|
|
self.del_daemon_tx(&contract.node_pubkey);
|
|
}
|
|
}
|
|
|
|
self.add_nano_to_wallet(&contract.admin_pubkey, contract.locked_nano);
|
|
let mut contracts = self.vm_contracts.write().unwrap();
|
|
contracts.retain(|c| c.uuid != delete_vm.uuid);
|
|
Ok(())
|
|
}
|
|
|
|
// also unlocks nanotokens in case VM creation failed
|
|
pub async fn submit_newvm_resp(&self, new_vm_resp: grpc::NewVmResp) {
|
|
let new_vm_req = match self.tmp_newvm_reqs.remove(&new_vm_resp.uuid) {
|
|
Some((_, r)) => r,
|
|
None => {
|
|
log::error!(
|
|
"Received confirmation for ghost NewVMReq {}",
|
|
new_vm_resp.uuid
|
|
);
|
|
return;
|
|
}
|
|
};
|
|
if let Err(_) = new_vm_req.1.send(new_vm_resp.clone()) {
|
|
log::error!(
|
|
"CLI RX for {} dropped before receiving confirmation {:?}.",
|
|
&new_vm_req.0.admin_pubkey,
|
|
new_vm_resp,
|
|
);
|
|
}
|
|
if new_vm_resp.error != "" {
|
|
if let Some(mut admin_wallet) = self.accounts.get_mut(&new_vm_req.0.admin_pubkey) {
|
|
admin_wallet.balance += new_vm_req.0.locked_nano;
|
|
admin_wallet.tmp_locked -= new_vm_req.0.locked_nano;
|
|
}
|
|
return;
|
|
}
|
|
|
|
let mut public_ipv4 = String::new();
|
|
let mut public_ipv6 = String::new();
|
|
|
|
let args = new_vm_resp.args.as_ref().unwrap();
|
|
for ip in args.ips.iter() {
|
|
if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) {
|
|
if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() {
|
|
public_ipv4 = ipv4_addr.to_string();
|
|
}
|
|
continue;
|
|
}
|
|
if let Ok(ipv6_addr) = std::net::Ipv6Addr::from_str(&ip.address) {
|
|
public_ipv6 = ipv6_addr.to_string();
|
|
}
|
|
}
|
|
|
|
if let Some(mut admin_wallet) = self.accounts.get_mut(&new_vm_req.0.admin_pubkey) {
|
|
admin_wallet.tmp_locked -= new_vm_req.0.locked_nano;
|
|
}
|
|
|
|
let contract = VmContract {
|
|
uuid: new_vm_resp.uuid,
|
|
exposed_ports: args.exposed_ports.clone(),
|
|
public_ipv4,
|
|
public_ipv6,
|
|
created_at: Utc::now(),
|
|
updated_at: Utc::now(),
|
|
hostname: new_vm_req.0.hostname,
|
|
admin_pubkey: new_vm_req.0.admin_pubkey,
|
|
node_pubkey: new_vm_req.0.node_pubkey.clone(),
|
|
disk_size_gb: new_vm_req.0.disk_size_gb,
|
|
vcpus: new_vm_req.0.vcpus,
|
|
memory_mb: new_vm_req.0.memory_mb,
|
|
kernel_sha: new_vm_req.0.kernel_sha,
|
|
dtrfs_sha: new_vm_req.0.dtrfs_sha,
|
|
price_per_unit: new_vm_req.0.price_per_unit,
|
|
locked_nano: new_vm_req.0.locked_nano,
|
|
collected_at: Utc::now(),
|
|
};
|
|
info!("Created new contract: {contract:?}");
|
|
self.vm_contracts.write().unwrap().push(contract);
|
|
}
|
|
|
|
pub async fn submit_updatevm_resp(&self, mut update_vm_resp: grpc::UpdateVmResp) {
|
|
let update_vm_req = match self.tmp_updatevm_reqs.remove(&update_vm_resp.uuid) {
|
|
Some((_, r)) => r,
|
|
None => {
|
|
log::error!(
|
|
"Received confirmation for ghost UpdateVMRequest {}",
|
|
update_vm_resp.uuid
|
|
);
|
|
update_vm_resp.error =
|
|
"Received confirmation for ghost UpdateVMRequest.".to_string();
|
|
return;
|
|
}
|
|
};
|
|
if update_vm_resp.error != "" {
|
|
return;
|
|
}
|
|
let mut contracts = self.vm_contracts.write().unwrap();
|
|
match contracts.iter_mut().find(|c| c.uuid == update_vm_resp.uuid) {
|
|
Some(contract) => {
|
|
if update_vm_req.0.vcpus != 0 {
|
|
contract.vcpus = update_vm_req.0.vcpus;
|
|
}
|
|
if update_vm_req.0.memory_mb != 0 {
|
|
contract.memory_mb = update_vm_req.0.memory_mb;
|
|
}
|
|
if update_vm_req.0.disk_size_gb != 0 {
|
|
contract.disk_size_gb = update_vm_req.0.disk_size_gb;
|
|
}
|
|
if !update_vm_req.0.kernel_sha.is_empty() {
|
|
debug!(
|
|
"Updating kernel sha for {} to {}",
|
|
contract.uuid, update_vm_req.0.kernel_sha
|
|
);
|
|
contract.kernel_sha = update_vm_req.0.kernel_sha;
|
|
}
|
|
if !update_vm_req.0.dtrfs_sha.is_empty() {
|
|
debug!(
|
|
"Updating dtrfs sha for {} to {}",
|
|
contract.uuid, update_vm_req.0.dtrfs_sha
|
|
);
|
|
contract.dtrfs_sha = update_vm_req.0.dtrfs_sha;
|
|
}
|
|
contract.updated_at = Utc::now();
|
|
}
|
|
None => {
|
|
log::error!("VM Contract not found for {}.", update_vm_req.0.uuid);
|
|
update_vm_resp.error = "VM Contract not found.".to_string();
|
|
}
|
|
}
|
|
if let Err(e) = update_vm_req.1.send(update_vm_resp.clone()) {
|
|
log::warn!(
|
|
"CLI RX dropped before receiving UpdateVMResp {update_vm_resp:?}. Error: {e:?}"
|
|
);
|
|
}
|
|
}
|
|
|
|
pub async fn submit_newvm_req(
|
|
&self,
|
|
mut req: grpc::NewVmReq,
|
|
tx: OneshotSender<grpc::NewVmResp>,
|
|
) {
|
|
if let Err(e) = self.lock_nanotockens(&req.admin_pubkey, req.locked_nano) {
|
|
let _ = tx.send(grpc::NewVmResp {
|
|
uuid: String::new(),
|
|
error: e.to_string(),
|
|
args: None,
|
|
});
|
|
return;
|
|
}
|
|
req.uuid = uuid::Uuid::new_v4().to_string();
|
|
info!("Inserting new vm request in memory: {req:?}");
|
|
self.tmp_newvm_reqs
|
|
.insert(req.uuid.clone(), (req.clone(), tx));
|
|
if let Some(daemon_tx) = self.daemon_tx.get(&req.node_pubkey) {
|
|
debug!(
|
|
"Found daemon TX for {}. Sending newVMReq {}",
|
|
req.node_pubkey, req.uuid
|
|
);
|
|
let msg = grpc::BrainVmMessage {
|
|
msg: Some(grpc::brain_vm_message::Msg::NewVmReq(req.clone())),
|
|
};
|
|
if let Err(e) = daemon_tx.send(msg).await {
|
|
warn!(
|
|
"Failed to send new VM request to {} due to error: {e:?}",
|
|
req.node_pubkey
|
|
);
|
|
info!("Deleting daemon TX for {}", req.node_pubkey);
|
|
self.del_daemon_tx(&req.node_pubkey);
|
|
self.submit_newvm_resp(grpc::NewVmResp {
|
|
error: "Daemon is offline.".to_string(),
|
|
uuid: req.uuid,
|
|
args: None,
|
|
})
|
|
.await;
|
|
}
|
|
} else {
|
|
self.submit_newvm_resp(grpc::NewVmResp {
|
|
error: "Daemon is offline.".to_string(),
|
|
uuid: req.uuid,
|
|
args: None,
|
|
})
|
|
.await;
|
|
}
|
|
}
|
|
|
|
pub async fn submit_updatevm_req(
|
|
&self,
|
|
req: grpc::UpdateVmReq,
|
|
tx: OneshotSender<grpc::UpdateVmResp>,
|
|
) {
|
|
let uuid = req.uuid.clone();
|
|
info!("Inserting new vm update request in memory: {req:?}");
|
|
let node_pubkey = match self.find_contract_by_uuid(&req.uuid) {
|
|
Ok(contract) => {
|
|
if contract.admin_pubkey != req.admin_pubkey {
|
|
let _ = tx.send(grpc::UpdateVmResp {
|
|
uuid,
|
|
error: "VM Contract does not exist.".to_string(),
|
|
args: None,
|
|
});
|
|
return;
|
|
}
|
|
contract.node_pubkey
|
|
}
|
|
Err(_) => {
|
|
log::warn!(
|
|
"Received UpdateVMReq for a contract that does not exist: {}",
|
|
req.uuid
|
|
);
|
|
let _ = tx.send(grpc::UpdateVmResp {
|
|
uuid,
|
|
error: "VM Contract does not exist.".to_string(),
|
|
args: None,
|
|
});
|
|
return;
|
|
}
|
|
};
|
|
self.tmp_updatevm_reqs
|
|
.insert(req.uuid.clone(), (req.clone(), tx));
|
|
if let Some(server_tx) = self.daemon_tx.get(&node_pubkey) {
|
|
debug!(
|
|
"Found daemon TX for {}. Sending updateVMReq {}",
|
|
node_pubkey, req.uuid
|
|
);
|
|
let msg = grpc::BrainVmMessage {
|
|
msg: Some(grpc::brain_vm_message::Msg::UpdateVmReq(req.clone())),
|
|
};
|
|
match server_tx.send(msg).await {
|
|
Ok(_) => {
|
|
debug!("Successfully sent updateVMReq to {}", node_pubkey);
|
|
return;
|
|
}
|
|
Err(e) => {
|
|
warn!("Failed to send update VM request to {node_pubkey} due to error {e}");
|
|
info!("Deleting daemon TX for {}", node_pubkey);
|
|
self.del_daemon_tx(&node_pubkey);
|
|
self.submit_updatevm_resp(grpc::UpdateVmResp {
|
|
uuid,
|
|
error: "Daemon is offline.".to_string(),
|
|
args: None,
|
|
})
|
|
.await;
|
|
}
|
|
}
|
|
} else {
|
|
warn!("No daemon TX found for {}", node_pubkey);
|
|
self.submit_updatevm_resp(grpc::UpdateVmResp {
|
|
uuid,
|
|
error: "Daemon is offline.".to_string(),
|
|
args: None,
|
|
})
|
|
.await;
|
|
}
|
|
}
|
|
|
|
/// Returns a copy of the node whose public key is `public_key`, if any.
pub fn find_node_by_pubkey(&self, public_key: &str) -> Option<VmNode> {
    let nodes = self.vm_nodes.read().unwrap();
    // Search by reference and clone only the match; cloning before the
    // search would copy every node (including its reports) on the way.
    nodes.iter().find(|n| n.public_key == public_key).cloned()
}
|
|
|
|
pub fn add_vmnode_to_operator(&self, operator_wallet: &str, node_pubkey: &str) {
|
|
self.operators
|
|
.entry(operator_wallet.to_string())
|
|
.and_modify(|op| {
|
|
op.vm_nodes.insert(node_pubkey.to_string());
|
|
})
|
|
.or_insert(OperatorData {
|
|
escrow: 0,
|
|
email: String::new(),
|
|
banned_users: HashSet::new(),
|
|
vm_nodes: HashSet::from([node_pubkey.to_string()]),
|
|
});
|
|
}
|
|
|
|
pub fn register_operator(&self, req: grpc::RegOperatorReq) -> Result<(), Error> {
|
|
let mut operator = match self.operators.get(&req.pubkey) {
|
|
Some(o) => (*(o.value())).clone(),
|
|
None => OperatorData {
|
|
..Default::default()
|
|
},
|
|
};
|
|
if req.escrow < 5000 {
|
|
return Err(Error::MinimalEscrow);
|
|
}
|
|
let escrow = req.escrow * 1_000_000_000;
|
|
if let Some(mut account) = self.accounts.get_mut(&req.pubkey) {
|
|
if (account.balance + operator.escrow) < escrow {
|
|
return Err(Error::InsufficientFunds);
|
|
}
|
|
account.balance = account.balance + operator.escrow - escrow;
|
|
operator.escrow = escrow;
|
|
} else {
|
|
return Err(Error::InsufficientFunds);
|
|
}
|
|
operator.email = req.email;
|
|
self.operators.insert(req.pubkey, operator);
|
|
Ok(())
|
|
}
|
|
|
|
/// Returns copies of all nodes whose operator wallet is `operator_wallet`.
pub fn find_vm_nodes_by_operator(&self, operator_wallet: &str) -> Vec<VmNode> {
    let nodes = self.vm_nodes.read().unwrap();
    let mut owned = Vec::new();
    for node in nodes.iter() {
        if node.operator_wallet == operator_wallet {
            owned.push(node.clone());
        }
    }
    owned
}
|
|
|
|
pub fn total_operator_reports(&self, operator_wallet: &str) -> usize {
|
|
let nodes = self.vm_nodes.read().unwrap();
|
|
nodes
|
|
.iter()
|
|
.cloned()
|
|
.filter(|n| n.operator_wallet == operator_wallet)
|
|
.map(|node| node.reports.len())
|
|
.sum()
|
|
}
|
|
|
|
pub fn find_vm_nodes_by_filters(
|
|
&self,
|
|
filters: &crate::grpc::snp_proto::VmNodeFilters,
|
|
) -> Vec<VmNode> {
|
|
let nodes = self.vm_nodes.read().unwrap();
|
|
nodes
|
|
.iter()
|
|
.filter(|n| {
|
|
n.avail_ports >= filters.free_ports
|
|
&& (!filters.offers_ipv4 || n.avail_ipv4 > 0)
|
|
&& (!filters.offers_ipv6 || n.avail_ipv6 > 0)
|
|
&& n.avail_vcpus >= filters.vcpus
|
|
&& n.avail_mem_mb >= filters.memory_mb
|
|
&& n.avail_storage_gbs >= filters.storage_gb
|
|
&& (filters.country.is_empty() || (n.country == filters.country))
|
|
&& (filters.city.is_empty() || (n.city == filters.city))
|
|
&& (filters.region.is_empty() || (n.region == filters.region))
|
|
&& (filters.ip.is_empty() || (n.ip == filters.ip))
|
|
})
|
|
.cloned()
|
|
.collect()
|
|
}
|
|
|
|
// TODO: sort by rating
|
|
pub fn get_one_node_by_filters(
|
|
&self,
|
|
filters: &crate::grpc::snp_proto::VmNodeFilters,
|
|
) -> Option<VmNode> {
|
|
let nodes = self.vm_nodes.read().unwrap();
|
|
nodes
|
|
.iter()
|
|
.find(|n| {
|
|
n.avail_ports >= filters.free_ports
|
|
&& (!filters.offers_ipv4 || n.avail_ipv4 > 0)
|
|
&& (!filters.offers_ipv6 || n.avail_ipv6 > 0)
|
|
&& n.avail_vcpus >= filters.vcpus
|
|
&& n.avail_mem_mb >= filters.memory_mb
|
|
&& n.avail_storage_gbs >= filters.storage_gb
|
|
&& (filters.country.is_empty() || (n.country == filters.country))
|
|
&& (filters.city.is_empty() || (n.city == filters.city))
|
|
&& (filters.region.is_empty() || (n.region == filters.region))
|
|
&& (filters.ip.is_empty() || (n.ip == filters.ip))
|
|
&& (filters.node_pubkey.is_empty() || (n.public_key == filters.node_pubkey))
|
|
})
|
|
.cloned()
|
|
}
|
|
|
|
pub fn find_contract_by_uuid(&self, uuid: &str) -> Result<VmContract, Error> {
|
|
let contracts = self.vm_contracts.read().unwrap();
|
|
contracts
|
|
.iter()
|
|
.cloned()
|
|
.find(|c| c.uuid == uuid)
|
|
.ok_or(Error::VmContractNotFound(uuid.to_string()))
|
|
}
|
|
|
|
/// Returns a copy of every active VM contract.
pub fn list_all_contracts(&self) -> Vec<VmContract> {
    self.vm_contracts.read().unwrap().to_vec()
}
|
|
|
|
/// Returns the public key, balance and temporarily locked amount of every
/// known account.
pub fn list_accounts(&self) -> Vec<grpc::Account> {
    let mut listing = Vec::new();
    for entry in self.accounts.iter() {
        listing.push(grpc::Account {
            pubkey: entry.key().to_string(),
            balance: entry.balance,
            tmp_locked: entry.tmp_locked,
        });
    }
    listing
}
|
|
|
|
/// Returns a summary of every operator: escrow (converted from nanoLP to
/// whole LP), e-mail, number of VM nodes and total number of reports.
pub fn list_operators(&self) -> Vec<grpc::ListOperatorsResp> {
    let mut listing = Vec::new();
    for op in self.operators.iter() {
        let reports = self.total_operator_reports(op.key()) as u64;
        let vm_nodes = op.vm_nodes.len() as u64;
        listing.push(grpc::ListOperatorsResp {
            pubkey: op.key().to_string(),
            escrow: op.escrow / 1_000_000_000,
            email: op.email.clone(),
            app_nodes: 0,
            vm_nodes,
            reports,
        });
    }
    listing
}
|
|
|
|
/// Returns the summary of the operator `wallet` together with the listing
/// of all its VM nodes, or `None` when the operator is unknown.
pub fn inspect_operator(&self, wallet: &str) -> Option<grpc::InspectOperatorResp> {
    self.operators.get(wallet).map(|op| {
        let nodes = self
            .find_vm_nodes_by_operator(wallet)
            .into_iter()
            .map(|n| n.into())
            .collect();
        grpc::InspectOperatorResp {
            operator: Some(grpc::ListOperatorsResp {
                pubkey: op.key().to_string(),
                // Escrow is stored in nanoLP. Report whole LP, exactly as
                // `list_operators` does for the same message type.
                escrow: op.escrow / 1_000_000_000,
                email: op.email.clone(),
                app_nodes: 0,
                vm_nodes: op.vm_nodes.len() as u64,
                reports: self.total_operator_reports(op.key()) as u64,
            }),
            nodes,
        }
    })
}
|
|
|
|
/// Returns copies of all contracts hosted on nodes of the operator
/// `wallet`; empty when the operator is unknown.
pub fn find_vm_contracts_by_operator(&self, wallet: &str) -> Vec<VmContract> {
    debug!("Searching contracts for operator {wallet}");
    // Copy the node set so the operator entry is released before the
    // contracts lock is taken.
    let operator_nodes = match self.operators.get(wallet) {
        Some(op) => op.vm_nodes.clone(),
        None => return Vec::new(),
    };
    let contracts = self.vm_contracts.read().unwrap();
    let mut hosted = Vec::new();
    for contract in contracts.iter() {
        if operator_nodes.contains(&contract.node_pubkey) {
            hosted.push(contract.clone());
        }
    }
    hosted
}
|
|
|
|
/// Returns copies of all contracts whose admin is `admin_wallet`.
pub fn find_vm_contracts_by_admin(&self, admin_wallet: &str) -> Vec<VmContract> {
    debug!("Searching contracts for admin pubkey {admin_wallet}");
    let contracts = self.vm_contracts.read().unwrap();
    let mut owned = Vec::new();
    for contract in contracts.iter() {
        if contract.admin_pubkey == admin_wallet {
            owned.push(contract.clone());
        }
    }
    owned
}
|
|
|
|
/// Returns copies of all contracts hosted on the node `node_pubkey`.
pub fn find_vm_contracts_by_node(&self, node_pubkey: &str) -> Vec<VmContract> {
    let contracts = self.vm_contracts.read().unwrap();
    let mut hosted = Vec::new();
    for contract in contracts.iter() {
        if contract.node_pubkey == node_pubkey {
            hosted.push(contract.clone());
        }
    }
    hosted
}
|
|
}
|