//! In-memory data store ("brain") tracking registered worker nodes, VM
//! contracts, and the channels used to talk to connected daemons and CLIs.
#![allow(dead_code)]
|
|
use crate::grpc::brain as grpc;
|
|
use chrono::Utc;
|
|
use dashmap::DashMap;
|
|
use log::{debug, info, warn};
|
|
use std::sync::RwLock;
|
|
use tokio::sync::mpsc::Sender;
|
|
use tokio::sync::oneshot::Sender as OneshotSender;
|
|
|
|
/// A registered worker node and the resources it advertises.
///
/// Built from a [`grpc::RegisterNodeReq`]; `public_key` uniquely
/// identifies the node in the brain's node list.
///
/// `Debug` is derived so nodes can be logged like [`Contract`] values.
#[derive(Debug, Eq, Hash, PartialEq, Clone)]
pub struct Node {
    /// The node's own public key (unique identifier).
    pub public_key: String,
    /// Public key of the node's operator/owner.
    pub owner_key: String,
    pub country: String,
    pub city: String,
    pub ip: String,
    /// Available memory in MB.
    pub avail_mem_mb: u32,
    pub avail_vcpus: u32,
    /// Available storage in GB.
    pub avail_storage_gbs: u32,
    /// Number of public IPv4 addresses still available.
    pub avail_ipv4: u32,
    /// Number of public IPv6 addresses still available.
    pub avail_ipv6: u32,
    /// Number of forwardable ports still available.
    pub avail_ports: u32,
    /// Upper bound on exposed ports for a single VM.
    pub max_ports_per_vm: u32,
}
|
|
|
|
impl From<grpc::RegisterNodeReq> for Node {
    /// Builds a `Node` from a gRPC registration request, consuming it.
    /// Field names differ slightly between the wire type and the
    /// in-memory type (e.g. `avail_memory_mb` -> `avail_mem_mb`).
    fn from(req: grpc::RegisterNodeReq) -> Self {
        Self {
            public_key: req.node_pubkey,
            owner_key: req.owner_pubkey,
            country: req.country,
            city: req.city,
            ip: req.ip,
            avail_mem_mb: req.avail_memory_mb,
            avail_vcpus: req.avail_vcpus,
            avail_storage_gbs: req.avail_storage_gb,
            avail_ipv4: req.avail_ipv4,
            avail_ipv6: req.avail_ipv6,
            avail_ports: req.avail_ports,
            max_ports_per_vm: req.max_ports_per_vm,
        }
    }
}
|
|
|
|
/// Conversion into the gRPC node-listing response.
///
/// Implemented as `From` rather than a hand-written `Into`: the standard
/// library's blanket impl provides `Into` for free, so existing
/// `node.into()` call sites keep working. Rating fields are not tracked
/// in this in-memory store and are reported as 0.
impl From<Node> for grpc::NodeListResp {
    fn from(node: Node) -> Self {
        grpc::NodeListResp {
            node_pubkey: node.public_key,
            country: node.country,
            city: node.city,
            ip: node.ip,
            server_rating: 0,
            provider_rating: 0,
        }
    }
}
|
|
|
|
/// An active VM hosting agreement between an admin (tenant) and a node.
///
/// Created in `submit_vmconfirmation` after a daemon confirms a new VM,
/// and converted into a gRPC `VmContract` when reported to clients.
#[derive(Clone, Debug)]
pub struct Contract {
    /// Unique VM identifier (UUID v4 generated by the brain).
    pub uuid: String,
    pub hostname: String,
    /// Public key of the tenant administering this VM.
    pub admin_pubkey: String,
    /// Public key of the node hosting this VM.
    pub node_pubkey: String,
    pub exposed_ports: Vec<u32>,
    pub public_ipv4: String,
    pub public_ipv6: String,
    pub disk_size_gb: u32,
    pub vcpus: u32,
    pub memory_mb: u32,
    // Presumably checksums of the kernel and root-fs images the VM
    // boots from — TODO confirm against daemon-side usage.
    pub kernel_sha: String,
    pub dtrfs_sha: String,
    /// RFC 3339 creation timestamp (set from `Utc::now().to_rfc3339()`).
    pub created_at: String,
    /// RFC 3339 timestamp of the last spec update; empty string until
    /// the VM is first updated.
    pub updated_at: String,
}
|
|
|
|
/// Conversion into the gRPC contract representation.
///
/// Implemented as `From` rather than a hand-written `Into`: the blanket
/// `impl Into` comes for free, so existing `contract.into()` call sites
/// keep working. Field-for-field move, no transformation.
impl From<Contract> for grpc::VmContract {
    fn from(c: Contract) -> Self {
        grpc::VmContract {
            uuid: c.uuid,
            hostname: c.hostname,
            admin_pubkey: c.admin_pubkey,
            node_pubkey: c.node_pubkey,
            exposed_ports: c.exposed_ports,
            public_ipv4: c.public_ipv4,
            public_ipv6: c.public_ipv6,
            disk_size_gb: c.disk_size_gb,
            vcpus: c.vcpus,
            memory_mb: c.memory_mb,
            kernel_sha: c.kernel_sha,
            dtrfs_sha: c.dtrfs_sha,
            created_at: c.created_at,
            updated_at: c.updated_at,
        }
    }
}
|
|
|
|
/// Central in-memory state shared by the gRPC handlers.
///
/// Uses interior mutability throughout (`RwLock`, `DashMap`) so handlers
/// can operate on a shared `&BrainData` from multiple tasks.
#[derive(Default)]
pub struct BrainData {
    // All registered worker nodes.
    nodes: RwLock<Vec<Node>>,
    // All active VM contracts.
    contracts: RwLock<Vec<Contract>>,
    // NewVm requests awaiting daemon confirmation, keyed by VM uuid.
    // The oneshot sender delivers the response to the waiting CLI.
    tmp_newvm_reqs: DashMap<String, (grpc::NewVmReq, OneshotSender<grpc::NewVmResp>)>,
    // UpdateVm requests awaiting a daemon response, keyed by VM uuid.
    tmp_updatevm_reqs: DashMap<String, (grpc::UpdateVmReq, OneshotSender<grpc::UpdateVmResp>)>,
    // Per-node channels for pushing requests to connected daemons,
    // keyed by node pubkey.
    daemon_deletevm_tx: DashMap<String, Sender<grpc::DeleteVmReq>>,
    daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmReq>>,
    daemon_updatevm_tx: DashMap<String, Sender<grpc::UpdateVmReq>>,
}
|
|
|
|
/// Kinds of transmission channels tracked by the brain.
///
/// NOTE(review): currently unreferenced (the file carries
/// `#![allow(dead_code)]`); presumably intended for channel
/// bookkeeping/cleanup — confirm before removing.
#[derive(Debug)]
enum TxType {
    CliContract,
    DaemonDeleteVm,
    DaemonNewVm,
}
|
|
|
|
impl BrainData {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
nodes: RwLock::new(Vec::new()),
|
|
contracts: RwLock::new(Vec::new()),
|
|
tmp_newvm_reqs: DashMap::new(),
|
|
tmp_updatevm_reqs: DashMap::new(),
|
|
daemon_deletevm_tx: DashMap::new(),
|
|
daemon_newvm_tx: DashMap::new(),
|
|
daemon_updatevm_tx: DashMap::new(),
|
|
}
|
|
}
|
|
|
|
/// Registers a node, replacing any existing entry with the same pubkey.
pub fn insert_node(&self, node: grpc::RegisterNodeReq) {
    info!("Registering node {node:?}");
    let mut nodes = self.nodes.write().unwrap();
    // `find` replaces the manual scan-and-early-return loop.
    match nodes.iter_mut().find(|n| n.public_key == node.node_pubkey) {
        Some(existing) => {
            info!("Node {} already exists. Updating data.", existing.public_key);
            *existing = node.into();
        }
        None => nodes.push(node.into()),
    }
}
|
|
|
|
/// Registers the channel used to push VM-deletion requests to the
/// daemon identified by `node_pubkey`, replacing any previous channel.
pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::DeleteVmReq>) {
    let key = node_pubkey.to_string();
    self.daemon_deletevm_tx.insert(key, tx);
}
|
|
|
|
pub fn add_daemon_updatevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::UpdateVmReq>) {
|
|
log::debug!("Adding daemon updatevm tx for node_pubkey: {}", node_pubkey);
|
|
self.daemon_updatevm_tx.insert(node_pubkey.to_string(), tx);
|
|
info!("Added daemon TX for {}", node_pubkey);
|
|
}
|
|
|
|
pub async fn delete_vm(&self, delete_vm: grpc::DeleteVmReq) {
|
|
if let Some(contract) = self.find_contract_by_uuid(&delete_vm.uuid) {
|
|
info!("Found vm {}. Deleting...", delete_vm.uuid);
|
|
if let Some(daemon_tx) = self.daemon_deletevm_tx.get(&contract.node_pubkey) {
|
|
debug!(
|
|
"TX for daemon {} found. Informing daemon about deletion of {}.",
|
|
contract.node_pubkey, delete_vm.uuid
|
|
);
|
|
if daemon_tx.send(delete_vm.clone()).await.is_err() {
|
|
warn!(
|
|
"Failed to send deletion request to {}. Triggering memory cleanup.",
|
|
contract.node_pubkey
|
|
);
|
|
}
|
|
}
|
|
let mut contracts = self.contracts.write().unwrap();
|
|
contracts.retain(|c| c.uuid != delete_vm.uuid);
|
|
}
|
|
}
|
|
|
|
/// Registers the channel used to push new-VM requests to the daemon
/// identified by `node_pubkey`.
pub async fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender<grpc::NewVmReq>) {
    // Drop stale pending requests targeting this node; their oneshot
    // senders are released, unblocking any CLI still waiting on them.
    self.tmp_newvm_reqs
        .retain(|_, (pending, _)| pending.node_pubkey != node_pubkey);
    self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx);
}
|
|
|
|
pub async fn submit_vmconfirmation(&self, confirmation: grpc::NewVmResp) {
|
|
let newvmreq = match self.tmp_newvm_reqs.remove(&confirmation.uuid) {
|
|
Some((_, r)) => r,
|
|
None => {
|
|
log::error!(
|
|
"Received confirmation for ghost NewVMReq {}",
|
|
confirmation.uuid
|
|
);
|
|
return;
|
|
}
|
|
};
|
|
if let Err(e) = newvmreq.1.send(confirmation.clone()) {
|
|
log::error!(
|
|
"CLI RX for {} dropped before receiving confirmation {:?}. Error is: {:?}",
|
|
&newvmreq.0.admin_pubkey,
|
|
confirmation,
|
|
e
|
|
);
|
|
}
|
|
if confirmation.error != "" {
|
|
return;
|
|
}
|
|
let contract = Contract {
|
|
uuid: confirmation.uuid,
|
|
exposed_ports: confirmation.exposed_ports,
|
|
public_ipv4: confirmation.public_ipv4,
|
|
public_ipv6: confirmation.public_ipv6,
|
|
created_at: Utc::now().to_rfc3339(),
|
|
updated_at: String::new(),
|
|
hostname: newvmreq.0.hostname,
|
|
admin_pubkey: newvmreq.0.admin_pubkey,
|
|
node_pubkey: newvmreq.0.node_pubkey,
|
|
disk_size_gb: newvmreq.0.disk_size_gb,
|
|
vcpus: newvmreq.0.vcpus,
|
|
memory_mb: newvmreq.0.memory_mb,
|
|
kernel_sha: newvmreq.0.kernel_sha,
|
|
dtrfs_sha: newvmreq.0.dtrfs_sha,
|
|
};
|
|
info!("Created new contract: {contract:?}");
|
|
self.contracts.write().unwrap().push(contract);
|
|
}
|
|
|
|
pub async fn submit_updatevm_resp(&self, resp: grpc::UpdateVmResp) {
|
|
let updatevmreq = match self.tmp_updatevm_reqs.remove(&resp.uuid) {
|
|
Some((_, r)) => r,
|
|
None => {
|
|
log::error!(
|
|
"Received confirmation for ghost UpdateVMRequest {}",
|
|
resp.uuid
|
|
);
|
|
return;
|
|
}
|
|
};
|
|
if let Err(e) = updatevmreq.1.send(resp.clone()) {
|
|
log::error!("CLI RX dropped before receiving UpdateVMResp {resp:?}. Error is: {e:?}");
|
|
}
|
|
if resp.error != "" {
|
|
return;
|
|
}
|
|
let mut contracts = self.contracts.write().unwrap();
|
|
if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == resp.uuid) {
|
|
contract.disk_size_gb = updatevmreq.0.disk_size_gb;
|
|
contract.vcpus = updatevmreq.0.vcpus;
|
|
contract.memory_mb = updatevmreq.0.memory_mb;
|
|
contract.kernel_sha = updatevmreq.0.kernel_sha;
|
|
contract.dtrfs_sha = updatevmreq.0.dtrfs_sha;
|
|
contract.updated_at = Utc::now().to_rfc3339();
|
|
}
|
|
}
|
|
|
|
pub async fn submit_newvmrequest(
|
|
&self,
|
|
mut req: grpc::NewVmReq,
|
|
tx: OneshotSender<grpc::NewVmResp>,
|
|
) {
|
|
let uuid = uuid::Uuid::new_v4().to_string();
|
|
|
|
req.uuid = uuid.clone();
|
|
info!("Inserting new vm request in memory: {req:?}");
|
|
self.tmp_newvm_reqs
|
|
.insert(req.uuid.clone(), (req.clone(), tx));
|
|
if let Some(server_tx) = self.daemon_newvm_tx.get(&req.node_pubkey) {
|
|
debug!(
|
|
"Found daemon TX for {}. Sending newVMReq {}",
|
|
req.node_pubkey, req.uuid
|
|
);
|
|
if server_tx.send(req.clone()).await.is_ok() {
|
|
return;
|
|
} else {
|
|
warn!(
|
|
"Daemon {} RX dropped before sending update. Cleaning memory...",
|
|
req.node_pubkey
|
|
);
|
|
self.submit_vmconfirmation(grpc::NewVmResp {
|
|
error: "Daemon is offline.".to_string(),
|
|
uuid,
|
|
exposed_ports: Vec::new(),
|
|
public_ipv4: "".to_string(),
|
|
public_ipv6: "".to_string(),
|
|
})
|
|
.await;
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn submit_updatevm_req(
|
|
&self,
|
|
req: grpc::UpdateVmReq,
|
|
tx: OneshotSender<grpc::UpdateVmResp>,
|
|
) {
|
|
let uuid = req.uuid.clone();
|
|
info!("Inserting new vm update request in memory: {req:?}");
|
|
let node_pubkey = match self.find_contract_by_uuid(&req.uuid) {
|
|
Some(contract) => contract.node_pubkey,
|
|
None => {
|
|
log::warn!(
|
|
"Received UpdateVMReq for a contract that does not exist: {}",
|
|
req.uuid
|
|
);
|
|
let _ = tx.send(grpc::UpdateVmResp {
|
|
uuid,
|
|
error: "Contract does not exist.".to_string(),
|
|
});
|
|
return;
|
|
}
|
|
};
|
|
self.tmp_updatevm_reqs
|
|
.insert(req.uuid.clone(), (req.clone(), tx));
|
|
if let Some(server_tx) = self.daemon_updatevm_tx.get(&node_pubkey) {
|
|
debug!(
|
|
"Found daemon TX for {}. Sending updateVMReq {}",
|
|
node_pubkey, req.uuid
|
|
);
|
|
match server_tx.send(req.clone()).await {
|
|
Ok(_) => {
|
|
debug!("Successfully sent updateVMReq to {}", node_pubkey);
|
|
return;
|
|
}
|
|
Err(e) => {
|
|
warn!(
|
|
"Failed to send updateVMReq to {}: {}. Cleaning memory...",
|
|
node_pubkey, e
|
|
);
|
|
self.submit_updatevm_resp(grpc::UpdateVmResp {
|
|
uuid,
|
|
error: "Daemon is offline.".to_string(),
|
|
})
|
|
.await;
|
|
}
|
|
}
|
|
} else {
|
|
warn!("No daemon TX found for {}", node_pubkey);
|
|
self.submit_updatevm_resp(grpc::UpdateVmResp {
|
|
uuid,
|
|
error: "Daemon is offline.".to_string(),
|
|
})
|
|
.await;
|
|
}
|
|
}
|
|
|
|
/// Appends a contract to the in-memory contract list.
pub fn insert_contract(&self, contract: Contract) {
    self.contracts.write().unwrap().push(contract);
}
|
|
|
|
/// Looks up a node by its public key, returning a clone if found.
pub fn find_nodes_by_pubkey(&self, public_key: &str) -> Option<Node> {
    let nodes = self.nodes.read().unwrap();
    // `find(..).cloned()` clones only the match; the previous
    // `.cloned().find(..)` cloned every node it scanned.
    nodes.iter().find(|n| n.public_key == public_key).cloned()
}
|
|
|
|
/// Looks up the first node owned by `owner_key`, returning a clone.
pub fn find_ns_by_owner_key(&self, owner_key: &str) -> Option<Node> {
    let nodes = self.nodes.read().unwrap();
    // Clone only the match instead of every scanned node.
    nodes.iter().find(|n| n.owner_key == owner_key).cloned()
}
|
|
|
|
/// Returns clones of all nodes satisfying the resource filters.
///
/// IPv4/IPv6 availability is only required when the corresponding
/// `offers_*` flag is set; all numeric fields are minimums.
pub fn find_nodes_by_filters(&self, filters: &crate::grpc::brain::NodeFilters) -> Vec<Node> {
    let nodes = self.nodes.read().unwrap();
    nodes
        .iter()
        .filter(|n| {
            n.avail_ports >= filters.free_ports
                && (!filters.offers_ipv4 || n.avail_ipv4 > 0)
                && (!filters.offers_ipv6 || n.avail_ipv6 > 0)
                && n.avail_vcpus >= filters.vcpus
                && n.avail_mem_mb >= filters.memory_mb
                && n.avail_storage_gbs >= filters.storage_gb
        })
        // Clone after filtering so non-matching nodes are never cloned
        // (previously `.cloned()` ran before `.filter()`).
        .cloned()
        .collect()
}
|
|
|
|
/// Looks up a contract by VM uuid, returning a clone if found.
pub fn find_contract_by_uuid(&self, uuid: &str) -> Option<Contract> {
    let contracts = self.contracts.read().unwrap();
    // Clone only the match instead of every scanned contract.
    contracts.iter().find(|c| c.uuid == uuid).cloned()
}
|
|
|
|
/// Returns clones of all contracts administered by `admin_pubkey`.
pub fn find_contracts_by_admin_pubkey(&self, admin_pubkey: &str) -> Vec<Contract> {
    debug!("Searching contracts for admin pubkey {admin_pubkey}");
    let contracts: Vec<Contract> = self
        .contracts
        .read()
        .unwrap()
        .iter()
        .filter(|c| c.admin_pubkey == admin_pubkey)
        // Clone after filtering so only matches are cloned.
        .cloned()
        .collect();
    // Fixed log typo: "contracts or {…}" -> "contracts for {…}".
    debug!("Found {} contracts for {admin_pubkey}.", contracts.len());
    contracts
}
|
|
|
|
/// Returns clones of all contracts hosted by the node `node_pubkey`.
pub fn find_contracts_by_node_pubkey(&self, node_pubkey: &str) -> Vec<Contract> {
    let contracts = self.contracts.read().unwrap();
    contracts
        .iter()
        .filter(|c| c.node_pubkey == node_pubkey)
        // Clone after filtering so only matches are cloned.
        .cloned()
        .collect()
}
|
|
}
|