941 lines
29 KiB
Rust
941 lines
29 KiB
Rust
pub use crate::constants::{
|
|
ACCOUNT, ACTIVE_VM, DB_ADDRESS, DB_NAME, DB_NS, DB_PASS, DB_USER, DELETED_VM, ID_ALPHABET,
|
|
NEW_VM_REQ, UPDATE_VM_REQ, VM_CONTRACT, VM_NODE,
|
|
};
|
|
|
|
use crate::old_brain;
|
|
use serde::{Deserialize, Serialize};
|
|
use std::{str::FromStr, sync::LazyLock};
|
|
use surrealdb::{
|
|
engine::remote::ws::{Client, Ws},
|
|
opt::auth::Root,
|
|
sql::Datetime,
|
|
Notification, RecordId, Surreal,
|
|
};
|
|
use tokio::sync::mpsc::Sender;
|
|
use tokio_stream::StreamExt as _;
|
|
|
|
pub static DB: LazyLock<Surreal<Client>> = LazyLock::new(Surreal::init);
|
|
|
|
#[derive(thiserror::Error, Debug)]
|
|
pub enum Error {
|
|
#[error("Internal DB error: {0}")]
|
|
DataBase(#[from] surrealdb::Error),
|
|
#[error("Daemon channel got closed: {0}")]
|
|
DaemonConnection(#[from] tokio::sync::mpsc::error::SendError<DaemonNotification>),
|
|
}
|
|
|
|
pub async fn init(db_address: &str, ns: &str, db: &str) -> surrealdb::Result<()> {
|
|
DB.connect::<Ws>(db_address).await?;
|
|
// Sign in to the server
|
|
DB.signin(Root { username: DB_USER, password: DB_PASS }).await?;
|
|
DB.use_ns(ns).use_db(db).await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn upsert_record<SomeRecord: Serialize + 'static>(
|
|
table: &str,
|
|
id: &str,
|
|
my_record: SomeRecord,
|
|
) -> Result<(), Error> {
|
|
#[derive(Deserialize)]
|
|
struct Wrapper {}
|
|
let _: Option<Wrapper> = DB.create((table, id)).content(my_record).await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn migration0(old_data: &old_brain::BrainData) -> surrealdb::Result<()> {
|
|
let accounts: Vec<Account> = old_data.into();
|
|
let vm_nodes: Vec<VmNode> = old_data.into();
|
|
let app_nodes: Vec<AppNode> = old_data.into();
|
|
let vm_contracts: Vec<ActiveVm> = old_data.into();
|
|
|
|
println!("Inserting accounts...");
|
|
let _: Vec<Account> = DB.insert(()).content(accounts).await?;
|
|
println!("Inserting vm nodes...");
|
|
let _: Vec<VmNode> = DB.insert(()).content(vm_nodes).await?;
|
|
println!("Inserting app nodes...");
|
|
let _: Vec<AppNode> = DB.insert(()).content(app_nodes).await?;
|
|
println!("Inserting vm contracts...");
|
|
let _: Vec<ActiveVm> = DB.insert("vm_contract").relation(vm_contracts).await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct Account {
|
|
pub id: RecordId,
|
|
pub balance: u64,
|
|
pub tmp_locked: u64,
|
|
pub escrow: u64,
|
|
pub email: String,
|
|
}
|
|
|
|
impl Account {
|
|
pub async fn get(address: &str) -> Result<Self, Error> {
|
|
let id = (ACCOUNT, address);
|
|
let account: Option<Self> = DB.select(id).await?;
|
|
let account = match account {
|
|
Some(account) => account,
|
|
None => {
|
|
Self { id: id.into(), balance: 0, tmp_locked: 0, escrow: 0, email: String::new() }
|
|
}
|
|
};
|
|
Ok(account)
|
|
}
|
|
|
|
pub async fn airdrop(account: &str, tokens: u64) -> Result<(), Error> {
|
|
let tokens = tokens.saturating_mul(1_000_000_000);
|
|
let _ = DB
|
|
.query(format!("upsert account:{account} SET balance = (balance || 0) + {tokens};"))
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl Account {
|
|
pub async fn is_banned_by_node(user: &str, node: &str) -> Result<bool, Error> {
|
|
let ban: Option<Self> = DB
|
|
.query(format!(
|
|
"(select operator->ban[0] as ban
|
|
from vm_node:{node}
|
|
where operator->ban->account contains account:{user}
|
|
).ban;"
|
|
))
|
|
.await?
|
|
.take(0)?;
|
|
Ok(ban.is_some())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct VmNode {
|
|
pub id: RecordId,
|
|
pub operator: RecordId,
|
|
pub country: String,
|
|
pub region: String,
|
|
pub city: String,
|
|
pub ip: String,
|
|
pub avail_mem_mb: u32,
|
|
pub avail_vcpus: u32,
|
|
pub avail_storage_gbs: u32,
|
|
pub avail_ipv4: u32,
|
|
pub avail_ipv6: u32,
|
|
pub avail_ports: u32,
|
|
pub max_ports_per_vm: u32,
|
|
pub price: u64,
|
|
pub offline_minutes: u64,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
pub struct VmNodeResources {
|
|
pub avail_mem_mb: u32,
|
|
pub avail_vcpus: u32,
|
|
pub avail_storage_gbs: u32,
|
|
pub avail_ipv4: u32,
|
|
pub avail_ipv6: u32,
|
|
pub avail_ports: u32,
|
|
pub max_ports_per_vm: u32,
|
|
}
|
|
|
|
impl VmNodeResources {
|
|
pub async fn merge(self, node_id: &str) -> Result<(), Error> {
|
|
let _: Option<VmNode> = DB.update((VM_NODE, node_id)).merge(self).await?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl VmNode {
|
|
pub async fn register(self) -> Result<(), Error> {
|
|
let _: Option<VmNode> = DB.upsert(self.id.clone()).content(self).await?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct VmNodeWithReports {
|
|
pub id: RecordId,
|
|
pub operator: RecordId,
|
|
pub country: String,
|
|
pub region: String,
|
|
pub city: String,
|
|
pub ip: String,
|
|
pub avail_mem_mb: u32,
|
|
pub avail_vcpus: u32,
|
|
pub avail_storage_gbs: u32,
|
|
pub avail_ipv4: u32,
|
|
pub avail_ipv6: u32,
|
|
pub avail_ports: u32,
|
|
pub max_ports_per_vm: u32,
|
|
pub price: u64,
|
|
pub offline_minutes: u64,
|
|
pub reports: Vec<Report>,
|
|
}
|
|
|
|
impl VmNodeWithReports {
|
|
// TODO: find a more elegant way to do this than importing gRPC in the DB module
|
|
// https://en.wikipedia.org/wiki/Dependency_inversion_principle
|
|
pub async fn find_by_filters(
|
|
filters: detee_shared::snp::pb::vm_proto::VmNodeFilters,
|
|
) -> Result<Vec<Self>, Error> {
|
|
let mut query = format!(
|
|
"select *, <-report.* as reports from {VM_NODE} where
|
|
avail_ports >= {} &&
|
|
max_ports_per_vm >= {} &&
|
|
avail_ipv4 >= {} &&
|
|
avail_ipv6 >= {} &&
|
|
avail_vcpus >= {} &&
|
|
avail_mem_mb >= {} &&
|
|
avail_storage_gbs >= {}\n",
|
|
filters.free_ports,
|
|
filters.free_ports,
|
|
filters.offers_ipv4 as u32,
|
|
filters.offers_ipv6 as u32,
|
|
filters.vcpus,
|
|
filters.memory_mb,
|
|
filters.storage_gb
|
|
);
|
|
if !filters.city.is_empty() {
|
|
query += &format!("&& city = '{}' ", filters.city);
|
|
}
|
|
if !filters.region.is_empty() {
|
|
query += &format!("&& region = '{}' ", filters.region);
|
|
}
|
|
if !filters.country.is_empty() {
|
|
query += &format!("&& country = '{}' ", filters.country);
|
|
}
|
|
if !filters.ip.is_empty() {
|
|
query += &format!("&& ip = '{}' ", filters.ip);
|
|
}
|
|
query += ";";
|
|
let mut result = DB.query(query).await?;
|
|
let vm_nodes: Vec<Self> = result.take(0)?;
|
|
Ok(vm_nodes)
|
|
}
|
|
}
|
|
|
|
pub enum DaemonNotification {
|
|
Create(NewVmReq),
|
|
Update(UpdateVmReq),
|
|
Delete(DeletedVm),
|
|
}
|
|
|
|
impl From<NewVmReq> for DaemonNotification {
|
|
fn from(value: NewVmReq) -> Self {
|
|
Self::Create(value)
|
|
}
|
|
}
|
|
|
|
impl From<UpdateVmReq> for DaemonNotification {
|
|
fn from(value: UpdateVmReq) -> Self {
|
|
Self::Update(value)
|
|
}
|
|
}
|
|
|
|
impl From<DeletedVm> for DaemonNotification {
|
|
fn from(value: DeletedVm) -> Self {
|
|
Self::Delete(value)
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct NewVmReq {
|
|
pub id: RecordId,
|
|
#[serde(rename = "in")]
|
|
pub admin: RecordId,
|
|
#[serde(rename = "out")]
|
|
pub vm_node: RecordId,
|
|
pub hostname: String,
|
|
pub extra_ports: Vec<u32>,
|
|
pub public_ipv4: bool,
|
|
pub public_ipv6: bool,
|
|
pub disk_size_gb: u32,
|
|
pub vcpus: u32,
|
|
pub memory_mb: u32,
|
|
pub dtrfs_url: String,
|
|
pub dtrfs_sha: String,
|
|
pub kernel_sha: String,
|
|
pub kernel_url: String,
|
|
pub created_at: Datetime,
|
|
pub price_per_unit: u64,
|
|
pub locked_nano: u64,
|
|
pub error: String,
|
|
}
|
|
|
|
impl NewVmReq {
|
|
pub async fn get(id: &str) -> Result<Option<Self>, Error> {
|
|
let new_vm_req: Option<Self> = DB.select((NEW_VM_REQ, id)).await?;
|
|
Ok(new_vm_req)
|
|
}
|
|
|
|
pub async fn delete(id: &str) -> Result<(), Error> {
|
|
let _: Option<Self> = DB.delete((NEW_VM_REQ, id)).await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn submit_error(id: &str, error: String) -> Result<(), Error> {
|
|
#[derive(Serialize)]
|
|
struct NewVmError {
|
|
error: String,
|
|
}
|
|
let _: Option<Self> = DB.update((NEW_VM_REQ, id)).merge(NewVmError { error }).await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn submit(self) -> Result<(), Error> {
|
|
let _: Vec<Self> = DB.insert(NEW_VM_REQ).relation(self).await?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
/// first string is the vm_id
|
|
pub enum NewVmResp {
|
|
// TODO: find a more elegant way to do this than importing gRPC in the DB module
|
|
// https://en.wikipedia.org/wiki/Dependency_inversion_principle
|
|
Args(String, detee_shared::snp::pb::vm_proto::MeasurementArgs),
|
|
Error(String, String),
|
|
}
|
|
|
|
impl NewVmResp {
|
|
pub async fn listen(vm_id: &str) -> Result<NewVmResp, Error> {
|
|
let mut resp = DB
|
|
.query(format!("live select * from {NEW_VM_REQ} where id = {NEW_VM_REQ}:{vm_id};"))
|
|
.query(format!(
|
|
"live select * from measurement_args where id = measurement_args:{vm_id};"
|
|
))
|
|
.await?;
|
|
let mut new_vm_stream = resp.stream::<Notification<NewVmReq>>(0)?;
|
|
let mut args_stream =
|
|
resp.stream::<Notification<detee_shared::snp::pb::vm_proto::MeasurementArgs>>(1)?;
|
|
|
|
loop {
|
|
tokio::select! {
|
|
new_vm_req_notif = new_vm_stream.next() => {
|
|
log::debug!("Got stream 1...");
|
|
if let Some(new_vm_req_notif) = new_vm_req_notif {
|
|
match new_vm_req_notif {
|
|
Ok(new_vm_req_notif) => {
|
|
if new_vm_req_notif.action == surrealdb::Action::Update && !new_vm_req_notif.data.error.is_empty() {
|
|
return Ok(Self::Error(vm_id.to_string(), new_vm_req_notif.data.error));
|
|
};
|
|
},
|
|
Err(e) => return Err(e.into()),
|
|
}
|
|
}
|
|
}
|
|
args_notif = args_stream.next() => {
|
|
if let Some(args_notif) = args_notif {
|
|
match args_notif {
|
|
Ok(args_notif) => {
|
|
if args_notif.action == surrealdb::Action::Create {
|
|
return Ok(Self::Args(vm_id.to_string(), args_notif.data));
|
|
};
|
|
},
|
|
Err(e) => return Err(e.into()),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct ActiveVm {
|
|
pub id: RecordId,
|
|
#[serde(rename = "in")]
|
|
pub admin: RecordId,
|
|
#[serde(rename = "out")]
|
|
pub vm_node: RecordId,
|
|
pub hostname: String,
|
|
pub mapped_ports: Vec<(u32, u32)>,
|
|
pub public_ipv4: String,
|
|
pub public_ipv6: String,
|
|
pub disk_size_gb: u32,
|
|
pub vcpus: u32,
|
|
pub memory_mb: u32,
|
|
pub dtrfs_sha: String,
|
|
pub kernel_sha: String,
|
|
pub created_at: Datetime,
|
|
pub price_per_unit: u64,
|
|
pub locked_nano: u64,
|
|
pub collected_at: Datetime,
|
|
}
|
|
|
|
impl ActiveVm {
|
|
pub async fn activate(
|
|
id: &str,
|
|
args: detee_shared::vm_proto::MeasurementArgs,
|
|
) -> Result<(), Error> {
|
|
let new_vm_req = match NewVmReq::get(id).await? {
|
|
Some(r) => r,
|
|
None => return Ok(()),
|
|
};
|
|
|
|
let mut public_ipv4 = String::new();
|
|
let mut public_ipv6 = String::new();
|
|
|
|
for ip in args.ips.iter() {
|
|
if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) {
|
|
if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() {
|
|
public_ipv4 = ipv4_addr.to_string();
|
|
}
|
|
continue;
|
|
}
|
|
if let Ok(ipv6_addr) = std::net::Ipv6Addr::from_str(&ip.address) {
|
|
public_ipv6 = ipv6_addr.to_string();
|
|
}
|
|
}
|
|
|
|
let mut mapped_ports = Vec::new();
|
|
let mut guest_ports = vec![22];
|
|
guest_ports.append(&mut args.exposed_ports.clone());
|
|
let mut i = 0;
|
|
while i < new_vm_req.extra_ports.len() && i < guest_ports.len() {
|
|
mapped_ports.push((args.exposed_ports[i], guest_ports[i]));
|
|
i += 1;
|
|
}
|
|
|
|
let active_vm = ActiveVm {
|
|
id: RecordId::from((ACTIVE_VM, id)),
|
|
admin: new_vm_req.admin,
|
|
vm_node: new_vm_req.vm_node,
|
|
hostname: new_vm_req.hostname,
|
|
mapped_ports,
|
|
public_ipv4,
|
|
public_ipv6,
|
|
disk_size_gb: new_vm_req.disk_size_gb,
|
|
vcpus: new_vm_req.vcpus,
|
|
memory_mb: new_vm_req.memory_mb,
|
|
dtrfs_sha: new_vm_req.dtrfs_sha,
|
|
kernel_sha: new_vm_req.kernel_sha,
|
|
created_at: new_vm_req.created_at.clone(),
|
|
price_per_unit: new_vm_req.price_per_unit,
|
|
locked_nano: new_vm_req.locked_nano,
|
|
collected_at: new_vm_req.created_at,
|
|
};
|
|
|
|
let _: Vec<ActiveVm> = DB.insert(()).relation(active_vm).await?;
|
|
|
|
NewVmReq::delete(id).await?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct UpdateVmReq {
|
|
pub id: RecordId,
|
|
#[serde(rename = "in")]
|
|
pub admin: RecordId,
|
|
#[serde(rename = "out")]
|
|
pub vm_node: RecordId,
|
|
pub disk_size_gb: u32,
|
|
pub vcpus: u32,
|
|
pub memory_mb: u32,
|
|
pub dtrfs_url: String,
|
|
pub dtrfs_sha: String,
|
|
pub kernel_sha: String,
|
|
pub kernel_url: String,
|
|
pub created_at: Datetime,
|
|
pub price_per_unit: u64,
|
|
pub locked_nano: u64,
|
|
}
|
|
|
|
pub async fn listen_for_node<
|
|
T: Into<DaemonNotification> + std::marker::Unpin + for<'de> Deserialize<'de>,
|
|
>(
|
|
node: &str,
|
|
tx: Sender<DaemonNotification>,
|
|
) -> Result<(), Error> {
|
|
let table_name = match std::any::type_name::<T>() {
|
|
"surreal_brain::db::NewVmReq" => NEW_VM_REQ.to_string(),
|
|
"surreal_brain::db::UpdateVmReq" => UPDATE_VM_REQ.to_string(),
|
|
"surreal_brain::db::DeletedVm" => DELETED_VM.to_string(),
|
|
wat => {
|
|
log::error!("listen_for_node: T has type {wat}");
|
|
String::from("wat")
|
|
}
|
|
};
|
|
let mut resp =
|
|
DB.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?;
|
|
let mut live_stream = resp.stream::<Notification<T>>(0)?;
|
|
while let Some(result) = live_stream.next().await {
|
|
match result {
|
|
Ok(notification) => {
|
|
if notification.action == surrealdb::Action::Create {
|
|
tx.send(notification.data.into()).await?
|
|
}
|
|
}
|
|
Err(e) => {
|
|
log::warn!("listen_for_deletion DB stream failed for {node}: {e}");
|
|
return Err(Error::from(e));
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct DeletedVm {
|
|
pub id: RecordId,
|
|
#[serde(rename = "in")]
|
|
pub admin: RecordId,
|
|
#[serde(rename = "out")]
|
|
pub vm_node: RecordId,
|
|
pub hostname: String,
|
|
pub mapped_ports: Vec<(u32, u32)>,
|
|
pub public_ipv4: String,
|
|
pub public_ipv6: String,
|
|
pub disk_size_gb: u32,
|
|
pub vcpus: u32,
|
|
pub memory_mb: u32,
|
|
pub dtrfs_sha: String,
|
|
pub kernel_sha: String,
|
|
pub created_at: Datetime,
|
|
pub deleted_at: Datetime,
|
|
pub price_per_unit: u64,
|
|
}
|
|
|
|
impl DeletedVm {
|
|
pub async fn get_by_uuid(uuid: &str) -> Result<Option<Self>, Error> {
|
|
let contract: Option<Self> =
|
|
DB.query(format!("select * from {DELETED_VM}:{uuid};")).await?.take(0)?;
|
|
Ok(contract)
|
|
}
|
|
|
|
pub async fn list_by_admin(admin: &str) -> Result<Vec<Self>, Error> {
|
|
let mut result =
|
|
DB.query(format!("select * from {DELETED_VM} where in = {ACCOUNT}:{admin};")).await?;
|
|
let contracts: Vec<Self> = result.take(0)?;
|
|
Ok(contracts)
|
|
}
|
|
|
|
pub async fn list_by_node(admin: &str) -> Result<Vec<Self>, Error> {
|
|
let mut result =
|
|
DB.query(format!("select * from {DELETED_VM} where out = {VM_NODE}:{admin};")).await?;
|
|
let contracts: Vec<Self> = result.take(0)?;
|
|
Ok(contracts)
|
|
}
|
|
|
|
pub async fn list_by_operator(operator: &str) -> Result<Vec<Self>, Error> {
|
|
let mut result = DB
|
|
.query(format!(
|
|
"select
|
|
(select * from ->operator->vm_node<-{DELETED_VM}) as contracts
|
|
from {ACCOUNT}:{operator};"
|
|
))
|
|
.await?;
|
|
|
|
#[derive(Deserialize)]
|
|
struct Wrapper {
|
|
contracts: Vec<DeletedVm>,
|
|
}
|
|
|
|
let c: Option<Wrapper> = result.take(0)?;
|
|
match c {
|
|
Some(c) => Ok(c.contracts),
|
|
None => Ok(Vec::new()),
|
|
}
|
|
}
|
|
|
|
/// total hardware units of this VM
|
|
fn total_units(&self) -> u64 {
|
|
// TODO: Optimize this based on price of hardware.
|
|
// I tried, but this can be done better.
|
|
// Storage cost should also be based on tier
|
|
(self.vcpus as u64 * 10)
|
|
+ ((self.memory_mb + 256) as u64 / 200)
|
|
+ (self.disk_size_gb as u64 / 10)
|
|
+ (!self.public_ipv4.is_empty() as u64 * 10)
|
|
}
|
|
|
|
/// Returns price per minute in nanoLP
|
|
pub fn price_per_minute(&self) -> u64 {
|
|
self.total_units() * self.price_per_unit
|
|
}
|
|
}
|
|
|
|
impl ActiveVm {
|
|
/// total hardware units of this VM
|
|
fn total_units(&self) -> u64 {
|
|
// TODO: Optimize this based on price of hardware.
|
|
// I tried, but this can be done better.
|
|
// Storage cost should also be based on tier
|
|
(self.vcpus as u64 * 10)
|
|
+ ((self.memory_mb + 256) as u64 / 200)
|
|
+ (self.disk_size_gb as u64 / 10)
|
|
+ (!self.public_ipv4.is_empty() as u64 * 10)
|
|
}
|
|
|
|
/// Returns price per minute in nanoLP
|
|
pub fn price_per_minute(&self) -> u64 {
|
|
self.total_units() * self.price_per_unit
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct ActiveVmWithNode {
|
|
pub id: RecordId,
|
|
#[serde(rename = "in")]
|
|
pub admin: RecordId,
|
|
#[serde(rename = "out")]
|
|
pub vm_node: VmNode,
|
|
pub hostname: String,
|
|
pub mapped_ports: Vec<(u32, u32)>,
|
|
pub public_ipv4: String,
|
|
pub public_ipv6: String,
|
|
pub disk_size_gb: u32,
|
|
pub vcpus: u32,
|
|
pub memory_mb: u32,
|
|
pub dtrfs_sha: String,
|
|
pub kernel_sha: String,
|
|
pub created_at: Datetime,
|
|
pub price_per_unit: u64,
|
|
pub locked_nano: u64,
|
|
pub collected_at: Datetime,
|
|
}
|
|
|
|
impl ActiveVmWithNode {
|
|
pub async fn get_by_uuid(uuid: &str) -> Result<Option<Self>, Error> {
|
|
let contract: Option<Self> =
|
|
DB.query(format!("select * from {ACTIVE_VM}:{uuid} fetch out;")).await?.take(0)?;
|
|
Ok(contract)
|
|
}
|
|
|
|
pub async fn list_by_admin(admin: &str) -> Result<Vec<Self>, Error> {
|
|
let mut result = DB
|
|
.query(format!("select * from {ACTIVE_VM} where in = {ACCOUNT}:{admin} fetch out;"))
|
|
.await?;
|
|
let contracts: Vec<Self> = result.take(0)?;
|
|
Ok(contracts)
|
|
}
|
|
|
|
pub async fn list_by_node(admin: &str) -> Result<Vec<Self>, Error> {
|
|
let mut result = DB
|
|
.query(format!("select * from {ACTIVE_VM} where out = {VM_NODE}:{admin} fetch out;"))
|
|
.await?;
|
|
let contracts: Vec<Self> = result.take(0)?;
|
|
Ok(contracts)
|
|
}
|
|
|
|
pub async fn list_by_operator(operator: &str) -> Result<Vec<Self>, Error> {
|
|
let mut result = DB
|
|
.query(format!(
|
|
"select
|
|
(select * from ->operator->vm_node<-{ACTIVE_VM} fetch out) as contracts
|
|
from {ACCOUNT}:{operator};"
|
|
))
|
|
.await?;
|
|
|
|
#[derive(Deserialize)]
|
|
struct Wrapper {
|
|
contracts: Vec<ActiveVmWithNode>,
|
|
}
|
|
|
|
let c: Option<Wrapper> = result.take(0)?;
|
|
match c {
|
|
Some(c) => Ok(c.contracts),
|
|
None => Ok(Vec::new()),
|
|
}
|
|
}
|
|
|
|
/// total hardware units of this VM
|
|
fn total_units(&self) -> u64 {
|
|
// TODO: Optimize this based on price of hardware.
|
|
// I tried, but this can be done better.
|
|
// Storage cost should also be based on tier
|
|
(self.vcpus as u64 * 10)
|
|
+ ((self.memory_mb + 256) as u64 / 200)
|
|
+ (self.disk_size_gb as u64 / 10)
|
|
+ (!self.public_ipv4.is_empty() as u64 * 10)
|
|
}
|
|
|
|
/// Returns price per minute in nanoLP
|
|
pub fn price_per_minute(&self) -> u64 {
|
|
self.total_units() * self.price_per_unit
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct AppNode {
|
|
pub id: RecordId,
|
|
pub operator: RecordId,
|
|
pub country: String,
|
|
pub region: String,
|
|
pub city: String,
|
|
pub ip: String,
|
|
pub avail_mem_mb: u32,
|
|
pub avail_vcpus: u32,
|
|
pub avail_storage_gbs: u32,
|
|
pub avail_ports: u32,
|
|
pub max_ports_per_app: u32,
|
|
pub price: u64,
|
|
pub offline_minutes: u64,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct AppNodeWithReports {
|
|
pub id: RecordId,
|
|
pub operator: RecordId,
|
|
pub country: String,
|
|
pub region: String,
|
|
pub city: String,
|
|
pub ip: String,
|
|
pub avail_mem_mb: u32,
|
|
pub avail_vcpus: u32,
|
|
pub avail_storage_gbs: u32,
|
|
pub avail_ports: u32,
|
|
pub max_ports_per_app: u32,
|
|
pub price: u64,
|
|
pub offline_minutes: u64,
|
|
pub reports: Vec<Report>,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct AppContract {
|
|
id: RecordId,
|
|
#[serde(rename = "in")]
|
|
admin: RecordId,
|
|
#[serde(rename = "out")]
|
|
app_node: RecordId,
|
|
app_name: String,
|
|
mapped_ports: Vec<(u64, u64)>,
|
|
host_ipv4: String,
|
|
vcpus: u64,
|
|
memory_mb: u64,
|
|
disk_size_gb: u64,
|
|
created_at: Datetime,
|
|
price_per_unit: u64,
|
|
locked_nano: u64,
|
|
collected_at: Datetime,
|
|
mr_enclave: String,
|
|
package_url: String,
|
|
hratls_pubkey: String,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct Ban {
|
|
id: RecordId,
|
|
#[serde(rename = "in")]
|
|
from_account: RecordId,
|
|
#[serde(rename = "out")]
|
|
to_account: RecordId,
|
|
created_at: Datetime,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct Kick {
|
|
id: RecordId,
|
|
#[serde(rename = "in")]
|
|
from_account: RecordId,
|
|
#[serde(rename = "out")]
|
|
to_account: RecordId,
|
|
created_at: Datetime,
|
|
reason: String,
|
|
contract: RecordId,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct Report {
|
|
#[serde(rename = "in")]
|
|
from_account: RecordId,
|
|
#[serde(rename = "out")]
|
|
to_node: RecordId,
|
|
created_at: Datetime,
|
|
pub reason: String,
|
|
}
|
|
|
|
impl Report {
|
|
// TODO: test this functionality and remove this comment
|
|
pub async fn create(
|
|
from_account: RecordId,
|
|
to_node: RecordId,
|
|
reason: String,
|
|
) -> Result<(), Error> {
|
|
let _: Vec<Self> = DB
|
|
.insert("report")
|
|
.relation(Report { from_account, to_node, created_at: Datetime::default(), reason })
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
/// This is the operator obtained from the DB,
|
|
/// however the relation is defined using OperatorRelation
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct Operator {
|
|
pub account: RecordId,
|
|
pub app_nodes: u64,
|
|
pub vm_nodes: u64,
|
|
pub email: String,
|
|
pub escrow: u64,
|
|
pub reports: u64,
|
|
}
|
|
|
|
impl Operator {
|
|
pub async fn list() -> Result<Vec<Self>, Error> {
|
|
let mut result = DB
|
|
.query(
|
|
"array::distinct(array::flatten( [
|
|
(select operator from vm_node group by operator).operator,
|
|
(select operator from app_node group by operator).operator
|
|
]));"
|
|
.to_string(),
|
|
)
|
|
.await?;
|
|
let operator_accounts: Vec<RecordId> = result.take(0)?;
|
|
let mut operators: Vec<Self> = Vec::new();
|
|
for account in operator_accounts.iter() {
|
|
if let Some(operator) = Self::inspect(&account.key().to_string()).await? {
|
|
operators.push(operator);
|
|
}
|
|
}
|
|
Ok(operators)
|
|
}
|
|
|
|
pub async fn inspect(account: &str) -> Result<Option<Self>, Error> {
|
|
let mut result = DB
|
|
.query(format!(
|
|
"$vm_nodes = (select id from vm_node where operator = account:{account}).id;
|
|
$app_nodes = (select id from app_node where operator = account:{account}).id;
|
|
select *,
|
|
id as account,
|
|
email,
|
|
escrow,
|
|
$vm_nodes.len() as vm_nodes,
|
|
$app_nodes.len() as app_nodes,
|
|
(select id from report where $vm_nodes contains out).len() +
|
|
(select id from report where $app_nodes contains out).len()
|
|
as reports
|
|
from account where id = account:{account};"
|
|
))
|
|
.await?;
|
|
let operator: Option<Self> = result.take(2)?;
|
|
Ok(operator)
|
|
}
|
|
|
|
pub async fn inspect_nodes(
|
|
account: &str,
|
|
) -> Result<(Option<Self>, Vec<VmNodeWithReports>, Vec<AppNodeWithReports>), Error> {
|
|
let operator = Self::inspect(account).await?;
|
|
let mut result = DB
|
|
.query(format!(
|
|
"select *, operator, <-report.* as reports from vm_node
|
|
where operator = account:{account};"
|
|
))
|
|
.query(format!(
|
|
"select *, operator, <-report.* as reports from app_node
|
|
where operator = account:{account};"
|
|
))
|
|
.await?;
|
|
let vm_nodes: Vec<VmNodeWithReports> = result.take(0)?;
|
|
let app_nodes: Vec<AppNodeWithReports> = result.take(1)?;
|
|
|
|
Ok((operator, vm_nodes, app_nodes))
|
|
}
|
|
}
|
|
|
|
// TODO: delete all of these From implementation after migration 0 gets executed
|
|
|
|
impl From<&old_brain::BrainData> for Vec<VmNode> {
|
|
fn from(old_data: &old_brain::BrainData) -> Self {
|
|
let mut nodes = Vec::new();
|
|
for old_node in old_data.vm_nodes.iter() {
|
|
nodes.push(VmNode {
|
|
id: RecordId::from((VM_NODE, old_node.public_key.clone())),
|
|
operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())),
|
|
country: old_node.country.clone(),
|
|
region: old_node.region.clone(),
|
|
city: old_node.city.clone(),
|
|
ip: old_node.ip.clone(),
|
|
avail_mem_mb: old_node.avail_mem_mb,
|
|
avail_vcpus: old_node.avail_vcpus,
|
|
avail_storage_gbs: old_node.avail_storage_gbs,
|
|
avail_ipv4: old_node.avail_ipv4,
|
|
avail_ipv6: old_node.avail_ipv6,
|
|
avail_ports: old_node.avail_ports,
|
|
max_ports_per_vm: old_node.max_ports_per_vm,
|
|
price: old_node.price,
|
|
offline_minutes: old_node.offline_minutes,
|
|
});
|
|
}
|
|
nodes
|
|
}
|
|
}
|
|
|
|
impl From<&old_brain::BrainData> for Vec<ActiveVm> {
|
|
fn from(old_data: &old_brain::BrainData) -> Self {
|
|
let mut contracts = Vec::new();
|
|
for old_c in old_data.vm_contracts.iter() {
|
|
let mut mapped_ports = Vec::new();
|
|
for port in old_c.exposed_ports.iter() {
|
|
mapped_ports.push((*port, 8080u32));
|
|
}
|
|
contracts.push(ActiveVm {
|
|
id: RecordId::from((ACTIVE_VM, old_c.uuid.replace("-", ""))),
|
|
admin: RecordId::from((ACCOUNT, old_c.admin_pubkey.clone())),
|
|
vm_node: RecordId::from((VM_NODE, old_c.node_pubkey.clone())),
|
|
hostname: old_c.hostname.clone(),
|
|
mapped_ports,
|
|
public_ipv4: old_c.public_ipv4.clone(),
|
|
public_ipv6: old_c.public_ipv6.clone(),
|
|
disk_size_gb: old_c.disk_size_gb,
|
|
vcpus: old_c.vcpus,
|
|
memory_mb: old_c.memory_mb,
|
|
dtrfs_sha: old_c.dtrfs_sha.clone(),
|
|
kernel_sha: old_c.kernel_sha.clone(),
|
|
price_per_unit: old_c.price_per_unit,
|
|
locked_nano: old_c.locked_nano,
|
|
created_at: old_c.created_at.into(),
|
|
collected_at: old_c.collected_at.into(),
|
|
});
|
|
}
|
|
contracts
|
|
}
|
|
}
|
|
|
|
impl From<&old_brain::BrainData> for Vec<AppNode> {
|
|
fn from(old_data: &old_brain::BrainData) -> Self {
|
|
let mut nodes = Vec::new();
|
|
for old_node in old_data.app_nodes.iter() {
|
|
nodes.push(AppNode {
|
|
id: RecordId::from(("app_node", old_node.node_pubkey.clone())),
|
|
operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())),
|
|
country: old_node.country.clone(),
|
|
region: old_node.region.clone(),
|
|
city: old_node.city.clone(),
|
|
ip: old_node.ip.clone(),
|
|
avail_mem_mb: old_node.avail_mem_mb,
|
|
avail_vcpus: old_node.avail_vcpus,
|
|
avail_storage_gbs: old_node.avail_storage_mb,
|
|
avail_ports: old_node.avail_no_of_port,
|
|
max_ports_per_app: old_node.max_ports_per_app,
|
|
price: old_node.price,
|
|
offline_minutes: old_node.offline_minutes,
|
|
});
|
|
}
|
|
nodes
|
|
}
|
|
}
|
|
|
|
impl From<&old_brain::BrainData> for Vec<Account> {
|
|
fn from(old_data: &old_brain::BrainData) -> Self {
|
|
let mut accounts = Vec::new();
|
|
for old_account in old_data.accounts.iter() {
|
|
let mut a = Account {
|
|
id: RecordId::from(("account", old_account.key())),
|
|
balance: old_account.value().balance,
|
|
tmp_locked: old_account.value().tmp_locked,
|
|
escrow: 0,
|
|
email: String::new(),
|
|
};
|
|
if let Some(operator) = old_data.operators.get(old_account.key()) {
|
|
a.escrow = operator.escrow;
|
|
a.email = operator.email.clone();
|
|
}
|
|
accounts.push(a);
|
|
}
|
|
accounts
|
|
}
|
|
}
|