diff --git a/src/db.rs b/src/db.rs index 5df9bae..acfd965 100644 --- a/src/db.rs +++ b/src/db.rs @@ -9,6 +9,10 @@ use surrealdb::{ }; static DB: LazyLock> = LazyLock::new(Surreal::init); +const ACCOUNT: &str = "account"; +const OPERATOR: &str = "operator"; +const VM_CONTRACT: &str = "vm_contract"; +const VM_NODE: &str = "vm_node"; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -48,7 +52,7 @@ pub async fn migration0(old_data: &old_brain::BrainData) -> surrealdb::Result<() } pub async fn account(address: &str) -> Result { - let id = ("account", address); + let id = (ACCOUNT, address); let account: Option = DB.select(id).await?; let account = match account { Some(account) => account, @@ -59,6 +63,37 @@ pub async fn account(address: &str) -> Result { Ok(account) } +pub async fn vm_contract_by_uuid(uuid: &str) -> Result, Error> { + let id = (VM_CONTRACT, uuid); + let contract: Option = DB.select(id).await?; + Ok(contract) +} + +pub async fn vm_contracts_by_admin(admin: &str) -> Result, Error> { + let mut result = + DB.query(format!("select * from {VM_CONTRACT} where in = {ACCOUNT}:{admin};")).await?; + let contracts: Vec = result.take(0)?; + Ok(contracts) +} + +pub async fn vm_contracts_by_operator(operator: &str) -> Result, Error> { + let mut result = DB + .query(format!( + "select ->{OPERATOR}->{VM_NODE}<-{VM_CONTRACT}.* as contracts from {ACCOUNT}:{operator};" + )) + .await?; + + #[derive(Deserialize)] + struct Wrapper { + contracts: Vec, + } + let c: Option = result.take(0)?; + match c { + Some(c) => Ok(c.contracts), + None => Ok(Vec::new()), + } +} + // I am not deleting this example cause I might need it later. // // async fn get_wallet_contracts() -> surrealdb::Result> { @@ -106,26 +141,44 @@ pub struct VmNode { #[derive(Debug, Serialize, Deserialize)] pub struct VmContract { - id: RecordId, + pub id: RecordId, #[serde(rename = "in")] - admin: RecordId, + pub admin: RecordId, #[serde(rename = "out")] - vm_node: RecordId, - state: String, - hostname: String, - mapped_ports: Vec<(u32, u32)>, - public_ipv4: String, - public_ipv6: String, - disk_size_gb: u32, - vcpus: u32, - memory_mb: u32, - dtrfs_sha: String, - kernel_sha: String, - created_at: Datetime, - updated_at: Datetime, - price_per_unit: u64, - locked_nano: u64, - collected_at: Datetime, + pub vm_node: RecordId, + pub state: String, + pub hostname: String, + pub mapped_ports: Vec<(u32, u32)>, + pub public_ipv4: String, + pub public_ipv6: String, + pub disk_size_gb: u32, + pub vcpus: u32, + pub memory_mb: u32, + pub dtrfs_sha: String, + pub kernel_sha: String, + pub created_at: Datetime, + pub updated_at: Datetime, + pub price_per_unit: u64, + pub locked_nano: u64, + pub collected_at: Datetime, +} + +impl VmContract { + /// total hardware units of this VM + fn total_units(&self) -> u64 { + // TODO: Optimize this based on price of hardware. + // I tried, but this can be done better. + // Storage cost should also be based on tier + (self.vcpus as u64 * 10) + + ((self.memory_mb + 256) as u64 / 200) + + (self.disk_size_gb as u64 / 10) + + (!self.public_ipv4.is_empty() as u64 * 10) + } + + /// Returns price per minute in nanoLP + pub fn price_per_minute(&self) -> u64 { + self.total_units() * self.price_per_unit + } } #[derive(Debug, Serialize, Deserialize)] diff --git a/src/grpc.rs b/src/grpc.rs index 08ff394..5acb32e 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -13,6 +13,8 @@ use detee_shared::{ use log::info; use std::pin::Pin; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; // use tokio::sync::mpsc; // use tokio_stream::{wrappers::ReceiverStream, Stream}; use tokio_stream::Stream; @@ -26,6 +28,34 @@ impl From for AccountBalance { } } +impl From for VmContract { + fn from(contract: db::VmContract) -> Self { + let mut exposed_ports = Vec::new(); + for port in contract.mapped_ports.iter() { + exposed_ports.push(port.0); + } + VmContract { + uuid: contract.id.key().to_string(), + dtrfs_sha: contract.dtrfs_sha.clone(), + kernel_sha: contract.kernel_sha.clone(), + memory_mb: contract.memory_mb, + vcpus: contract.vcpus, + created_at: contract.created_at.to_string(), + disk_size_gb: contract.disk_size_gb, + public_ipv6: contract.public_ipv6.clone(), + public_ipv4: contract.public_ipv4.clone(), + updated_at: contract.updated_at.to_string(), + exposed_ports, + node_pubkey: contract.vm_node.key().to_string(), + nano_per_minute: contract.price_per_minute(), + locked_nano: contract.locked_nano, + admin_pubkey: contract.admin.key().to_string(), + hostname: contract.hostname.clone(), + collected_at: contract.hostname.clone(), + } + } +} + impl From for tonic::Status { fn from(e: db::Error) -> Self { Self::internal(format!("Internal error: {e}")) @@ -281,32 +311,29 @@ impl BrainVmCli for BrainVmCliMock { "CLI {} requested ListVMVmContractsStream. As operator: {}", req.wallet, req.as_operator ); - todo!(); - // let mut contracts = Vec::new(); - // if !req.uuid.is_empty() { - // if let Ok(specific_contract) = self.data.find_contract_by_uuid(&req.uuid) { - // if specific_contract.admin_pubkey == req.wallet { - // contracts.push(specific_contract); - // } - // // TODO: allow operator to inspect contracts - // } - // } else { - // if req.as_operator { - // contracts.append(&mut self.data.find_vm_contracts_by_operator(&req.wallet)); - // } else { - // contracts.append(&mut self.data.find_vm_contracts_by_admin(&req.wallet)); - // } - // } - // let (tx, rx) = mpsc::channel(6); - // tokio::spawn(async move { - // for contract in contracts { - // let _ = tx.send(Ok(contract.into())).await; - // } - // }); - // let output_stream = ReceiverStream::new(rx); - // Ok(Response::new( - // Box::pin(output_stream) as Self::ListVmContractsStream - // )) + let mut contracts = Vec::new(); + if !req.uuid.is_empty() { + if let Some(specific_contract) = db::vm_contract_by_uuid(&req.uuid).await? { + if specific_contract.admin.key().to_string() == req.wallet { + contracts.push(specific_contract.into()); + } + // TODO: allow operator to inspect contracts + } + } else { + if req.as_operator { + contracts.append(&mut db::vm_contracts_by_operator(&req.wallet).await?.into()); + } else { + contracts.append(&mut db::vm_contracts_by_admin(&req.wallet).await?.into()); + } + } + let (tx, rx) = mpsc::channel(6); + tokio::spawn(async move { + for contract in contracts { + let _ = tx.send(Ok(contract.into())).await; + } + }); + let output_stream = ReceiverStream::new(rx); + Ok(Response::new(Box::pin(output_stream) as Self::ListVmContractsStream)) } async fn list_vm_nodes(