1023 lines
34 KiB
Rust
1023 lines
34 KiB
Rust
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
use std::str::FromStr;
|
|
use std::time::Duration;
|
|
|
|
use super::Error;
|
|
use crate::constants::{
|
|
ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_DAEMON_TIMEOUT, VM_NODE,
|
|
VM_UPDATE_EVENT,
|
|
};
|
|
use crate::db::{Account, ErrorFromTable, Report};
|
|
use detee_shared::vm_proto;
|
|
use serde::{Deserialize, Serialize};
|
|
use surrealdb::engine::remote::ws::Client;
|
|
use surrealdb::sql::Datetime;
|
|
use surrealdb::{Notification, RecordId, Surreal};
|
|
use tokio_stream::StreamExt as _;
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct VmNode {
|
|
pub id: RecordId,
|
|
pub operator: RecordId,
|
|
pub pub_sub_node: String,
|
|
pub country: String,
|
|
pub region: String,
|
|
pub city: String,
|
|
pub ip: String,
|
|
pub avail_ipv4: u32,
|
|
pub avail_ipv6: u32,
|
|
pub avail_ports: u32,
|
|
pub max_ports_per_vm: u32,
|
|
pub connected_at: Datetime,
|
|
pub disconnected_at: Datetime,
|
|
pub offers: Vec<VmNodeOffer>,
|
|
}
|
|
|
|
impl VmNode {
|
|
pub async fn register(self, db: &Surreal<Client>) -> Result<(), Error> {
|
|
Account::get_or_create(db, &self.operator.key().to_string()).await?;
|
|
let _: Option<VmNode> = db.upsert(self.id.clone()).content(self).await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn set_online(db: &Surreal<Client>, vm_node_id: &str) -> Result<(), Error> {
|
|
db.query(format!("UPDATE {VM_NODE}:{vm_node_id} SET connected_at = time::now();")).await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn set_offline(db: &Surreal<Client>, vm_node_id: &str) -> Result<(), Error> {
|
|
db.query(format!("UPDATE {VM_NODE}:{vm_node_id} SET disconnected_at = time::now();"))
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
pub struct VmNodeResources {
|
|
pub avail_ipv4: u32,
|
|
pub avail_ipv6: u32,
|
|
pub avail_ports: u32,
|
|
pub max_ports_per_vm: u32,
|
|
pub offers: Vec<VmNodeOffer>,
|
|
}
|
|
|
|
impl VmNodeResources {
|
|
pub async fn merge(self, db: &Surreal<Client>, node_id: &str) -> Result<(), Error> {
|
|
let _: Option<VmNode> = db.update((VM_NODE, node_id)).merge(self).await?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
|
pub struct VmNodeOffer {
|
|
pub price: u64,
|
|
pub memory_mib: u64,
|
|
pub vcpus: u64,
|
|
pub disk_mib: u64,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct VmNodeWithReports {
|
|
pub id: RecordId,
|
|
pub operator: RecordId,
|
|
pub country: String,
|
|
pub region: String,
|
|
pub city: String,
|
|
pub ip: String,
|
|
pub avail_ipv4: u32,
|
|
pub avail_ipv6: u32,
|
|
pub avail_ports: u32,
|
|
pub max_ports_per_vm: u32,
|
|
pub reports: Vec<Report>,
|
|
pub offers: Vec<VmNodeOffer>,
|
|
}
|
|
|
|
impl VmNodeWithReports {
|
|
pub async fn find_by_filters(
|
|
db: &Surreal<Client>,
|
|
filters: vm_proto::VmNodeFilters,
|
|
) -> Result<Vec<Self>, Error> {
|
|
let mut query = format!(
|
|
"select *, <-report.* as reports
|
|
from {VM_NODE} where
|
|
avail_ports >= {} &&
|
|
max_ports_per_vm >= {} &&
|
|
avail_ipv4 >= {} &&
|
|
avail_ipv6 >= {} &&
|
|
offers.*.vcpus >= {} &&
|
|
offers.*.memory_mib >= {} &&
|
|
offers.*.disk_mib >= {}\n",
|
|
filters.free_ports,
|
|
filters.free_ports,
|
|
filters.offers_ipv4 as u32,
|
|
filters.offers_ipv6 as u32,
|
|
filters.vcpus,
|
|
filters.memory_mib,
|
|
filters.storage_mib
|
|
);
|
|
if !filters.city.is_empty() {
|
|
query += &format!("&& city = '{}' ", filters.city);
|
|
}
|
|
if !filters.region.is_empty() {
|
|
query += &format!("&& region = '{}' ", filters.region);
|
|
}
|
|
if !filters.country.is_empty() {
|
|
query += &format!("&& country = '{}' ", filters.country);
|
|
}
|
|
if !filters.ip.is_empty() {
|
|
query += &format!("&& ip = '{}' ", filters.ip);
|
|
}
|
|
query += " && connected_at > disconnected_at;";
|
|
let mut result = db.query(query).await?;
|
|
let vm_nodes: Vec<Self> = result.take(0)?;
|
|
Ok(vm_nodes)
|
|
}
|
|
|
|
pub async fn find_by_daemon_pubkey(
|
|
db: &Surreal<Client>,
|
|
daemon_key: &str,
|
|
) -> Result<Option<Self>, Error> {
|
|
let vm_node: Option<VmNodeWithReports> = db
|
|
.query(format!(
|
|
"SELECT *, <-report.* as reports
|
|
FROM {VM_NODE} WHERE id = {VM_NODE}:{daemon_key};"
|
|
))
|
|
.await?
|
|
.take(0)?;
|
|
Ok(vm_node)
|
|
}
|
|
}
|
|
|
|
pub enum VmDaemonMsg {
|
|
Create(NewVmReq),
|
|
Update(UpdateVmReq),
|
|
Delete(DeletedVm),
|
|
}
|
|
|
|
impl From<NewVmReq> for VmDaemonMsg {
|
|
fn from(value: NewVmReq) -> Self {
|
|
Self::Create(value)
|
|
}
|
|
}
|
|
|
|
impl From<UpdateVmReq> for VmDaemonMsg {
|
|
fn from(value: UpdateVmReq) -> Self {
|
|
Self::Update(value)
|
|
}
|
|
}
|
|
|
|
impl From<DeletedVm> for VmDaemonMsg {
|
|
fn from(value: DeletedVm) -> Self {
|
|
Self::Delete(value)
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct NewVmReq {
|
|
pub id: RecordId,
|
|
#[serde(rename = "in")]
|
|
pub admin: RecordId,
|
|
#[serde(rename = "out")]
|
|
pub vm_node: RecordId,
|
|
pub hostname: String,
|
|
pub extra_ports: Vec<u32>,
|
|
pub public_ipv4: bool,
|
|
pub public_ipv6: bool,
|
|
pub disk_size_mib: u32,
|
|
pub vcpus: u32,
|
|
pub memory_mib: u32,
|
|
pub dtrfs_url: String,
|
|
pub dtrfs_sha: String,
|
|
pub kernel_sha: String,
|
|
pub kernel_url: String,
|
|
pub created_at: Datetime,
|
|
pub price_per_unit: u64,
|
|
pub locked_nano: u64,
|
|
pub error: String,
|
|
}
|
|
|
|
impl NewVmReq {
|
|
pub async fn get(db: &Surreal<Client>, id: &str) -> Result<Option<Self>, Error> {
|
|
let new_vm_req: Option<Self> = db.select((NEW_VM_REQ, id)).await?;
|
|
Ok(new_vm_req)
|
|
}
|
|
|
|
pub async fn delete(db: &Surreal<Client>, id: &str) -> Result<(), Error> {
|
|
let _: Option<Self> = db.delete((NEW_VM_REQ, id)).await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn submit_error(db: &Surreal<Client>, id: &str, error: String) -> Result<(), Error> {
|
|
let tx_query = String::from(
|
|
"
|
|
BEGIN TRANSACTION;
|
|
|
|
LET $new_vm_req = $new_vm_req_input;
|
|
LET $error = $error_input;
|
|
LET $record = (select * from $new_vm_req)[0];
|
|
LET $admin = $record.in;
|
|
|
|
if $record == None {{
|
|
THROW 'vm req not exist ' + <string>$new_vm_req
|
|
}};
|
|
|
|
UPDATE $new_vm_req SET error = $error;
|
|
|
|
UPDATE $admin SET tmp_locked -= $record.locked_nano;
|
|
UPDATE $admin SET balance += $record.locked_nano;
|
|
|
|
COMMIT TRANSACTION;",
|
|
);
|
|
|
|
log::trace!("submit_new_vm_err query: {tx_query}");
|
|
|
|
let mut query_resp = db
|
|
.query(tx_query)
|
|
.bind(("new_vm_req_input", RecordId::from((NEW_VM_REQ, id))))
|
|
.bind(("error_input", error))
|
|
.await?;
|
|
|
|
let query_err = query_resp.take_errors();
|
|
if !query_err.is_empty() {
|
|
log::trace!("errors in submit_new_vm_err: {query_err:?}");
|
|
let tx_fail_err_str =
|
|
String::from("The query was not executed due to a failed transaction");
|
|
|
|
if query_err.contains_key(&4) && query_err[&4].to_string() != tx_fail_err_str {
|
|
log::error!("vm req not exist: {}", query_err[&4]);
|
|
return Err(Error::ContractNotFound);
|
|
} else {
|
|
log::error!("Unknown error in submit_new_vm_err: {query_err:?}");
|
|
return Err(Error::Unknown("submit_new_vm_req".to_string()));
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn submit(self, db: &Surreal<Client>) -> Result<(), Error> {
|
|
let locked_nano = self.locked_nano;
|
|
let tx_query = format!("
|
|
BEGIN TRANSACTION;
|
|
|
|
LET $account = $account_input;
|
|
LET $new_vm_req = $new_vm_req_input;
|
|
LET $vm_node = $vm_node_input;
|
|
LET $hostname = $hostname_input;
|
|
LET $dtrfs_url = $dtrfs_url_input;
|
|
LET $dtrfs_sha = $dtrfs_sha_input;
|
|
LET $kernel_url = $kernel_url_input;
|
|
LET $kernel_sha = $kernel_sha_input;
|
|
|
|
UPDATE $account SET balance -= {locked_nano};
|
|
IF $account.balance < 0 {{
|
|
THROW 'Insufficient funds.'
|
|
}};
|
|
UPDATE $account SET tmp_locked += {locked_nano};
|
|
RELATE
|
|
$account
|
|
->$new_vm_req
|
|
->$vm_node
|
|
CONTENT {{
|
|
created_at: time::now(), hostname: $hostname, vcpus: {}, memory_mib: {}, disk_size_mib: {},
|
|
extra_ports: {:?}, public_ipv4: {}, public_ipv6: {},
|
|
dtrfs_url: $dtrfs_url, dtrfs_sha: $dtrfs_sha, kernel_url: $kernel_url, kernel_sha: $kernel_sha,
|
|
price_per_unit: {}, locked_nano: {locked_nano}, error: ''
|
|
}};
|
|
|
|
COMMIT TRANSACTION;",
|
|
self.vcpus,
|
|
self.memory_mib,
|
|
self.disk_size_mib,
|
|
self.extra_ports,
|
|
self.public_ipv4,
|
|
self.public_ipv6,
|
|
self.price_per_unit
|
|
);
|
|
|
|
log::trace!("submit_new_vm_req query: {tx_query}");
|
|
|
|
let mut query_resp = db
|
|
.query(tx_query)
|
|
.bind(("account_input", self.admin))
|
|
.bind(("new_vm_req_input", self.id))
|
|
.bind(("vm_node_input", self.vm_node))
|
|
.bind(("hostname_input", self.hostname))
|
|
.bind(("dtrfs_url_input", self.dtrfs_url))
|
|
.bind(("dtrfs_sha_input", self.dtrfs_sha))
|
|
.bind(("kernel_url_input", self.kernel_url))
|
|
.bind(("kernel_sha_input", self.kernel_sha))
|
|
.await?;
|
|
|
|
let query_err = query_resp.take_errors();
|
|
if !query_err.is_empty() {
|
|
log::trace!("errors in submit_new_vm_req: {query_err:?}");
|
|
let tx_fail_err_str =
|
|
String::from("The query was not executed due to a failed transaction");
|
|
|
|
if query_err.contains_key(&9) && query_err[&9].to_string() != tx_fail_err_str {
|
|
log::error!("Transaction error: {}", query_err[&9]);
|
|
return Err(Error::InsufficientFunds);
|
|
} else {
|
|
log::error!("Unknown error in submit_new_vm_req: {query_err:?}");
|
|
return Err(Error::Unknown("submit_new_vm_req".to_string()));
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
/// first string is the vm_id
|
|
pub enum WrappedMeasurement {
|
|
Args(String, vm_proto::MeasurementArgs),
|
|
Error(String, String),
|
|
}
|
|
|
|
impl WrappedMeasurement {
|
|
/// table must be NEW_VM_REQ or UPDATE_VM_REQ
|
|
/// it will however be enforced if you send anything else
|
|
pub async fn listen(
|
|
db: &Surreal<Client>,
|
|
vm_id: &str,
|
|
table: &str,
|
|
) -> Result<WrappedMeasurement, Error> {
|
|
let table = match table {
|
|
UPDATE_VM_REQ => UPDATE_VM_REQ,
|
|
_ => NEW_VM_REQ,
|
|
};
|
|
let mut resp = db
|
|
.query(format!("live select error from {table} where id = {table}:{vm_id};"))
|
|
.query(format!(
|
|
"live select * from measurement_args where id = measurement_args:{vm_id};"
|
|
))
|
|
.await?;
|
|
let mut error_stream = resp.stream::<Notification<ErrorFromTable>>(0)?;
|
|
let mut args_stream = resp.stream::<Notification<vm_proto::MeasurementArgs>>(1)?;
|
|
|
|
let mut error =
|
|
db.query(format!("select error from {table} where id = {NEW_VM_REQ}:{vm_id}")).await?;
|
|
if let Some(error_on_newvm_req) = error.take::<Option<ErrorFromTable>>(0)? {
|
|
if !error_on_newvm_req.error.is_empty() {
|
|
return Ok(Self::Error(vm_id.to_string(), error_on_newvm_req.error));
|
|
}
|
|
}
|
|
|
|
let args: Option<vm_proto::MeasurementArgs> =
|
|
db.delete(("measurement_args", vm_id)).await?;
|
|
if let Some(args) = args {
|
|
return Ok(Self::Args(vm_id.to_string(), args));
|
|
}
|
|
log::trace!("listening for table: {table}");
|
|
|
|
tokio::time::timeout(Duration::from_secs(VM_DAEMON_TIMEOUT), async {
|
|
loop {
|
|
tokio::select! {
|
|
error_notification = error_stream.next() => {
|
|
if let Some(err_notif) = error_notification {
|
|
match err_notif {
|
|
Ok(err_notif) => {
|
|
if err_notif.action == surrealdb::Action::Update
|
|
&& !err_notif.data.error.is_empty() {
|
|
return Ok(Self::Error(vm_id.to_string(), err_notif.data.error));
|
|
};
|
|
},
|
|
Err(e) => return Err(e.into()),
|
|
}
|
|
}
|
|
}
|
|
args_notif = args_stream.next() => {
|
|
if let Some(args_notif) = args_notif {
|
|
match args_notif {
|
|
Ok(args_notif) => {
|
|
if args_notif.action == surrealdb::Action::Create {
|
|
let _: Option<vm_proto::MeasurementArgs> = db.delete(("measurement_args", vm_id)).await?;
|
|
return Ok(Self::Args(vm_id.to_string(), args_notif.data));
|
|
};
|
|
},
|
|
Err(e) => return Err(e.into()),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
})
|
|
.await?
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
|
pub struct ActiveVm {
|
|
pub id: RecordId,
|
|
#[serde(rename = "in")]
|
|
pub admin: RecordId,
|
|
#[serde(rename = "out")]
|
|
pub vm_node: RecordId,
|
|
pub hostname: String,
|
|
pub mapped_ports: Vec<(u32, u32)>,
|
|
pub public_ipv4: String,
|
|
pub public_ipv6: String,
|
|
pub disk_size_mib: u32,
|
|
pub vcpus: u32,
|
|
pub memory_mib: u32,
|
|
pub dtrfs_sha: String,
|
|
pub kernel_sha: String,
|
|
pub created_at: Datetime,
|
|
pub price_per_unit: u64,
|
|
pub locked_nano: u64,
|
|
pub collected_at: Datetime,
|
|
}
|
|
|
|
impl ActiveVm {
|
|
/// total hardware units of this VM
|
|
fn total_units(&self) -> u64 {
|
|
// TODO: Optimize this based on price of hardware.
|
|
// I tried, but this can be done better.
|
|
// Storage cost should also be based on tier
|
|
(self.vcpus as u64 * 10)
|
|
+ ((self.memory_mib + 256) as u64 / 200)
|
|
+ (self.disk_size_mib as u64 / 10)
|
|
+ (!self.public_ipv4.is_empty() as u64 * 10)
|
|
}
|
|
|
|
/// Returns price per minute in nanoLP
|
|
pub fn price_per_minute(&self) -> u64 {
|
|
self.total_units() * self.price_per_unit
|
|
}
|
|
|
|
pub async fn get_by_id(db: &Surreal<Client>, vm_id: &str) -> Result<Option<Self>, Error> {
|
|
let contract: Option<Self> = db
|
|
.query("select * from $active_vm_id;".to_string())
|
|
.bind(("active_vm_id", RecordId::from((ACTIVE_VM, vm_id))))
|
|
.await?
|
|
.take(0)?;
|
|
Ok(contract)
|
|
}
|
|
|
|
pub async fn activate(
|
|
db: &Surreal<Client>,
|
|
id: &str,
|
|
args: vm_proto::MeasurementArgs,
|
|
) -> Result<(), Error> {
|
|
let new_vm_req = match NewVmReq::get(db, id).await? {
|
|
Some(r) => r,
|
|
None => return Ok(()),
|
|
};
|
|
|
|
let mut public_ipv4 = String::new();
|
|
let mut public_ipv6 = String::new();
|
|
|
|
for ip in args.ips.iter() {
|
|
if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) {
|
|
if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() {
|
|
public_ipv4 = ipv4_addr.to_string();
|
|
}
|
|
continue;
|
|
}
|
|
if let Ok(ipv6_addr) = std::net::Ipv6Addr::from_str(&ip.address) {
|
|
public_ipv6 = ipv6_addr.to_string();
|
|
}
|
|
}
|
|
|
|
let mut mapped_ports = Vec::new();
|
|
let mut guest_ports = vec![22];
|
|
guest_ports.append(&mut new_vm_req.extra_ports.clone());
|
|
let mut i = 0;
|
|
while i < args.exposed_ports.len() && i < guest_ports.len() {
|
|
mapped_ports.push((args.exposed_ports[i], guest_ports[i]));
|
|
i += 1;
|
|
}
|
|
|
|
let active_vm = ActiveVm {
|
|
id: RecordId::from((ACTIVE_VM, id)),
|
|
admin: new_vm_req.admin,
|
|
vm_node: new_vm_req.vm_node,
|
|
hostname: new_vm_req.hostname,
|
|
mapped_ports,
|
|
public_ipv4,
|
|
public_ipv6,
|
|
disk_size_mib: new_vm_req.disk_size_mib,
|
|
vcpus: new_vm_req.vcpus,
|
|
memory_mib: new_vm_req.memory_mib,
|
|
dtrfs_sha: new_vm_req.dtrfs_sha,
|
|
kernel_sha: new_vm_req.kernel_sha,
|
|
created_at: new_vm_req.created_at.clone(),
|
|
price_per_unit: new_vm_req.price_per_unit,
|
|
locked_nano: new_vm_req.locked_nano,
|
|
collected_at: new_vm_req.created_at,
|
|
};
|
|
|
|
let admin_account = active_vm.admin.key().to_string();
|
|
let locked_nano = active_vm.locked_nano;
|
|
let _: Vec<ActiveVm> = db.insert(()).relation(active_vm).await?;
|
|
NewVmReq::delete(db, id).await?;
|
|
db.query(format!("UPDATE {ACCOUNT}:{admin_account} SET tmp_locked -= {locked_nano};"))
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn update(db: &Surreal<Client>, id: &str) -> Result<(), Error> {
|
|
let update_vm_req = match UpdateVmReq::get(db, id).await? {
|
|
Some(r) => r,
|
|
None => return Ok(()),
|
|
};
|
|
|
|
let mut active_vm = match Self::get_by_id(db, id).await? {
|
|
Some(vm) => vm,
|
|
None => return Ok(()),
|
|
};
|
|
|
|
if update_vm_req.vcpus > 0 {
|
|
active_vm.vcpus = update_vm_req.vcpus;
|
|
}
|
|
if update_vm_req.memory_mib > 0 {
|
|
active_vm.memory_mib = update_vm_req.memory_mib;
|
|
}
|
|
if update_vm_req.disk_size_mib > 0 {
|
|
active_vm.disk_size_mib = update_vm_req.disk_size_mib;
|
|
}
|
|
if !update_vm_req.dtrfs_sha.is_empty() && !update_vm_req.kernel_sha.is_empty() {
|
|
active_vm.dtrfs_sha = update_vm_req.dtrfs_sha;
|
|
active_vm.kernel_sha = update_vm_req.kernel_sha;
|
|
}
|
|
|
|
let _: Option<ActiveVm> = db.update(active_vm.id.clone()).content(active_vm).await?;
|
|
UpdateVmReq::delete(db, id).await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn change_hostname(
|
|
db: &Surreal<Client>,
|
|
id: &str,
|
|
new_hostname: &str,
|
|
) -> Result<bool, Error> {
|
|
let contract: Option<Self> = db
|
|
.query(format!(
|
|
"UPDATE {ACTIVE_VM}:{id} SET hostname = '{new_hostname}' RETURN BEFORE;"
|
|
))
|
|
.await?
|
|
.take(0)?;
|
|
if let Some(contract) = contract {
|
|
if contract.hostname != new_hostname {
|
|
return Ok(true);
|
|
}
|
|
}
|
|
Ok(false)
|
|
}
|
|
|
|
pub async fn delete(db: &Surreal<Client>, admin: &str, id: &str) -> Result<(), Error> {
|
|
let mut vm_del_resp = db
|
|
.query(
|
|
"
|
|
BEGIN TRANSACTION;
|
|
|
|
LET $active_vm = $vm_id_input;
|
|
LET $admin = $admin_input;
|
|
|
|
IF $active_vm.in != $admin {
|
|
THROW 'Unauthorized'
|
|
};
|
|
|
|
return fn::delete_vm($active_vm);
|
|
|
|
COMMIT TRANSACTION;
|
|
",
|
|
)
|
|
.bind(("vm_id_input", RecordId::from((ACTIVE_VM, id))))
|
|
.bind(("admin_input", RecordId::from((ACCOUNT, admin))))
|
|
.await?;
|
|
|
|
log::trace!("delete_vm query response: {vm_del_resp:?}");
|
|
|
|
let query_err = vm_del_resp.take_errors();
|
|
if !query_err.is_empty() {
|
|
log::trace!("errors in delete_vm: {query_err:?}");
|
|
let tx_fail_err_str =
|
|
String::from("The query was not executed due to a failed transaction");
|
|
|
|
if query_err.contains_key(&2) && query_err[&2].to_string() != tx_fail_err_str {
|
|
log::error!("Unauthorized: {}", query_err[&2]);
|
|
return Err(Error::AccessDenied);
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn extend_time(
|
|
db: &Surreal<Client>,
|
|
id: &str,
|
|
admin: &str,
|
|
nano_lp: u64,
|
|
) -> Result<(), Error> {
|
|
if nano_lp > 100_000_000_000_000 {
|
|
return Err(Error::TooBigTransaction(nano_lp.to_string()));
|
|
}
|
|
|
|
let tx_query = format!(
|
|
"
|
|
BEGIN TRANSACTION;
|
|
|
|
LET $contract = $contract_input;
|
|
LET $admin = $admin_input;
|
|
LET $lock_amt = {nano_lp};
|
|
|
|
if !record::exists($contract) {{
|
|
THROW 'contract not exist ' + <string>$contract
|
|
}};
|
|
if $contract.in != $admin {{
|
|
THROW 'Unauthorized'
|
|
}};
|
|
if $admin.balance + $contract.locked_nano < $lock_amt {{
|
|
THROW 'InsufficientFunds'
|
|
}};
|
|
|
|
UPDATE $admin SET balance = $admin.balance + $contract.locked_nano - $lock_amt;
|
|
UPDATE $contract SET locked_nano = $lock_amt;
|
|
|
|
COMMIT TRANSACTION;
|
|
"
|
|
);
|
|
|
|
log::trace!("extend_time query: {tx_query}");
|
|
|
|
let mut query_res = db
|
|
.query(tx_query)
|
|
.bind(("contract_input", RecordId::from((ACTIVE_VM, id))))
|
|
.bind(("admin_input", RecordId::from((ACCOUNT, admin))))
|
|
.await?;
|
|
|
|
log::trace!("tx_query response: {query_res:?}");
|
|
|
|
let query_err = query_res.take_errors();
|
|
if !query_err.is_empty() {
|
|
let tx_fail_err_str =
|
|
String::from("The query was not executed due to a failed transaction");
|
|
|
|
if query_err.contains_key(&3) && query_err[&3].to_string() != tx_fail_err_str {
|
|
log::error!("contract not exist: {}", query_err[&3]);
|
|
return Err(Error::ContractNotFound);
|
|
}
|
|
|
|
if query_err.contains_key(&4) && query_err[&4].to_string() != tx_fail_err_str {
|
|
log::error!("Unauthorized: {}", query_err[&4]);
|
|
return Err(Error::AccessDenied);
|
|
}
|
|
|
|
if query_err.contains_key(&5) && query_err[&5].to_string() != tx_fail_err_str {
|
|
log::error!("InsufficientFunds: {}", query_err[&5]);
|
|
return Err(Error::InsufficientFunds);
|
|
}
|
|
|
|
log::error!("Unknown error in extend_time: {query_err:?}");
|
|
|
|
return Err(Error::Unknown("extend_time".to_string()));
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct UpdateVmReq {
|
|
pub id: RecordId,
|
|
#[serde(rename = "in")]
|
|
pub admin: RecordId,
|
|
#[serde(rename = "out")]
|
|
pub vm_node: RecordId,
|
|
pub disk_size_mib: u32,
|
|
pub vcpus: u32,
|
|
pub memory_mib: u32,
|
|
pub dtrfs_url: String,
|
|
pub dtrfs_sha: String,
|
|
pub kernel_sha: String,
|
|
pub kernel_url: String,
|
|
pub created_at: Datetime,
|
|
pub error: String,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct UpdateVmEvent {
|
|
pub vm_id: RecordId,
|
|
#[serde(rename = "in")]
|
|
pub admin: RecordId,
|
|
#[serde(rename = "out")]
|
|
pub vm_node: RecordId,
|
|
pub disk_size_mib: u32,
|
|
pub vcpus: u32,
|
|
pub memory_mib: u32,
|
|
pub dtrfs_url: String,
|
|
pub dtrfs_sha: String,
|
|
pub kernel_sha: String,
|
|
pub kernel_url: String,
|
|
pub executed_at: Datetime,
|
|
}
|
|
|
|
impl From<UpdateVmReq> for UpdateVmEvent {
|
|
fn from(update_vm_req: UpdateVmReq) -> Self {
|
|
Self {
|
|
vm_id: RecordId::from((VM_UPDATE_EVENT, update_vm_req.id.key().to_string())),
|
|
admin: update_vm_req.admin,
|
|
vm_node: update_vm_req.vm_node,
|
|
disk_size_mib: update_vm_req.disk_size_mib,
|
|
vcpus: update_vm_req.vcpus,
|
|
memory_mib: update_vm_req.memory_mib,
|
|
dtrfs_url: update_vm_req.dtrfs_url,
|
|
dtrfs_sha: update_vm_req.dtrfs_sha,
|
|
kernel_sha: update_vm_req.kernel_sha,
|
|
kernel_url: update_vm_req.kernel_url,
|
|
executed_at: Datetime::default(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl UpdateVmReq {
|
|
pub async fn get(db: &Surreal<Client>, id: &str) -> Result<Option<Self>, Error> {
|
|
let update_vm_req: Option<Self> = db.select((UPDATE_VM_REQ, id)).await?;
|
|
Ok(update_vm_req)
|
|
}
|
|
|
|
pub async fn delete(db: &Surreal<Client>, id: &str) -> Result<(), Error> {
|
|
let update_vm_req: Option<Self> = db.delete((UPDATE_VM_REQ, id)).await?;
|
|
if let Some(update_vm_req) = update_vm_req {
|
|
let update_vm_event: UpdateVmEvent = update_vm_req.into();
|
|
let _: Option<UpdateVmEvent> =
|
|
db.create(VM_UPDATE_EVENT).content(update_vm_event).await?;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// returns None if VM does not exist
|
|
/// returns Some(false) if hw update is not needed
|
|
/// returns Some(true) if hw update is needed and got submitted
|
|
/// returns error if something happened with the DB
|
|
pub async fn request_hw_update(mut self, db: &Surreal<Client>) -> Result<Option<bool>, Error> {
|
|
let contract = ActiveVm::get_by_id(db, &self.id.key().to_string()).await?;
|
|
|
|
if contract.is_none() {
|
|
return Ok(None);
|
|
}
|
|
let contract = contract.unwrap();
|
|
let mem_per_cpu = contract.memory_mib / contract.vcpus;
|
|
let disk_per_cpu = contract.disk_size_mib / contract.vcpus;
|
|
self.vm_node = contract.vm_node;
|
|
|
|
if !((self.vcpus != 0 && contract.vcpus != self.vcpus)
|
|
|| (self.memory_mib != 0 && contract.memory_mib != self.memory_mib)
|
|
|| (!self.dtrfs_sha.is_empty() && contract.dtrfs_sha != self.dtrfs_sha)
|
|
|| (self.disk_size_mib != 0 && contract.disk_size_mib != self.disk_size_mib))
|
|
{
|
|
return Ok(Some(false));
|
|
}
|
|
|
|
// Do not allow user to unbalance node resources
|
|
if self.vcpus == 0 {
|
|
self.vcpus = self.memory_mib / mem_per_cpu;
|
|
}
|
|
if self.memory_mib == 0 {
|
|
self.memory_mib = self.vcpus * mem_per_cpu;
|
|
}
|
|
if self.vcpus == 0 {
|
|
self.vcpus = self.disk_size_mib / disk_per_cpu;
|
|
}
|
|
if self.disk_size_mib == 0 {
|
|
self.disk_size_mib = self.vcpus * disk_per_cpu;
|
|
}
|
|
|
|
let _: Vec<Self> = db.insert(UPDATE_VM_REQ).relation(self).await?;
|
|
Ok(Some(true))
|
|
}
|
|
|
|
pub async fn submit_error(db: &Surreal<Client>, id: &str, error: String) -> Result<(), Error> {
|
|
#[derive(Serialize)]
|
|
struct UpdateVmError {
|
|
error: String,
|
|
}
|
|
let _: Option<Self> = db.update((UPDATE_VM_REQ, id)).merge(UpdateVmError { error }).await?;
|
|
let _: Option<Self> = db.delete((UPDATE_VM_REQ, id)).await?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct DeletedVm {
|
|
pub id: RecordId,
|
|
#[serde(rename = "in")]
|
|
pub admin: RecordId,
|
|
#[serde(rename = "out")]
|
|
pub vm_node: RecordId,
|
|
pub hostname: String,
|
|
pub mapped_ports: Vec<(u32, u32)>,
|
|
pub public_ipv4: String,
|
|
pub public_ipv6: String,
|
|
pub disk_size_mib: u32,
|
|
pub vcpus: u32,
|
|
pub memory_mib: u32,
|
|
pub dtrfs_sha: String,
|
|
pub kernel_sha: String,
|
|
pub created_at: Datetime,
|
|
pub deleted_at: Datetime,
|
|
pub price_per_unit: u64,
|
|
}
|
|
|
|
impl From<ActiveVm> for DeletedVm {
|
|
fn from(active_vm: ActiveVm) -> Self {
|
|
Self {
|
|
id: RecordId::from((DELETED_VM, active_vm.id.key().to_string())),
|
|
admin: active_vm.admin,
|
|
vm_node: active_vm.vm_node,
|
|
hostname: active_vm.hostname,
|
|
mapped_ports: active_vm.mapped_ports,
|
|
public_ipv4: active_vm.public_ipv4,
|
|
public_ipv6: active_vm.public_ipv6,
|
|
disk_size_mib: active_vm.disk_size_mib,
|
|
vcpus: active_vm.vcpus,
|
|
memory_mib: active_vm.memory_mib,
|
|
dtrfs_sha: active_vm.dtrfs_sha,
|
|
kernel_sha: active_vm.kernel_sha,
|
|
created_at: active_vm.created_at,
|
|
deleted_at: Datetime::default(),
|
|
price_per_unit: active_vm.price_per_unit,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl DeletedVm {
|
|
pub async fn get_by_id(db: &Surreal<Client>, vm_id: &str) -> Result<Option<Self>, Error> {
|
|
let contract: Option<Self> =
|
|
db.query(format!("select * from {DELETED_VM}:{vm_id};")).await?.take(0)?;
|
|
Ok(contract)
|
|
}
|
|
|
|
pub async fn list_by_admin(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> {
|
|
let mut result =
|
|
db.query(format!("select * from {DELETED_VM} where in = {ACCOUNT}:{admin};")).await?;
|
|
let contracts: Vec<Self> = result.take(0)?;
|
|
Ok(contracts)
|
|
}
|
|
|
|
pub async fn list_by_node(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> {
|
|
let mut result =
|
|
db.query(format!("select * from {DELETED_VM} where out = {VM_NODE}:{admin};")).await?;
|
|
let contracts: Vec<Self> = result.take(0)?;
|
|
Ok(contracts)
|
|
}
|
|
|
|
pub async fn list_by_operator(
|
|
db: &Surreal<Client>,
|
|
operator: &str,
|
|
) -> Result<Vec<Self>, Error> {
|
|
let mut result = db
|
|
.query(format!(
|
|
"select
|
|
(select * from ->operator->vm_node<-{DELETED_VM}) as contracts
|
|
from {ACCOUNT}:{operator};"
|
|
))
|
|
.await?;
|
|
|
|
#[derive(Deserialize)]
|
|
struct Wrapper {
|
|
contracts: Vec<DeletedVm>,
|
|
}
|
|
|
|
let c: Option<Wrapper> = result.take(0)?;
|
|
match c {
|
|
Some(c) => Ok(c.contracts),
|
|
None => Ok(Vec::new()),
|
|
}
|
|
}
|
|
|
|
/// total hardware units of this VM
|
|
fn total_units(&self) -> u64 {
|
|
// TODO: Optimize this based on price of hardware.
|
|
// I tried, but this can be done better.
|
|
// Storage cost should also be based on tier
|
|
(self.vcpus as u64 * 10)
|
|
+ ((self.memory_mib + 256) as u64 / 200)
|
|
+ (self.disk_size_mib as u64 / 10)
|
|
+ (!self.public_ipv4.is_empty() as u64 * 10)
|
|
}
|
|
|
|
/// Returns price per minute in nanoLP
|
|
pub fn price_per_minute(&self) -> u64 {
|
|
self.total_units() * self.price_per_unit
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
pub struct ActiveVmWithNode {
|
|
pub id: RecordId,
|
|
#[serde(rename = "in")]
|
|
pub admin: RecordId,
|
|
#[serde(rename = "out")]
|
|
pub vm_node: VmNode,
|
|
pub hostname: String,
|
|
pub mapped_ports: Vec<(u32, u32)>,
|
|
pub public_ipv4: String,
|
|
pub public_ipv6: String,
|
|
pub disk_size_mib: u32,
|
|
pub vcpus: u32,
|
|
pub memory_mib: u32,
|
|
pub dtrfs_sha: String,
|
|
pub kernel_sha: String,
|
|
pub created_at: Datetime,
|
|
pub price_per_unit: u64,
|
|
pub locked_nano: u64,
|
|
pub collected_at: Datetime,
|
|
}
|
|
|
|
impl From<ActiveVmWithNode> for ActiveVm {
|
|
fn from(val: ActiveVmWithNode) -> Self {
|
|
Self {
|
|
id: val.id,
|
|
admin: val.admin,
|
|
vm_node: val.vm_node.id,
|
|
hostname: val.hostname,
|
|
mapped_ports: val.mapped_ports,
|
|
public_ipv4: val.public_ipv4,
|
|
public_ipv6: val.public_ipv6,
|
|
disk_size_mib: val.disk_size_mib,
|
|
vcpus: val.vcpus,
|
|
memory_mib: val.memory_mib,
|
|
dtrfs_sha: val.dtrfs_sha,
|
|
kernel_sha: val.kernel_sha,
|
|
created_at: val.created_at,
|
|
price_per_unit: val.price_per_unit,
|
|
locked_nano: val.locked_nano,
|
|
collected_at: val.collected_at,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl ActiveVmWithNode {
|
|
pub async fn get_by_id(db: &Surreal<Client>, vm_id: &str) -> Result<Option<Self>, Error> {
|
|
let contract: Option<Self> = db
|
|
.query(format!("select * from {ACTIVE_VM} where id = $vm_id fetch out;"))
|
|
.bind(("vm_id", RecordId::from((ACTIVE_VM, vm_id))))
|
|
.await?
|
|
.take(0)?;
|
|
Ok(contract)
|
|
}
|
|
|
|
pub async fn list_by_admin(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> {
|
|
let mut result = db
|
|
.query(format!("select * from {ACTIVE_VM} where in = {ACCOUNT}:{admin} fetch out;"))
|
|
.await?;
|
|
let contracts: Vec<Self> = result.take(0)?;
|
|
Ok(contracts)
|
|
}
|
|
|
|
pub async fn list_by_node(db: &Surreal<Client>, admin: &str) -> Result<Vec<Self>, Error> {
|
|
let mut result = db
|
|
.query(format!("select * from {ACTIVE_VM} where out = {VM_NODE}:{admin} fetch out;"))
|
|
.await?;
|
|
let contracts: Vec<Self> = result.take(0)?;
|
|
Ok(contracts)
|
|
}
|
|
|
|
pub async fn list_by_operator(
|
|
db: &Surreal<Client>,
|
|
operator: &str,
|
|
) -> Result<Vec<Self>, Error> {
|
|
let mut result = db
|
|
.query(format!(
|
|
"select
|
|
(select * from ->operator->vm_node<-{ACTIVE_VM} fetch out) as contracts
|
|
from {ACCOUNT}:{operator};"
|
|
))
|
|
.await?;
|
|
|
|
#[derive(Deserialize)]
|
|
struct Wrapper {
|
|
contracts: Vec<ActiveVmWithNode>,
|
|
}
|
|
|
|
let c: Option<Wrapper> = result.take(0)?;
|
|
match c {
|
|
Some(c) => Ok(c.contracts),
|
|
None => Ok(Vec::new()),
|
|
}
|
|
}
|
|
|
|
/// total hardware units of this VM
|
|
fn total_units(&self) -> u64 {
|
|
// TODO: Optimize this based on price of hardware.
|
|
// I tried, but this can be done better.
|
|
// Storage cost should also be based on tier
|
|
(self.vcpus as u64 * 10)
|
|
+ ((self.memory_mib + 256) as u64 / 200)
|
|
+ (self.disk_size_mib as u64 / 1024 / 10)
|
|
+ (!self.public_ipv4.is_empty() as u64 * 10)
|
|
}
|
|
|
|
/// Returns price per minute in nanoLP
|
|
pub fn price_per_minute(&self) -> u64 {
|
|
self.total_units() * self.price_per_unit
|
|
}
|
|
|
|
pub async fn list_all(db: &Surreal<Client>) -> Result<Vec<Self>, Error> {
|
|
let mut query_response = db.query(format!("SELECT * FROM {ACTIVE_VM} FETCH out;")).await?;
|
|
let active_vms: Vec<Self> = query_response.take(0)?;
|
|
Ok(active_vms)
|
|
}
|
|
}
|