diff --git a/interim_tables.surql b/interim_tables.surql index b02b0b9..662dea2 100644 --- a/interim_tables.surql +++ b/interim_tables.surql @@ -20,22 +20,60 @@ DEFINE FIELD max_ports_per_vm ON TABLE vm_node TYPE int; DEFINE FIELD price ON TABLE vm_node TYPE int; DEFINE FIELD offline_minutes ON TABLE vm_node TYPE int; -DEFINE TABLE vm_contract TYPE RELATION FROM account TO vm_node SCHEMAFULL; -DEFINE FIELD state ON TABLE vm_contract TYPE string; -DEFINE FIELD hostname ON TABLE vm_contract TYPE string; -DEFINE FIELD mapped_ports ON TABLE vm_contract TYPE array<[int, int]>; -DEFINE FIELD public_ipv4 ON TABLE vm_contract TYPE string; -DEFINE FIELD public_ipv6 ON TABLE vm_contract TYPE string; -DEFINE FIELD disk_size_gb ON TABLE vm_contract TYPE int; -DEFINE FIELD vcpus ON TABLE vm_contract TYPE int; -DEFINE FIELD memory_mb ON TABLE vm_contract TYPE int; -DEFINE FIELD dtrfs_sha ON TABLE vm_contract TYPE string; -DEFINE FIELD kernel_sha ON TABLE vm_contract TYPE string; -DEFINE FIELD created_at ON TABLE vm_contract TYPE datetime; -DEFINE FIELD updated_at ON TABLE vm_contract TYPE datetime; -DEFINE FIELD price_per_unit ON TABLE vm_contract TYPE int; -DEFINE FIELD locked_nano ON TABLE vm_contract TYPE int; -DEFINE FIELD collected_at ON TABLE vm_contract TYPE datetime; +DEFINE TABLE new_vm_req TYPE RELATION FROM account TO vm_node SCHEMAFULL; +DEFINE FIELD hostname ON TABLE new_vm_req TYPE string; +DEFINE FIELD extra_ports ON TABLE new_vm_req TYPE array; +DEFINE FIELD public_ipv4 ON TABLE new_vm_req TYPE bool; +DEFINE FIELD public_ipv6 ON TABLE new_vm_req TYPE bool; +DEFINE FIELD disk_size_gb ON TABLE new_vm_req TYPE int; +DEFINE FIELD vcpus ON TABLE new_vm_req TYPE int; +DEFINE FIELD memory_mb ON TABLE new_vm_req TYPE int; +DEFINE FIELD dtrfs_sha ON TABLE new_vm_req TYPE string; +DEFINE FIELD dtrfs_url ON TABLE new_vm_req TYPE string; +DEFINE FIELD kernel_sha ON TABLE new_vm_req TYPE string; +DEFINE FIELD kernel_url ON TABLE new_vm_req TYPE string; +DEFINE FIELD created_at ON TABLE new_vm_req TYPE datetime; +DEFINE FIELD updated_at ON TABLE new_vm_req TYPE datetime; +DEFINE FIELD price_per_unit ON TABLE new_vm_req TYPE int; +DEFINE FIELD locked_nano ON TABLE new_vm_req TYPE int; + +DEFINE TABLE active_vm TYPE RELATION FROM account TO vm_node SCHEMAFULL; +DEFINE FIELD hostname ON TABLE active_vm TYPE string; +DEFINE FIELD mapped_ports ON TABLE active_vm TYPE array<[int, int]>; +DEFINE FIELD public_ipv4 ON TABLE active_vm TYPE string; +DEFINE FIELD public_ipv6 ON TABLE active_vm TYPE string; +DEFINE FIELD disk_size_gb ON TABLE active_vm TYPE int; +DEFINE FIELD vcpus ON TABLE active_vm TYPE int; +DEFINE FIELD memory_mb ON TABLE active_vm TYPE int; +DEFINE FIELD dtrfs_sha ON TABLE active_vm TYPE string; +DEFINE FIELD kernel_sha ON TABLE active_vm TYPE string; +DEFINE FIELD created_at ON TABLE active_vm TYPE datetime; +DEFINE FIELD price_per_unit ON TABLE active_vm TYPE int; +DEFINE FIELD locked_nano ON TABLE active_vm TYPE int; +DEFINE FIELD collected_at ON TABLE active_vm TYPE datetime; + +DEFINE TABLE update_vm_req TYPE RELATION FROM account TO vm_node SCHEMAFULL; +DEFINE FIELD vcpus ON TABLE update_vm_req TYPE int; +DEFINE FIELD memory_mb ON TABLE update_vm_req TYPE int; +DEFINE FIELD disk_size_gb ON TABLE update_vm_req TYPE int; +DEFINE FIELD dtrfs_sha ON TABLE update_vm_req TYPE string; +DEFINE FIELD dtrfs_url ON TABLE update_vm_req TYPE string; +DEFINE FIELD kernel_sha ON TABLE update_vm_req TYPE string; +DEFINE FIELD kernel_url ON TABLE update_vm_req TYPE string; + +DEFINE TABLE deleted_vm TYPE RELATION FROM account TO vm_node SCHEMAFULL; +DEFINE FIELD hostname ON TABLE deleted_vm TYPE string; +DEFINE FIELD mapped_ports ON TABLE deleted_vm TYPE array<[int, int]>; +DEFINE FIELD public_ipv4 ON TABLE deleted_vm TYPE string; +DEFINE FIELD public_ipv6 ON TABLE deleted_vm TYPE string; +DEFINE FIELD disk_size_gb ON TABLE deleted_vm TYPE int; +DEFINE FIELD vcpus ON TABLE deleted_vm TYPE int; +DEFINE FIELD memory_mb ON TABLE deleted_vm TYPE int; +DEFINE FIELD dtrfs_sha ON TABLE deleted_vm TYPE string; +DEFINE FIELD kernel_sha ON TABLE deleted_vm TYPE string; +DEFINE FIELD created_at ON TABLE deleted_vm TYPE datetime; +DEFINE FIELD deleted_at ON TABLE deleted_vm TYPE datetime; +DEFINE FIELD price_per_unit ON TABLE deleted_vm TYPE int; DEFINE TABLE app_node SCHEMAFULL; DEFINE FIELD operator ON TABLE app_node TYPE record; @@ -51,22 +89,36 @@ DEFINE FIELD max_ports_per_app ON TABLE app_node TYPE int; DEFINE FIELD price ON TABLE app_node TYPE int; DEFINE FIELD offline_minutes ON TABLE app_node TYPE int; -DEFINE TABLE app_contract TYPE RELATION FROM account TO app_node SCHEMAFULL; -DEFINE FIELD state ON TABLE app_contract TYPE string; -DEFINE FIELD app_name ON TABLE app_contract TYPE string; -DEFINE FIELD mapped_ports ON TABLE app_contract TYPE array<[int, int]>; -DEFINE FIELD host_ipv4 ON TABLE app_contract TYPE string; -DEFINE FIELD vcpus ON TABLE app_contract TYPE int; -DEFINE FIELD memory_mb ON TABLE app_contract TYPE int; -DEFINE FIELD disk_size_gb ON TABLE app_contract TYPE int; -DEFINE FIELD created_at ON TABLE app_contract TYPE datetime; -DEFINE FIELD updated_at ON TABLE app_contract TYPE datetime; -DEFINE FIELD price_per_unit ON TABLE app_contract TYPE int; -DEFINE FIELD locked_nano ON TABLE app_contract TYPE int; -DEFINE FIELD collected_at ON TABLE app_contract TYPE datetime; -DEFINE FIELD mr_enclave ON TABLE app_contract TYPE string; -DEFINE FIELD package_url ON TABLE app_contract TYPE string; -DEFINE FIELD hratls_pubkey ON TABLE app_contract TYPE string; +DEFINE TABLE active_app TYPE RELATION FROM account TO app_node SCHEMAFULL; +DEFINE FIELD app_name ON TABLE active_app TYPE string; +DEFINE FIELD mapped_ports ON TABLE active_app TYPE array<[int, int]>; +DEFINE FIELD host_ipv4 ON TABLE active_app TYPE string; +DEFINE FIELD vcpus ON TABLE active_app TYPE int; +DEFINE FIELD memory_mb ON TABLE active_app TYPE int; +DEFINE FIELD disk_size_gb ON TABLE active_app TYPE int; +DEFINE FIELD created_at ON TABLE active_app TYPE datetime; +DEFINE FIELD price_per_unit ON TABLE active_app TYPE int; +DEFINE FIELD locked_nano ON TABLE active_app TYPE int; +DEFINE FIELD collected_at ON TABLE active_app TYPE datetime; +DEFINE FIELD mr_enclave ON TABLE active_app TYPE string; +DEFINE FIELD package_url ON TABLE active_app TYPE string; +DEFINE FIELD hratls_pubkey ON TABLE active_app TYPE string; + +DEFINE TABLE deleted_app TYPE RELATION FROM account TO app_node SCHEMAFULL; +DEFINE FIELD app_name ON TABLE deleted_app TYPE string; +DEFINE FIELD mapped_ports ON TABLE deleted_app TYPE array<[int, int]>; +DEFINE FIELD host_ipv4 ON TABLE deleted_app TYPE string; +DEFINE FIELD vcpus ON TABLE deleted_app TYPE int; +DEFINE FIELD memory_mb ON TABLE deleted_app TYPE int; +DEFINE FIELD disk_size_gb ON TABLE deleted_app TYPE int; +DEFINE FIELD created_at ON TABLE deleted_app TYPE datetime; +DEFINE FIELD deleted_at ON TABLE deleted_app TYPE datetime; +DEFINE FIELD price_per_unit ON TABLE deleted_app TYPE int; +DEFINE FIELD locked_nano ON TABLE deleted_app TYPE int; +DEFINE FIELD collected_at ON TABLE deleted_app TYPE datetime; +DEFINE FIELD mr_enclave ON TABLE deleted_app TYPE string; +DEFINE FIELD package_url ON TABLE deleted_app TYPE string; +DEFINE FIELD hratls_pubkey ON TABLE deleted_app TYPE string; DEFINE TABLE ban TYPE RELATION FROM account TO account; DEFINE FIELD created_at ON TABLE ban TYPE datetime; @@ -74,7 +126,7 @@ DEFINE FIELD created_at ON TABLE ban TYPE datetime; DEFINE TABLE kick TYPE RELATION FROM account TO account; DEFINE FIELD created_at ON TABLE kick TYPE datetime; DEFINE FIELD reason ON TABLE kick TYPE string; -DEFINE FIELD contract ON TABLE kick TYPE record; +DEFINE FIELD contract ON TABLE kick TYPE record; DEFINE TABLE report TYPE RELATION FROM account TO vm_node|app_node; DEFINE FIELD created_at ON TABLE report TYPE datetime; diff --git a/src/db.rs b/src/db.rs index fdee374..39f41e0 100644 --- a/src/db.rs +++ b/src/db.rs @@ -5,18 +5,25 @@ use surrealdb::{ engine::remote::ws::{Client, Ws}, opt::auth::Root, sql::Datetime, - RecordId, Surreal, + Notification, RecordId, Surreal, }; +use tokio::sync::mpsc::Sender; +use tokio_stream::StreamExt as _; static DB: LazyLock> = LazyLock::new(Surreal::init); pub const ACCOUNT: &str = "account"; -pub const VM_CONTRACT: &str = "vm_contract"; pub const VM_NODE: &str = "vm_node"; +pub const ACTIVE_VM: &str = "active_vm"; +pub const NEW_VM_REQ: &str = "new_vm_req"; +pub const UPDATE_VM_REQ: &str = "update_vm_req"; +pub const DELETED_VM: &str = "deleted_vm"; #[derive(thiserror::Error, Debug)] pub enum Error { - #[error(transparent)] + #[error("Internal DB error: {0}")] DataBase(#[from] surrealdb::Error), + #[error("Daemon channel got closed: {0}")] + DaemonConnection(#[from] tokio::sync::mpsc::error::SendError), } pub async fn init() -> surrealdb::Result<()> { @@ -31,7 +38,7 @@ pub async fn migration0(old_data: &old_brain::BrainData) -> surrealdb::Result<() let accounts: Vec = old_data.into(); let vm_nodes: Vec = old_data.into(); let app_nodes: Vec = old_data.into(); - let vm_contracts: Vec = old_data.into(); + let vm_contracts: Vec = old_data.into(); init().await?; @@ -42,7 +49,7 @@ pub async fn migration0(old_data: &old_brain::BrainData) -> surrealdb::Result<() println!("Inserting app nodes..."); let _: Vec = DB.insert(()).content(app_nodes).await?; println!("Inserting vm contracts..."); - let _: Vec = DB.insert("vm_contract").relation(vm_contracts).await?; + let _: Vec = DB.insert("vm_contract").relation(vm_contracts).await?; Ok(()) } @@ -124,14 +131,60 @@ pub struct VmNodeWithReports { pub reports: Vec, } +pub enum DaemonNotification { + Create(NewVmReq), + Update(UpdateVmReq), + Delete(DeletedVm), +} + +impl From for DaemonNotification { + fn from(value: NewVmReq) -> Self { + Self::Create(value) + } +} + +impl From for DaemonNotification { + fn from(value: UpdateVmReq) -> Self { + Self::Update(value) + } +} + +impl From for DaemonNotification { + fn from(value: DeletedVm) -> Self { + Self::Delete(value) + } +} + #[derive(Debug, Serialize, Deserialize)] -pub struct VmContract { +pub struct NewVmReq { + pub id: RecordId, + #[serde(rename = "in")] + pub admin: RecordId, + #[serde(rename = "out")] + pub vm_node: RecordId, + pub hostname: String, + pub extra_ports: Vec, + pub public_ipv4: bool, + pub public_ipv6: bool, + pub disk_size_gb: u32, + pub vcpus: u32, + pub memory_mb: u32, + pub dtrfs_url: String, + pub dtrfs_sha: String, + pub kernel_sha: String, + pub kernel_url: String, + pub created_at: Datetime, + pub price_per_unit: u64, + pub locked_nano: u64, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ActiveVm { pub id: RecordId, #[serde(rename = "in")] pub admin: RecordId, #[serde(rename = "out")] pub vm_node: RecordId, - pub state: String, pub hostname: String, pub mapped_ports: Vec<(u32, u32)>, pub public_ipv4: String, @@ -142,13 +195,124 @@ pub struct VmContract { pub dtrfs_sha: String, pub kernel_sha: String, pub created_at: Datetime, - pub updated_at: Datetime, pub price_per_unit: u64, pub locked_nano: u64, pub collected_at: Datetime, } -impl VmContract { +#[derive(Debug, Serialize, Deserialize)] +pub struct UpdateVmReq { + pub id: RecordId, + #[serde(rename = "in")] + pub admin: RecordId, + #[serde(rename = "out")] + pub vm_node: RecordId, + pub disk_size_gb: u32, + pub vcpus: u32, + pub memory_mb: u32, + pub dtrfs_url: String, + pub dtrfs_sha: String, + pub kernel_sha: String, + pub kernel_url: String, + pub created_at: Datetime, + pub price_per_unit: u64, + pub locked_nano: u64, +} + +pub async fn listen_for_node + std::marker::Unpin + for<'de> Deserialize<'de>>( + node: &str, + tx: Sender, +) -> Result<(), Error> { + let table_name = match std::any::type_name::() { + "surreal_brain::db::NewVmReq" => NEW_VM_REQ.to_string(), + "surreal_brain::db::UpdateVmReq" => UPDATE_VM_REQ.to_string(), + "surreal_brain::db::DeletedVm" => DELETED_VM.to_string(), + wat => { + log::error!("listen_for_node: T has type {wat}"); + String::from("wat") + }, + }; + let mut resp = + DB.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?; + let mut live_stream = resp.stream::>(0)?; + while let Some(result) = live_stream.next().await { + match result { + Ok(notification) => match notification.action { + surrealdb::Action::Create => tx.send(notification.data.into()).await?, + _ => {} + }, + Err(e) => { + log::warn!("listen_for_deletion DB stream failed for {node}: {e}"); + return Err(Error::from(e)); + } + } + } + Ok(()) +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DeletedVm { + pub id: RecordId, + #[serde(rename = "in")] + pub admin: RecordId, + #[serde(rename = "out")] + pub vm_node: RecordId, + pub hostname: String, + pub mapped_ports: Vec<(u32, u32)>, + pub public_ipv4: String, + pub public_ipv6: String, + pub disk_size_gb: u32, + pub vcpus: u32, + pub memory_mb: u32, + pub dtrfs_sha: String, + pub kernel_sha: String, + pub created_at: Datetime, + pub deleted_at: Datetime, + pub price_per_unit: u64, +} + +impl DeletedVm { + pub async fn get_by_uuid(uuid: &str) -> Result, Error> { + let contract: Option = + DB.query(format!("select * from {DELETED_VM}:{uuid};")).await?.take(0)?; + Ok(contract) + } + + pub async fn list_by_admin(admin: &str) -> Result, Error> { + let mut result = + DB.query(format!("select * from {DELETED_VM} where in = {ACCOUNT}:{admin};")).await?; + let contracts: Vec = result.take(0)?; + Ok(contracts) + } + + pub async fn list_by_node(admin: &str) -> Result, Error> { + let mut result = + DB.query(format!("select * from {DELETED_VM} where out = {VM_NODE}:{admin};")).await?; + let contracts: Vec = result.take(0)?; + Ok(contracts) + } + + pub async fn list_by_operator(operator: &str) -> Result, Error> { + let mut result = DB + .query(format!( + "select + (select * from ->operator->vm_node<-{DELETED_VM}) as contracts + from {ACCOUNT}:{operator};" + )) + .await?; + + #[derive(Deserialize)] + struct Wrapper { + contracts: Vec, + } + + let c: Option = result.take(0)?; + match c { + Some(c) => Ok(c.contracts), + None => Ok(Vec::new()), + } + } + /// total hardware units of this VM fn total_units(&self) -> u64 { // TODO: Optimize this based on price of hardware. @@ -166,10 +330,27 @@ impl VmContract { } } -impl VmContract {} +impl ActiveVm { + /// total hardware units of this VM + fn total_units(&self) -> u64 { + // TODO: Optimize this based on price of hardware. + // I tried, but this can be done better. + // Storage cost should also be based on tier + (self.vcpus as u64 * 10) + + ((self.memory_mb + 256) as u64 / 200) + + (self.disk_size_gb as u64 / 10) + + (!self.public_ipv4.is_empty() as u64 * 10) + } + + /// Returns price per minute in nanoLP + pub fn price_per_minute(&self) -> u64 { + self.total_units() * self.price_per_unit + } + +} #[derive(Debug, Serialize, Deserialize)] -pub struct VmContractWithNode { +pub struct ActiveVmWithNode { pub id: RecordId, #[serde(rename = "in")] pub admin: RecordId, @@ -192,16 +373,16 @@ pub struct VmContractWithNode { pub collected_at: Datetime, } -impl VmContractWithNode { +impl ActiveVmWithNode { pub async fn get_by_uuid(uuid: &str) -> Result, Error> { let contract: Option = - DB.query(format!("select * from {VM_CONTRACT}:{uuid} fetch out;")).await?.take(0)?; + DB.query(format!("select * from {ACTIVE_VM}:{uuid} fetch out;")).await?.take(0)?; Ok(contract) } pub async fn list_by_admin(admin: &str) -> Result, Error> { let mut result = DB - .query(format!("select * from {VM_CONTRACT} where in = {ACCOUNT}:{admin} fetch out;")) + .query(format!("select * from {ACTIVE_VM} where in = {ACCOUNT}:{admin} fetch out;")) .await?; let contracts: Vec = result.take(0)?; Ok(contracts) @@ -209,7 +390,7 @@ impl VmContractWithNode { pub async fn list_by_node(admin: &str) -> Result, Error> { let mut result = DB - .query(format!("select * from {VM_CONTRACT} where out = {VM_NODE}:{admin} fetch out;")) + .query(format!("select * from {ACTIVE_VM} where out = {VM_NODE}:{admin} fetch out;")) .await?; let contracts: Vec = result.take(0)?; Ok(contracts) @@ -219,14 +400,14 @@ impl VmContractWithNode { let mut result = DB .query(format!( "select - (select * from ->operator->vm_node<-vm_contract fetch out) as contracts + (select * from ->operator->vm_node<-{ACTIVE_VM} fetch out) as contracts from {ACCOUNT}:{operator};" )) .await?; #[derive(Deserialize)] struct Wrapper { - contracts: Vec, + contracts: Vec, } let c: Option = result.take(0)?; @@ -461,7 +642,7 @@ impl From<&old_brain::BrainData> for Vec { } } -impl From<&old_brain::BrainData> for Vec { +impl From<&old_brain::BrainData> for Vec { fn from(old_data: &old_brain::BrainData) -> Self { let mut contracts = Vec::new(); for old_c in old_data.vm_contracts.iter() { @@ -469,15 +650,10 @@ impl From<&old_brain::BrainData> for Vec { for port in old_c.exposed_ports.iter() { mapped_ports.push((*port, 8080 as u32)); } - contracts.push(VmContract { - id: RecordId::from(( - VM_CONTRACT, - old_c.node_pubkey.chars().take(20).collect::() - + &old_c.uuid.replace("-", "").chars().take(20).collect::(), - )), + contracts.push(ActiveVm { + id: RecordId::from((ACTIVE_VM, old_c.uuid.replace("-", ""))), admin: RecordId::from((ACCOUNT, old_c.admin_pubkey.clone())), vm_node: RecordId::from((VM_NODE, old_c.node_pubkey.clone())), - state: "active".to_string(), hostname: old_c.hostname.clone(), mapped_ports, public_ipv4: old_c.public_ipv4.clone(), @@ -490,7 +666,6 @@ impl From<&old_brain::BrainData> for Vec { price_per_unit: old_c.price_per_unit, locked_nano: old_c.locked_nano, created_at: old_c.created_at.into(), - updated_at: old_c.updated_at.into(), collected_at: old_c.collected_at.into(), }); } diff --git a/src/grpc.rs b/src/grpc.rs index 15baba9..29366f5 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -18,9 +18,7 @@ use log::info; use std::pin::Pin; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -// use tokio::sync::mpsc; -// use tokio_stream::{wrappers::ReceiverStream, Stream}; -use tokio_stream::Stream; +use tokio_stream::{Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; pub struct BrainGeneralCliForReal {} @@ -31,8 +29,74 @@ impl From for AccountBalance { } } -impl From for VmContract { - fn from(db_c: db::VmContractWithNode) -> Self { +impl From for NewVmReq { + fn from(new_vm_req: db::NewVmReq) -> Self { + Self { + uuid: new_vm_req.id.key().to_string(), + hostname: new_vm_req.hostname, + admin_pubkey: new_vm_req.admin.key().to_string(), + node_pubkey: new_vm_req.vm_node.key().to_string(), + extra_ports: new_vm_req.extra_ports, + public_ipv4: new_vm_req.public_ipv4, + public_ipv6: new_vm_req.public_ipv6, + disk_size_gb: new_vm_req.disk_size_gb, + vcpus: new_vm_req.vcpus, + memory_mb: new_vm_req.memory_mb, + kernel_url: new_vm_req.kernel_url, + kernel_sha: new_vm_req.kernel_sha, + dtrfs_url: new_vm_req.dtrfs_url, + dtrfs_sha: new_vm_req.dtrfs_sha, + price_per_unit: new_vm_req.price_per_unit, + locked_nano: new_vm_req.locked_nano, + } + } +} + +impl From for UpdateVmReq { + fn from(update_vm_req: db::UpdateVmReq) -> Self { + Self { + uuid: update_vm_req.id.key().to_string(), + // daemon does not care about VM hostname + hostname: String::new(), + admin_pubkey: update_vm_req.admin.key().to_string(), + disk_size_gb: update_vm_req.disk_size_gb, + vcpus: update_vm_req.vcpus, + memory_mb: update_vm_req.memory_mb, + kernel_url: update_vm_req.kernel_url, + kernel_sha: update_vm_req.kernel_sha, + dtrfs_url: update_vm_req.dtrfs_url, + dtrfs_sha: update_vm_req.dtrfs_sha, + } + } +} + +impl From for DeleteVmReq { + fn from(delete_vm_req: db::DeletedVm) -> Self { + Self { + uuid: delete_vm_req.id.key().to_string(), + admin_pubkey: delete_vm_req.admin.key().to_string(), + } + } +} + +impl From for BrainVmMessage { + fn from(notification: db::DaemonNotification) -> Self { + match notification { + db::DaemonNotification::Create(new_vm_req) => { + BrainVmMessage { msg: Some(brain_vm_message::Msg::NewVmReq(new_vm_req.into())) } + } + db::DaemonNotification::Update(update_vm_req) => BrainVmMessage { + msg: Some(brain_vm_message::Msg::UpdateVmReq(update_vm_req.into())), + }, + db::DaemonNotification::Delete(deleted_vm) => { + BrainVmMessage { msg: Some(brain_vm_message::Msg::DeleteVm(deleted_vm.into())) } + } + } + } +} + +impl From for VmContract { + fn from(db_c: db::ActiveVmWithNode) -> Self { let mut exposed_ports = Vec::new(); for port in db_c.mapped_ports.iter() { exposed_ports.push(port.0); @@ -149,7 +213,7 @@ impl BrainVmDaemon for BrainVmDaemonForReal { .await?; info!("Sending existing contracts to {}", req.node_pubkey); - let contracts = db::VmContractWithNode::list_by_node(&req.node_pubkey).await?; + let contracts = db::ActiveVmWithNode::list_by_node(&req.node_pubkey).await?; let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for contract in contracts { @@ -165,7 +229,6 @@ impl BrainVmDaemon for BrainVmDaemonForReal { &self, req: Request, ) -> Result, Status> { - todo!(); let auth = req.into_inner(); let pubkey = auth.pubkey.clone(); check_sig_from_parts( @@ -175,29 +238,37 @@ impl BrainVmDaemon for BrainVmDaemonForReal { &auth.signature, )?; info!("Daemon {} connected to receive brain messages", pubkey); - // A surreql query that listens live for changes: - // live select * from vm_contract where meta::id(id) in "3zRxiGRnf46vd3zAEmpa".."3zRxiGRnf46vd3zAEmpb"; - // - // And the same from Rust: - // let mut stream = db - // .select("vm_contract") - // .range("3zRxiGRnf46vd3zAEmpa".."3zRxiGRnf46vd3zAEmpb") - // .live() - // .await?; - // - // fn handle(result: Result, surrealdb::Error>) { - // println!("Received notification: {:?}", result); - // } - // - // while let Some(result) = stream.next().await { - // handle(result); - // } - // - // let (tx, rx) = mpsc::channel(6); - //self.data.add_daemon_tx(&pubkey, tx); - //let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg)); - //Ok(Response::new(Box::pin(output_stream) as Self::BrainMessagesStream)) + let (tx, rx) = mpsc::channel(6); + { + let pubkey = pubkey.clone(); + let tx = tx.clone(); + tokio::spawn(async move { + match db::listen_for_node::(&pubkey, tx).await { + Ok(()) => log::info!("db::VmContract::listen_for_node ended for {pubkey}"), + Err(e) => { + log::warn!("db::VmContract::listen_for_node errored for {pubkey}: {e}") + } + }; + }); + } + { + let pubkey = pubkey.clone(); + let tx = tx.clone(); + tokio::spawn(async move { + let _ = db::listen_for_node::(&pubkey, tx.clone()).await; + }); + } + { + let pubkey = pubkey.clone(); + let tx = tx.clone(); + tokio::spawn(async move { + let _ = db::listen_for_node::(&pubkey, tx.clone()).await; + }); + } + + let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg.into())); + Ok(Response::new(Box::pin(output_stream) as Self::BrainMessagesStream)) } async fn daemon_messages( @@ -267,7 +338,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal { async fn report_node(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; - let (account, node) = match db::VmContractWithNode::get_by_uuid(&req.contract).await? { + let (account, node) = match db::ActiveVmWithNode::get_by_uuid(&req.contract).await? { Some(vm_contract) if vm_contract.admin.key().to_string() == req.admin_pubkey && vm_contract.vm_node.id.key().to_string() == req.node_pubkey => @@ -499,7 +570,7 @@ impl BrainVmCli for BrainVmCliForReal { ); let mut contracts = Vec::new(); if !req.uuid.is_empty() { - if let Some(specific_contract) = db::VmContractWithNode::get_by_uuid(&req.uuid).await? { + if let Some(specific_contract) = db::ActiveVmWithNode::get_by_uuid(&req.uuid).await? { if specific_contract.admin.key().to_string() == req.wallet { contracts.push(specific_contract.into()); } @@ -507,12 +578,11 @@ impl BrainVmCli for BrainVmCliForReal { } } else { if req.as_operator { - contracts.append( - &mut db::VmContractWithNode::list_by_operator(&req.wallet).await?.into(), - ); + contracts + .append(&mut db::ActiveVmWithNode::list_by_operator(&req.wallet).await?.into()); } else { contracts - .append(&mut db::VmContractWithNode::list_by_admin(&req.wallet).await?.into()); + .append(&mut db::ActiveVmWithNode::list_by_admin(&req.wallet).await?.into()); } } let (tx, rx) = mpsc::channel(6);