use std::str::FromStr; use std::time::Duration; use super::Error; use crate::constants::{ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, VM_NODE}; use crate::db::general::Report; use crate::old_brain; use serde::{Deserialize, Serialize}; use surrealdb::engine::remote::ws::Client; use surrealdb::sql::Datetime; use surrealdb::{Notification, RecordId, Surreal}; use tokio_stream::StreamExt as _; #[derive(Debug, Serialize, Deserialize)] pub struct VmNode { pub id: RecordId, pub operator: RecordId, pub country: String, pub region: String, pub city: String, pub ip: String, pub avail_mem_mb: u32, pub avail_vcpus: u32, pub avail_storage_gbs: u32, pub avail_ipv4: u32, pub avail_ipv6: u32, pub avail_ports: u32, pub max_ports_per_vm: u32, pub price: u64, pub offline_minutes: u64, } #[derive(Serialize)] pub struct VmNodeResources { pub avail_mem_mb: u32, pub avail_vcpus: u32, pub avail_storage_gbs: u32, pub avail_ipv4: u32, pub avail_ipv6: u32, pub avail_ports: u32, pub max_ports_per_vm: u32, } impl VmNodeResources { pub async fn merge(self, db: &Surreal, node_id: &str) -> Result<(), Error> { let _: Option = db.update((VM_NODE, node_id)).merge(self).await?; Ok(()) } } impl VmNode { pub async fn register(self, db: &Surreal) -> Result<(), Error> { let _: Option = db.upsert(self.id.clone()).content(self).await?; Ok(()) } } #[derive(Debug, Serialize, Deserialize)] pub struct VmNodeWithReports { pub id: RecordId, pub operator: RecordId, pub country: String, pub region: String, pub city: String, pub ip: String, pub avail_mem_mb: u32, pub avail_vcpus: u32, pub avail_storage_gbs: u32, pub avail_ipv4: u32, pub avail_ipv6: u32, pub avail_ports: u32, pub max_ports_per_vm: u32, pub price: u64, pub offline_minutes: u64, pub reports: Vec, } impl VmNodeWithReports { // TODO: find a more elegant way to do this than importing gRPC in the DB module // https://en.wikipedia.org/wiki/Dependency_inversion_principle pub async fn find_by_filters( db: &Surreal, filters: detee_shared::snp::pb::vm_proto::VmNodeFilters, ) -> Result, Error> { let mut query = format!( "select *, <-report.* as reports from {VM_NODE} where avail_ports >= {} && max_ports_per_vm >= {} && avail_ipv4 >= {} && avail_ipv6 >= {} && avail_vcpus >= {} && avail_mem_mb >= {} && avail_storage_gbs >= {}\n", filters.free_ports, filters.free_ports, filters.offers_ipv4 as u32, filters.offers_ipv6 as u32, filters.vcpus, filters.memory_mb, filters.storage_gb ); if !filters.city.is_empty() { query += &format!("&& city = '{}' ", filters.city); } if !filters.region.is_empty() { query += &format!("&& region = '{}' ", filters.region); } if !filters.country.is_empty() { query += &format!("&& country = '{}' ", filters.country); } if !filters.ip.is_empty() { query += &format!("&& ip = '{}' ", filters.ip); } query += ";"; let mut result = db.query(query).await?; let vm_nodes: Vec = result.take(0)?; Ok(vm_nodes) } } pub enum VmDaemonNotification { Create(NewVmReq), Update(UpdateVmReq), Delete(DeletedVm), } impl From for VmDaemonNotification { fn from(value: NewVmReq) -> Self { Self::Create(value) } } impl From for VmDaemonNotification { fn from(value: UpdateVmReq) -> Self { Self::Update(value) } } impl From for VmDaemonNotification { fn from(value: DeletedVm) -> Self { Self::Delete(value) } } #[derive(Debug, Serialize, Deserialize)] pub struct NewVmReq { pub id: RecordId, #[serde(rename = "in")] pub admin: RecordId, #[serde(rename = "out")] pub vm_node: RecordId, pub hostname: String, pub extra_ports: Vec, pub public_ipv4: bool, pub public_ipv6: bool, pub disk_size_gb: u32, pub vcpus: u32, pub memory_mb: u32, pub dtrfs_url: String, pub dtrfs_sha: String, pub kernel_sha: String, pub kernel_url: String, pub created_at: Datetime, pub price_per_unit: u64, pub locked_nano: u64, pub error: String, } impl NewVmReq { pub async fn get(db: &Surreal, id: &str) -> Result, Error> { let new_vm_req: Option = db.select((NEW_VM_REQ, id)).await?; Ok(new_vm_req) } pub async fn delete(db: &Surreal, id: &str) -> Result<(), Error> { let _: Option = db.delete((NEW_VM_REQ, id)).await?; Ok(()) } pub async fn submit_error(db: &Surreal, id: &str, error: String) -> Result<(), Error> { #[derive(Serialize)] struct NewVmError { error: String, } let _: Option = db.update((NEW_VM_REQ, id)).merge(NewVmError { error }).await?; Ok(()) } pub async fn submit(self, db: &Surreal) -> Result<(), Error> { let _: Vec = db.insert(NEW_VM_REQ).relation(self).await?; Ok(()) } } /// first string is the vm_id pub enum NewVmResp { // TODO: find a more elegant way to do this than importing gRPC in the DB module // https://en.wikipedia.org/wiki/Dependency_inversion_principle Args(String, detee_shared::snp::pb::vm_proto::MeasurementArgs), Error(String, String), } impl NewVmResp { pub async fn listen(db: &Surreal, vm_id: &str) -> Result { let mut resp = db .query(format!("live select * from {NEW_VM_REQ} where id = {NEW_VM_REQ}:{vm_id};")) .query(format!( "live select * from measurement_args where id = measurement_args:{vm_id};" )) .await?; let mut new_vm_stream = resp.stream::>(0)?; let mut args_stream = resp.stream::>(1)?; tokio::time::timeout(Duration::from_secs(10), async { loop { tokio::select! { new_vm_req_notif = new_vm_stream.next() => { log::debug!("Got stream 1..."); if let Some(new_vm_req_notif) = new_vm_req_notif { match new_vm_req_notif { Ok(new_vm_req_notif) => { if new_vm_req_notif.action == surrealdb::Action::Update && !new_vm_req_notif.data.error.is_empty() { return Ok::(Self::Error(vm_id.to_string(), new_vm_req_notif.data.error)); }; }, Err(e) => return Err(e.into()), } } } args_notif = args_stream.next() => { if let Some(args_notif) = args_notif { match args_notif { Ok(args_notif) => { if args_notif.action == surrealdb::Action::Create { return Ok(Self::Args(vm_id.to_string(), args_notif.data)); }; }, Err(e) => return Err(e.into()), } } } } } }).await? } } #[derive(Debug, Serialize, Deserialize)] pub struct ActiveVm { pub id: RecordId, #[serde(rename = "in")] pub admin: RecordId, #[serde(rename = "out")] pub vm_node: RecordId, pub hostname: String, pub mapped_ports: Vec<(u32, u32)>, pub public_ipv4: String, pub public_ipv6: String, pub disk_size_gb: u32, pub vcpus: u32, pub memory_mb: u32, pub dtrfs_sha: String, pub kernel_sha: String, pub created_at: Datetime, pub price_per_unit: u64, pub locked_nano: u64, pub collected_at: Datetime, } impl ActiveVm { pub async fn activate( db: &Surreal, id: &str, args: detee_shared::vm_proto::MeasurementArgs, ) -> Result<(), Error> { let new_vm_req = match NewVmReq::get(db, id).await? { Some(r) => r, None => return Ok(()), }; let mut public_ipv4 = String::new(); let mut public_ipv6 = String::new(); for ip in args.ips.iter() { if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) { if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() { public_ipv4 = ipv4_addr.to_string(); } continue; } if let Ok(ipv6_addr) = std::net::Ipv6Addr::from_str(&ip.address) { public_ipv6 = ipv6_addr.to_string(); } } let mut mapped_ports = Vec::new(); let mut guest_ports = vec![22]; guest_ports.append(&mut args.exposed_ports.clone()); let mut i = 0; while i < new_vm_req.extra_ports.len() && i < guest_ports.len() { mapped_ports.push((args.exposed_ports[i], guest_ports[i])); i += 1; } let active_vm = ActiveVm { id: RecordId::from((ACTIVE_VM, id)), admin: new_vm_req.admin, vm_node: new_vm_req.vm_node, hostname: new_vm_req.hostname, mapped_ports, public_ipv4, public_ipv6, disk_size_gb: new_vm_req.disk_size_gb, vcpus: new_vm_req.vcpus, memory_mb: new_vm_req.memory_mb, dtrfs_sha: new_vm_req.dtrfs_sha, kernel_sha: new_vm_req.kernel_sha, created_at: new_vm_req.created_at.clone(), price_per_unit: new_vm_req.price_per_unit, locked_nano: new_vm_req.locked_nano, collected_at: new_vm_req.created_at, }; let _: Vec = db.insert(()).relation(active_vm).await?; NewVmReq::delete(db, id).await?; Ok(()) } } #[derive(Debug, Serialize, Deserialize)] pub struct UpdateVmReq { pub id: RecordId, #[serde(rename = "in")] pub admin: RecordId, #[serde(rename = "out")] pub vm_node: RecordId, pub disk_size_gb: u32, pub vcpus: u32, pub memory_mb: u32, pub dtrfs_url: String, pub dtrfs_sha: String, pub kernel_sha: String, pub kernel_url: String, pub created_at: Datetime, pub price_per_unit: u64, pub locked_nano: u64, } #[derive(Debug, Serialize, Deserialize)] pub struct DeletedVm { pub id: RecordId, #[serde(rename = "in")] pub admin: RecordId, #[serde(rename = "out")] pub vm_node: RecordId, pub hostname: String, pub mapped_ports: Vec<(u32, u32)>, pub public_ipv4: String, pub public_ipv6: String, pub disk_size_gb: u32, pub vcpus: u32, pub memory_mb: u32, pub dtrfs_sha: String, pub kernel_sha: String, pub created_at: Datetime, pub deleted_at: Datetime, pub price_per_unit: u64, } impl DeletedVm { pub async fn get_by_uuid(db: &Surreal, uuid: &str) -> Result, Error> { let contract: Option = db.query(format!("select * from {DELETED_VM}:{uuid};")).await?.take(0)?; Ok(contract) } pub async fn list_by_admin(db: &Surreal, admin: &str) -> Result, Error> { let mut result = db.query(format!("select * from {DELETED_VM} where in = {ACCOUNT}:{admin};")).await?; let contracts: Vec = result.take(0)?; Ok(contracts) } pub async fn list_by_node(db: &Surreal, admin: &str) -> Result, Error> { let mut result = db.query(format!("select * from {DELETED_VM} where out = {VM_NODE}:{admin};")).await?; let contracts: Vec = result.take(0)?; Ok(contracts) } pub async fn list_by_operator( db: &Surreal, operator: &str, ) -> Result, Error> { let mut result = db .query(format!( "select (select * from ->operator->vm_node<-{DELETED_VM}) as contracts from {ACCOUNT}:{operator};" )) .await?; #[derive(Deserialize)] struct Wrapper { contracts: Vec, } let c: Option = result.take(0)?; match c { Some(c) => Ok(c.contracts), None => Ok(Vec::new()), } } /// total hardware units of this VM fn total_units(&self) -> u64 { // TODO: Optimize this based on price of hardware. // I tried, but this can be done better. // Storage cost should also be based on tier (self.vcpus as u64 * 10) + ((self.memory_mb + 256) as u64 / 200) + (self.disk_size_gb as u64 / 10) + (!self.public_ipv4.is_empty() as u64 * 10) } /// Returns price per minute in nanoLP pub fn price_per_minute(&self) -> u64 { self.total_units() * self.price_per_unit } } impl ActiveVm { /// total hardware units of this VM fn total_units(&self) -> u64 { // TODO: Optimize this based on price of hardware. // I tried, but this can be done better. // Storage cost should also be based on tier (self.vcpus as u64 * 10) + ((self.memory_mb + 256) as u64 / 200) + (self.disk_size_gb as u64 / 10) + (!self.public_ipv4.is_empty() as u64 * 10) } /// Returns price per minute in nanoLP pub fn price_per_minute(&self) -> u64 { self.total_units() * self.price_per_unit } } #[derive(Debug, Serialize, Deserialize)] pub struct ActiveVmWithNode { pub id: RecordId, #[serde(rename = "in")] pub admin: RecordId, #[serde(rename = "out")] pub vm_node: VmNode, pub hostname: String, pub mapped_ports: Vec<(u32, u32)>, pub public_ipv4: String, pub public_ipv6: String, pub disk_size_gb: u32, pub vcpus: u32, pub memory_mb: u32, pub dtrfs_sha: String, pub kernel_sha: String, pub created_at: Datetime, pub price_per_unit: u64, pub locked_nano: u64, pub collected_at: Datetime, } impl ActiveVmWithNode { pub async fn get_by_uuid(db: &Surreal, uuid: &str) -> Result, Error> { let contract: Option = db.query(format!("select * from {ACTIVE_VM}:{uuid} fetch out;")).await?.take(0)?; Ok(contract) } pub async fn list_by_admin(db: &Surreal, admin: &str) -> Result, Error> { let mut result = db .query(format!("select * from {ACTIVE_VM} where in = {ACCOUNT}:{admin} fetch out;")) .await?; let contracts: Vec = result.take(0)?; Ok(contracts) } pub async fn list_by_node(db: &Surreal, admin: &str) -> Result, Error> { let mut result = db .query(format!("select * from {ACTIVE_VM} where out = {VM_NODE}:{admin} fetch out;")) .await?; let contracts: Vec = result.take(0)?; Ok(contracts) } pub async fn list_by_operator( db: &Surreal, operator: &str, ) -> Result, Error> { let mut result = db .query(format!( "select (select * from ->operator->vm_node<-{ACTIVE_VM} fetch out) as contracts from {ACCOUNT}:{operator};" )) .await?; #[derive(Deserialize)] struct Wrapper { contracts: Vec, } let c: Option = result.take(0)?; match c { Some(c) => Ok(c.contracts), None => Ok(Vec::new()), } } /// total hardware units of this VM fn total_units(&self) -> u64 { // TODO: Optimize this based on price of hardware. // I tried, but this can be done better. // Storage cost should also be based on tier (self.vcpus as u64 * 10) + ((self.memory_mb + 256) as u64 / 200) + (self.disk_size_gb as u64 / 10) + (!self.public_ipv4.is_empty() as u64 * 10) } /// Returns price per minute in nanoLP pub fn price_per_minute(&self) -> u64 { self.total_units() * self.price_per_unit } } // TODO: delete all of these From implementation after migration 0 gets executed impl From<&old_brain::BrainData> for Vec { fn from(old_data: &old_brain::BrainData) -> Self { let mut nodes = Vec::new(); for old_node in old_data.vm_nodes.iter() { nodes.push(VmNode { id: RecordId::from((VM_NODE, old_node.public_key.clone())), operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())), country: old_node.country.clone(), region: old_node.region.clone(), city: old_node.city.clone(), ip: old_node.ip.clone(), avail_mem_mb: old_node.avail_mem_mb, avail_vcpus: old_node.avail_vcpus, avail_storage_gbs: old_node.avail_storage_gbs, avail_ipv4: old_node.avail_ipv4, avail_ipv6: old_node.avail_ipv6, avail_ports: old_node.avail_ports, max_ports_per_vm: old_node.max_ports_per_vm, price: old_node.price, offline_minutes: old_node.offline_minutes, }); } nodes } } impl From<&old_brain::BrainData> for Vec { fn from(old_data: &old_brain::BrainData) -> Self { let mut contracts = Vec::new(); for old_c in old_data.vm_contracts.iter() { let mut mapped_ports = Vec::new(); for port in old_c.exposed_ports.iter() { mapped_ports.push((*port, 8080u32)); } contracts.push(ActiveVm { id: RecordId::from((ACTIVE_VM, old_c.uuid.replace("-", ""))), admin: RecordId::from((ACCOUNT, old_c.admin_pubkey.clone())), vm_node: RecordId::from((VM_NODE, old_c.node_pubkey.clone())), hostname: old_c.hostname.clone(), mapped_ports, public_ipv4: old_c.public_ipv4.clone(), public_ipv6: old_c.public_ipv6.clone(), disk_size_gb: old_c.disk_size_gb, vcpus: old_c.vcpus, memory_mb: old_c.memory_mb, dtrfs_sha: old_c.dtrfs_sha.clone(), kernel_sha: old_c.kernel_sha.clone(), price_per_unit: old_c.price_per_unit, locked_nano: old_c.locked_nano, created_at: old_c.created_at.into(), collected_at: old_c.collected_at.into(), }); } contracts } }