added code to send brain_messages to vm daemon

This commit is contained in:
ghe0 2025-04-26 04:44:28 +03:00
parent 6a99c146ce
commit 363724e5d7
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
3 changed files with 391 additions and 94 deletions

@ -20,22 +20,60 @@ DEFINE FIELD max_ports_per_vm ON TABLE vm_node TYPE int;
DEFINE FIELD price ON TABLE vm_node TYPE int; DEFINE FIELD price ON TABLE vm_node TYPE int;
DEFINE FIELD offline_minutes ON TABLE vm_node TYPE int; DEFINE FIELD offline_minutes ON TABLE vm_node TYPE int;
DEFINE TABLE vm_contract TYPE RELATION FROM account TO vm_node SCHEMAFULL; DEFINE TABLE new_vm_req TYPE RELATION FROM account TO vm_node SCHEMAFULL;
DEFINE FIELD state ON TABLE vm_contract TYPE string; DEFINE FIELD hostname ON TABLE new_vm_req TYPE string;
DEFINE FIELD hostname ON TABLE vm_contract TYPE string; DEFINE FIELD extra_ports ON TABLE new_vm_req TYPE array<int>;
DEFINE FIELD mapped_ports ON TABLE vm_contract TYPE array<[int, int]>; DEFINE FIELD public_ipv4 ON TABLE new_vm_req TYPE bool;
DEFINE FIELD public_ipv4 ON TABLE vm_contract TYPE string; DEFINE FIELD public_ipv6 ON TABLE new_vm_req TYPE bool;
DEFINE FIELD public_ipv6 ON TABLE vm_contract TYPE string; DEFINE FIELD disk_size_gb ON TABLE new_vm_req TYPE int;
DEFINE FIELD disk_size_gb ON TABLE vm_contract TYPE int; DEFINE FIELD vcpus ON TABLE new_vm_req TYPE int;
DEFINE FIELD vcpus ON TABLE vm_contract TYPE int; DEFINE FIELD memory_mb ON TABLE new_vm_req TYPE int;
DEFINE FIELD memory_mb ON TABLE vm_contract TYPE int; DEFINE FIELD dtrfs_sha ON TABLE new_vm_req TYPE string;
DEFINE FIELD dtrfs_sha ON TABLE vm_contract TYPE string; DEFINE FIELD dtrfs_url ON TABLE new_vm_req TYPE string;
DEFINE FIELD kernel_sha ON TABLE vm_contract TYPE string; DEFINE FIELD kernel_sha ON TABLE new_vm_req TYPE string;
DEFINE FIELD created_at ON TABLE vm_contract TYPE datetime; DEFINE FIELD kernel_url ON TABLE new_vm_req TYPE string;
DEFINE FIELD updated_at ON TABLE vm_contract TYPE datetime; DEFINE FIELD created_at ON TABLE new_vm_req TYPE datetime;
DEFINE FIELD price_per_unit ON TABLE vm_contract TYPE int; DEFINE FIELD updated_at ON TABLE new_vm_req TYPE datetime;
DEFINE FIELD locked_nano ON TABLE vm_contract TYPE int; DEFINE FIELD price_per_unit ON TABLE new_vm_req TYPE int;
DEFINE FIELD collected_at ON TABLE vm_contract TYPE datetime; DEFINE FIELD locked_nano ON TABLE new_vm_req TYPE int;
DEFINE TABLE active_vm TYPE RELATION FROM account TO vm_node SCHEMAFULL;
DEFINE FIELD hostname ON TABLE active_vm TYPE string;
DEFINE FIELD mapped_ports ON TABLE active_vm TYPE array<[int, int]>;
DEFINE FIELD public_ipv4 ON TABLE active_vm TYPE string;
DEFINE FIELD public_ipv6 ON TABLE active_vm TYPE string;
DEFINE FIELD disk_size_gb ON TABLE active_vm TYPE int;
DEFINE FIELD vcpus ON TABLE active_vm TYPE int;
DEFINE FIELD memory_mb ON TABLE active_vm TYPE int;
DEFINE FIELD dtrfs_sha ON TABLE active_vm TYPE string;
DEFINE FIELD kernel_sha ON TABLE active_vm TYPE string;
DEFINE FIELD created_at ON TABLE active_vm TYPE datetime;
DEFINE FIELD price_per_unit ON TABLE active_vm TYPE int;
DEFINE FIELD locked_nano ON TABLE active_vm TYPE int;
DEFINE FIELD collected_at ON TABLE active_vm TYPE datetime;
DEFINE TABLE update_vm_req TYPE RELATION FROM account TO vm_node SCHEMAFULL;
DEFINE FIELD vcpus ON TABLE update_vm_req TYPE int;
DEFINE FIELD memory_mb ON TABLE update_vm_req TYPE int;
DEFINE FIELD disk_size_gb ON TABLE update_vm_req TYPE int;
DEFINE FIELD dtrfs_sha ON TABLE update_vm_req TYPE string;
DEFINE FIELD dtrfs_url ON TABLE update_vm_req TYPE string;
DEFINE FIELD kernel_sha ON TABLE update_vm_req TYPE string;
DEFINE FIELD kernel_url ON TABLE update_vm_req TYPE string;
DEFINE TABLE deleted_vm TYPE RELATION FROM account TO vm_node SCHEMAFULL;
DEFINE FIELD hostname ON TABLE deleted_vm TYPE string;
DEFINE FIELD mapped_ports ON TABLE deleted_vm TYPE array<[int, int]>;
DEFINE FIELD public_ipv4 ON TABLE deleted_vm TYPE string;
DEFINE FIELD public_ipv6 ON TABLE deleted_vm TYPE string;
DEFINE FIELD disk_size_gb ON TABLE deleted_vm TYPE int;
DEFINE FIELD vcpus ON TABLE deleted_vm TYPE int;
DEFINE FIELD memory_mb ON TABLE deleted_vm TYPE int;
DEFINE FIELD dtrfs_sha ON TABLE deleted_vm TYPE string;
DEFINE FIELD kernel_sha ON TABLE deleted_vm TYPE string;
DEFINE FIELD created_at ON TABLE deleted_vm TYPE datetime;
DEFINE FIELD deleted_at ON TABLE deleted_vm TYPE datetime;
DEFINE FIELD price_per_unit ON TABLE deleted_vm TYPE int;
DEFINE TABLE app_node SCHEMAFULL; DEFINE TABLE app_node SCHEMAFULL;
DEFINE FIELD operator ON TABLE app_node TYPE record<account>; DEFINE FIELD operator ON TABLE app_node TYPE record<account>;
@ -51,22 +89,36 @@ DEFINE FIELD max_ports_per_app ON TABLE app_node TYPE int;
DEFINE FIELD price ON TABLE app_node TYPE int; DEFINE FIELD price ON TABLE app_node TYPE int;
DEFINE FIELD offline_minutes ON TABLE app_node TYPE int; DEFINE FIELD offline_minutes ON TABLE app_node TYPE int;
DEFINE TABLE app_contract TYPE RELATION FROM account TO app_node SCHEMAFULL; DEFINE TABLE active_app TYPE RELATION FROM account TO app_node SCHEMAFULL;
DEFINE FIELD state ON TABLE app_contract TYPE string; DEFINE FIELD app_name ON TABLE active_app TYPE string;
DEFINE FIELD app_name ON TABLE app_contract TYPE string; DEFINE FIELD mapped_ports ON TABLE active_app TYPE array<[int, int]>;
DEFINE FIELD mapped_ports ON TABLE app_contract TYPE array<[int, int]>; DEFINE FIELD host_ipv4 ON TABLE active_app TYPE string;
DEFINE FIELD host_ipv4 ON TABLE app_contract TYPE string; DEFINE FIELD vcpus ON TABLE active_app TYPE int;
DEFINE FIELD vcpus ON TABLE app_contract TYPE int; DEFINE FIELD memory_mb ON TABLE active_app TYPE int;
DEFINE FIELD memory_mb ON TABLE app_contract TYPE int; DEFINE FIELD disk_size_gb ON TABLE active_app TYPE int;
DEFINE FIELD disk_size_gb ON TABLE app_contract TYPE int; DEFINE FIELD created_at ON TABLE active_app TYPE datetime;
DEFINE FIELD created_at ON TABLE app_contract TYPE datetime; DEFINE FIELD price_per_unit ON TABLE active_app TYPE int;
DEFINE FIELD updated_at ON TABLE app_contract TYPE datetime; DEFINE FIELD locked_nano ON TABLE active_app TYPE int;
DEFINE FIELD price_per_unit ON TABLE app_contract TYPE int; DEFINE FIELD collected_at ON TABLE active_app TYPE datetime;
DEFINE FIELD locked_nano ON TABLE app_contract TYPE int; DEFINE FIELD mr_enclave ON TABLE active_app TYPE string;
DEFINE FIELD collected_at ON TABLE app_contract TYPE datetime; DEFINE FIELD package_url ON TABLE active_app TYPE string;
DEFINE FIELD mr_enclave ON TABLE app_contract TYPE string; DEFINE FIELD hratls_pubkey ON TABLE active_app TYPE string;
DEFINE FIELD package_url ON TABLE app_contract TYPE string;
DEFINE FIELD hratls_pubkey ON TABLE app_contract TYPE string; DEFINE TABLE deleted_app TYPE RELATION FROM account TO app_node SCHEMAFULL;
DEFINE FIELD app_name ON TABLE deleted_app TYPE string;
DEFINE FIELD mapped_ports ON TABLE deleted_app TYPE array<[int, int]>;
DEFINE FIELD host_ipv4 ON TABLE deleted_app TYPE string;
DEFINE FIELD vcpus ON TABLE deleted_app TYPE int;
DEFINE FIELD memory_mb ON TABLE deleted_app TYPE int;
DEFINE FIELD disk_size_gb ON TABLE deleted_app TYPE int;
DEFINE FIELD created_at ON TABLE deleted_app TYPE datetime;
DEFINE FIELD deleted_at ON TABLE deleted_app TYPE datetime;
DEFINE FIELD price_per_unit ON TABLE deleted_app TYPE int;
DEFINE FIELD locked_nano ON TABLE deleted_app TYPE int;
DEFINE FIELD collected_at ON TABLE deleted_app TYPE datetime;
DEFINE FIELD mr_enclave ON TABLE deleted_app TYPE string;
DEFINE FIELD package_url ON TABLE deleted_app TYPE string;
DEFINE FIELD hratls_pubkey ON TABLE deleted_app TYPE string;
DEFINE TABLE ban TYPE RELATION FROM account TO account; DEFINE TABLE ban TYPE RELATION FROM account TO account;
DEFINE FIELD created_at ON TABLE ban TYPE datetime; DEFINE FIELD created_at ON TABLE ban TYPE datetime;
@ -74,7 +126,7 @@ DEFINE FIELD created_at ON TABLE ban TYPE datetime;
DEFINE TABLE kick TYPE RELATION FROM account TO account; DEFINE TABLE kick TYPE RELATION FROM account TO account;
DEFINE FIELD created_at ON TABLE kick TYPE datetime; DEFINE FIELD created_at ON TABLE kick TYPE datetime;
DEFINE FIELD reason ON TABLE kick TYPE string; DEFINE FIELD reason ON TABLE kick TYPE string;
DEFINE FIELD contract ON TABLE kick TYPE record<vm_contract|app_contract>; DEFINE FIELD contract ON TABLE kick TYPE record<deleted_vm|deleted_app>;
DEFINE TABLE report TYPE RELATION FROM account TO vm_node|app_node; DEFINE TABLE report TYPE RELATION FROM account TO vm_node|app_node;
DEFINE FIELD created_at ON TABLE report TYPE datetime; DEFINE FIELD created_at ON TABLE report TYPE datetime;

227
src/db.rs

@ -5,18 +5,25 @@ use surrealdb::{
engine::remote::ws::{Client, Ws}, engine::remote::ws::{Client, Ws},
opt::auth::Root, opt::auth::Root,
sql::Datetime, sql::Datetime,
RecordId, Surreal, Notification, RecordId, Surreal,
}; };
use tokio::sync::mpsc::Sender;
use tokio_stream::StreamExt as _;
static DB: LazyLock<Surreal<Client>> = LazyLock::new(Surreal::init); static DB: LazyLock<Surreal<Client>> = LazyLock::new(Surreal::init);
pub const ACCOUNT: &str = "account"; pub const ACCOUNT: &str = "account";
pub const VM_CONTRACT: &str = "vm_contract";
pub const VM_NODE: &str = "vm_node"; pub const VM_NODE: &str = "vm_node";
pub const ACTIVE_VM: &str = "active_vm";
pub const NEW_VM_REQ: &str = "new_vm_req";
pub const UPDATE_VM_REQ: &str = "update_vm_req";
pub const DELETED_VM: &str = "deleted_vm";
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum Error { pub enum Error {
#[error(transparent)] #[error("Internal DB error: {0}")]
DataBase(#[from] surrealdb::Error), DataBase(#[from] surrealdb::Error),
#[error("Daemon channel got closed: {0}")]
DaemonConnection(#[from] tokio::sync::mpsc::error::SendError<DaemonNotification>),
} }
pub async fn init() -> surrealdb::Result<()> { pub async fn init() -> surrealdb::Result<()> {
@ -31,7 +38,7 @@ pub async fn migration0(old_data: &old_brain::BrainData) -> surrealdb::Result<()
let accounts: Vec<Account> = old_data.into(); let accounts: Vec<Account> = old_data.into();
let vm_nodes: Vec<VmNode> = old_data.into(); let vm_nodes: Vec<VmNode> = old_data.into();
let app_nodes: Vec<AppNode> = old_data.into(); let app_nodes: Vec<AppNode> = old_data.into();
let vm_contracts: Vec<VmContract> = old_data.into(); let vm_contracts: Vec<ActiveVm> = old_data.into();
init().await?; init().await?;
@ -42,7 +49,7 @@ pub async fn migration0(old_data: &old_brain::BrainData) -> surrealdb::Result<()
println!("Inserting app nodes..."); println!("Inserting app nodes...");
let _: Vec<AppNode> = DB.insert(()).content(app_nodes).await?; let _: Vec<AppNode> = DB.insert(()).content(app_nodes).await?;
println!("Inserting vm contracts..."); println!("Inserting vm contracts...");
let _: Vec<VmContract> = DB.insert("vm_contract").relation(vm_contracts).await?; let _: Vec<ActiveVm> = DB.insert("vm_contract").relation(vm_contracts).await?;
Ok(()) Ok(())
} }
@ -124,14 +131,60 @@ pub struct VmNodeWithReports {
pub reports: Vec<Report>, pub reports: Vec<Report>,
} }
pub enum DaemonNotification {
Create(NewVmReq),
Update(UpdateVmReq),
Delete(DeletedVm),
}
impl From<NewVmReq> for DaemonNotification {
fn from(value: NewVmReq) -> Self {
Self::Create(value)
}
}
impl From<UpdateVmReq> for DaemonNotification {
fn from(value: UpdateVmReq) -> Self {
Self::Update(value)
}
}
impl From<DeletedVm> for DaemonNotification {
fn from(value: DeletedVm) -> Self {
Self::Delete(value)
}
}
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct VmContract { pub struct NewVmReq {
pub id: RecordId,
#[serde(rename = "in")]
pub admin: RecordId,
#[serde(rename = "out")]
pub vm_node: RecordId,
pub hostname: String,
pub extra_ports: Vec<u32>,
pub public_ipv4: bool,
pub public_ipv6: bool,
pub disk_size_gb: u32,
pub vcpus: u32,
pub memory_mb: u32,
pub dtrfs_url: String,
pub dtrfs_sha: String,
pub kernel_sha: String,
pub kernel_url: String,
pub created_at: Datetime,
pub price_per_unit: u64,
pub locked_nano: u64,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ActiveVm {
pub id: RecordId, pub id: RecordId,
#[serde(rename = "in")] #[serde(rename = "in")]
pub admin: RecordId, pub admin: RecordId,
#[serde(rename = "out")] #[serde(rename = "out")]
pub vm_node: RecordId, pub vm_node: RecordId,
pub state: String,
pub hostname: String, pub hostname: String,
pub mapped_ports: Vec<(u32, u32)>, pub mapped_ports: Vec<(u32, u32)>,
pub public_ipv4: String, pub public_ipv4: String,
@ -142,13 +195,124 @@ pub struct VmContract {
pub dtrfs_sha: String, pub dtrfs_sha: String,
pub kernel_sha: String, pub kernel_sha: String,
pub created_at: Datetime, pub created_at: Datetime,
pub updated_at: Datetime,
pub price_per_unit: u64, pub price_per_unit: u64,
pub locked_nano: u64, pub locked_nano: u64,
pub collected_at: Datetime, pub collected_at: Datetime,
} }
impl VmContract { #[derive(Debug, Serialize, Deserialize)]
pub struct UpdateVmReq {
pub id: RecordId,
#[serde(rename = "in")]
pub admin: RecordId,
#[serde(rename = "out")]
pub vm_node: RecordId,
pub disk_size_gb: u32,
pub vcpus: u32,
pub memory_mb: u32,
pub dtrfs_url: String,
pub dtrfs_sha: String,
pub kernel_sha: String,
pub kernel_url: String,
pub created_at: Datetime,
pub price_per_unit: u64,
pub locked_nano: u64,
}
pub async fn listen_for_node<T: Into<DaemonNotification> + std::marker::Unpin + for<'de> Deserialize<'de>>(
node: &str,
tx: Sender<DaemonNotification>,
) -> Result<(), Error> {
let table_name = match std::any::type_name::<T>() {
"surreal_brain::db::NewVmReq" => NEW_VM_REQ.to_string(),
"surreal_brain::db::UpdateVmReq" => UPDATE_VM_REQ.to_string(),
"surreal_brain::db::DeletedVm" => DELETED_VM.to_string(),
wat => {
log::error!("listen_for_node: T has type {wat}");
String::from("wat")
},
};
let mut resp =
DB.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?;
let mut live_stream = resp.stream::<Notification<T>>(0)?;
while let Some(result) = live_stream.next().await {
match result {
Ok(notification) => match notification.action {
surrealdb::Action::Create => tx.send(notification.data.into()).await?,
_ => {}
},
Err(e) => {
log::warn!("listen_for_deletion DB stream failed for {node}: {e}");
return Err(Error::from(e));
}
}
}
Ok(())
}
#[derive(Debug, Serialize, Deserialize)]
pub struct DeletedVm {
pub id: RecordId,
#[serde(rename = "in")]
pub admin: RecordId,
#[serde(rename = "out")]
pub vm_node: RecordId,
pub hostname: String,
pub mapped_ports: Vec<(u32, u32)>,
pub public_ipv4: String,
pub public_ipv6: String,
pub disk_size_gb: u32,
pub vcpus: u32,
pub memory_mb: u32,
pub dtrfs_sha: String,
pub kernel_sha: String,
pub created_at: Datetime,
pub deleted_at: Datetime,
pub price_per_unit: u64,
}
impl DeletedVm {
pub async fn get_by_uuid(uuid: &str) -> Result<Option<Self>, Error> {
let contract: Option<Self> =
DB.query(format!("select * from {DELETED_VM}:{uuid};")).await?.take(0)?;
Ok(contract)
}
pub async fn list_by_admin(admin: &str) -> Result<Vec<Self>, Error> {
let mut result =
DB.query(format!("select * from {DELETED_VM} where in = {ACCOUNT}:{admin};")).await?;
let contracts: Vec<Self> = result.take(0)?;
Ok(contracts)
}
pub async fn list_by_node(admin: &str) -> Result<Vec<Self>, Error> {
let mut result =
DB.query(format!("select * from {DELETED_VM} where out = {VM_NODE}:{admin};")).await?;
let contracts: Vec<Self> = result.take(0)?;
Ok(contracts)
}
pub async fn list_by_operator(operator: &str) -> Result<Vec<Self>, Error> {
let mut result = DB
.query(format!(
"select
(select * from ->operator->vm_node<-{DELETED_VM}) as contracts
from {ACCOUNT}:{operator};"
))
.await?;
#[derive(Deserialize)]
struct Wrapper {
contracts: Vec<DeletedVm>,
}
let c: Option<Wrapper> = result.take(0)?;
match c {
Some(c) => Ok(c.contracts),
None => Ok(Vec::new()),
}
}
/// total hardware units of this VM /// total hardware units of this VM
fn total_units(&self) -> u64 { fn total_units(&self) -> u64 {
// TODO: Optimize this based on price of hardware. // TODO: Optimize this based on price of hardware.
@ -166,10 +330,27 @@ impl VmContract {
} }
} }
impl VmContract {} impl ActiveVm {
/// total hardware units of this VM
fn total_units(&self) -> u64 {
// TODO: Optimize this based on price of hardware.
// I tried, but this can be done better.
// Storage cost should also be based on tier
(self.vcpus as u64 * 10)
+ ((self.memory_mb + 256) as u64 / 200)
+ (self.disk_size_gb as u64 / 10)
+ (!self.public_ipv4.is_empty() as u64 * 10)
}
/// Returns price per minute in nanoLP
pub fn price_per_minute(&self) -> u64 {
self.total_units() * self.price_per_unit
}
}
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct VmContractWithNode { pub struct ActiveVmWithNode {
pub id: RecordId, pub id: RecordId,
#[serde(rename = "in")] #[serde(rename = "in")]
pub admin: RecordId, pub admin: RecordId,
@ -192,16 +373,16 @@ pub struct VmContractWithNode {
pub collected_at: Datetime, pub collected_at: Datetime,
} }
impl VmContractWithNode { impl ActiveVmWithNode {
pub async fn get_by_uuid(uuid: &str) -> Result<Option<Self>, Error> { pub async fn get_by_uuid(uuid: &str) -> Result<Option<Self>, Error> {
let contract: Option<Self> = let contract: Option<Self> =
DB.query(format!("select * from {VM_CONTRACT}:{uuid} fetch out;")).await?.take(0)?; DB.query(format!("select * from {ACTIVE_VM}:{uuid} fetch out;")).await?.take(0)?;
Ok(contract) Ok(contract)
} }
pub async fn list_by_admin(admin: &str) -> Result<Vec<Self>, Error> { pub async fn list_by_admin(admin: &str) -> Result<Vec<Self>, Error> {
let mut result = DB let mut result = DB
.query(format!("select * from {VM_CONTRACT} where in = {ACCOUNT}:{admin} fetch out;")) .query(format!("select * from {ACTIVE_VM} where in = {ACCOUNT}:{admin} fetch out;"))
.await?; .await?;
let contracts: Vec<Self> = result.take(0)?; let contracts: Vec<Self> = result.take(0)?;
Ok(contracts) Ok(contracts)
@ -209,7 +390,7 @@ impl VmContractWithNode {
pub async fn list_by_node(admin: &str) -> Result<Vec<Self>, Error> { pub async fn list_by_node(admin: &str) -> Result<Vec<Self>, Error> {
let mut result = DB let mut result = DB
.query(format!("select * from {VM_CONTRACT} where out = {VM_NODE}:{admin} fetch out;")) .query(format!("select * from {ACTIVE_VM} where out = {VM_NODE}:{admin} fetch out;"))
.await?; .await?;
let contracts: Vec<Self> = result.take(0)?; let contracts: Vec<Self> = result.take(0)?;
Ok(contracts) Ok(contracts)
@ -219,14 +400,14 @@ impl VmContractWithNode {
let mut result = DB let mut result = DB
.query(format!( .query(format!(
"select "select
(select * from ->operator->vm_node<-vm_contract fetch out) as contracts (select * from ->operator->vm_node<-{ACTIVE_VM} fetch out) as contracts
from {ACCOUNT}:{operator};" from {ACCOUNT}:{operator};"
)) ))
.await?; .await?;
#[derive(Deserialize)] #[derive(Deserialize)]
struct Wrapper { struct Wrapper {
contracts: Vec<VmContractWithNode>, contracts: Vec<ActiveVmWithNode>,
} }
let c: Option<Wrapper> = result.take(0)?; let c: Option<Wrapper> = result.take(0)?;
@ -461,7 +642,7 @@ impl From<&old_brain::BrainData> for Vec<VmNode> {
} }
} }
impl From<&old_brain::BrainData> for Vec<VmContract> { impl From<&old_brain::BrainData> for Vec<ActiveVm> {
fn from(old_data: &old_brain::BrainData) -> Self { fn from(old_data: &old_brain::BrainData) -> Self {
let mut contracts = Vec::new(); let mut contracts = Vec::new();
for old_c in old_data.vm_contracts.iter() { for old_c in old_data.vm_contracts.iter() {
@ -469,15 +650,10 @@ impl From<&old_brain::BrainData> for Vec<VmContract> {
for port in old_c.exposed_ports.iter() { for port in old_c.exposed_ports.iter() {
mapped_ports.push((*port, 8080 as u32)); mapped_ports.push((*port, 8080 as u32));
} }
contracts.push(VmContract { contracts.push(ActiveVm {
id: RecordId::from(( id: RecordId::from((ACTIVE_VM, old_c.uuid.replace("-", ""))),
VM_CONTRACT,
old_c.node_pubkey.chars().take(20).collect::<String>()
+ &old_c.uuid.replace("-", "").chars().take(20).collect::<String>(),
)),
admin: RecordId::from((ACCOUNT, old_c.admin_pubkey.clone())), admin: RecordId::from((ACCOUNT, old_c.admin_pubkey.clone())),
vm_node: RecordId::from((VM_NODE, old_c.node_pubkey.clone())), vm_node: RecordId::from((VM_NODE, old_c.node_pubkey.clone())),
state: "active".to_string(),
hostname: old_c.hostname.clone(), hostname: old_c.hostname.clone(),
mapped_ports, mapped_ports,
public_ipv4: old_c.public_ipv4.clone(), public_ipv4: old_c.public_ipv4.clone(),
@ -490,7 +666,6 @@ impl From<&old_brain::BrainData> for Vec<VmContract> {
price_per_unit: old_c.price_per_unit, price_per_unit: old_c.price_per_unit,
locked_nano: old_c.locked_nano, locked_nano: old_c.locked_nano,
created_at: old_c.created_at.into(), created_at: old_c.created_at.into(),
updated_at: old_c.updated_at.into(),
collected_at: old_c.collected_at.into(), collected_at: old_c.collected_at.into(),
}); });
} }

@ -18,9 +18,7 @@ use log::info;
use std::pin::Pin; use std::pin::Pin;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
// use tokio::sync::mpsc; use tokio_stream::{Stream, StreamExt};
// use tokio_stream::{wrappers::ReceiverStream, Stream};
use tokio_stream::Stream;
use tonic::{Request, Response, Status, Streaming}; use tonic::{Request, Response, Status, Streaming};
pub struct BrainGeneralCliForReal {} pub struct BrainGeneralCliForReal {}
@ -31,8 +29,74 @@ impl From<db::Account> for AccountBalance {
} }
} }
impl From<db::VmContractWithNode> for VmContract { impl From<db::NewVmReq> for NewVmReq {
fn from(db_c: db::VmContractWithNode) -> Self { fn from(new_vm_req: db::NewVmReq) -> Self {
Self {
uuid: new_vm_req.id.key().to_string(),
hostname: new_vm_req.hostname,
admin_pubkey: new_vm_req.admin.key().to_string(),
node_pubkey: new_vm_req.vm_node.key().to_string(),
extra_ports: new_vm_req.extra_ports,
public_ipv4: new_vm_req.public_ipv4,
public_ipv6: new_vm_req.public_ipv6,
disk_size_gb: new_vm_req.disk_size_gb,
vcpus: new_vm_req.vcpus,
memory_mb: new_vm_req.memory_mb,
kernel_url: new_vm_req.kernel_url,
kernel_sha: new_vm_req.kernel_sha,
dtrfs_url: new_vm_req.dtrfs_url,
dtrfs_sha: new_vm_req.dtrfs_sha,
price_per_unit: new_vm_req.price_per_unit,
locked_nano: new_vm_req.locked_nano,
}
}
}
impl From<db::UpdateVmReq> for UpdateVmReq {
fn from(update_vm_req: db::UpdateVmReq) -> Self {
Self {
uuid: update_vm_req.id.key().to_string(),
// daemon does not care about VM hostname
hostname: String::new(),
admin_pubkey: update_vm_req.admin.key().to_string(),
disk_size_gb: update_vm_req.disk_size_gb,
vcpus: update_vm_req.vcpus,
memory_mb: update_vm_req.memory_mb,
kernel_url: update_vm_req.kernel_url,
kernel_sha: update_vm_req.kernel_sha,
dtrfs_url: update_vm_req.dtrfs_url,
dtrfs_sha: update_vm_req.dtrfs_sha,
}
}
}
impl From<db::DeletedVm> for DeleteVmReq {
fn from(delete_vm_req: db::DeletedVm) -> Self {
Self {
uuid: delete_vm_req.id.key().to_string(),
admin_pubkey: delete_vm_req.admin.key().to_string(),
}
}
}
impl From<db::DaemonNotification> for BrainVmMessage {
fn from(notification: db::DaemonNotification) -> Self {
match notification {
db::DaemonNotification::Create(new_vm_req) => {
BrainVmMessage { msg: Some(brain_vm_message::Msg::NewVmReq(new_vm_req.into())) }
}
db::DaemonNotification::Update(update_vm_req) => BrainVmMessage {
msg: Some(brain_vm_message::Msg::UpdateVmReq(update_vm_req.into())),
},
db::DaemonNotification::Delete(deleted_vm) => {
BrainVmMessage { msg: Some(brain_vm_message::Msg::DeleteVm(deleted_vm.into())) }
}
}
}
}
impl From<db::ActiveVmWithNode> for VmContract {
fn from(db_c: db::ActiveVmWithNode) -> Self {
let mut exposed_ports = Vec::new(); let mut exposed_ports = Vec::new();
for port in db_c.mapped_ports.iter() { for port in db_c.mapped_ports.iter() {
exposed_ports.push(port.0); exposed_ports.push(port.0);
@ -149,7 +213,7 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
.await?; .await?;
info!("Sending existing contracts to {}", req.node_pubkey); info!("Sending existing contracts to {}", req.node_pubkey);
let contracts = db::VmContractWithNode::list_by_node(&req.node_pubkey).await?; let contracts = db::ActiveVmWithNode::list_by_node(&req.node_pubkey).await?;
let (tx, rx) = mpsc::channel(6); let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move { tokio::spawn(async move {
for contract in contracts { for contract in contracts {
@ -165,7 +229,6 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
&self, &self,
req: Request<DaemonStreamAuth>, req: Request<DaemonStreamAuth>,
) -> Result<Response<Self::BrainMessagesStream>, Status> { ) -> Result<Response<Self::BrainMessagesStream>, Status> {
todo!();
let auth = req.into_inner(); let auth = req.into_inner();
let pubkey = auth.pubkey.clone(); let pubkey = auth.pubkey.clone();
check_sig_from_parts( check_sig_from_parts(
@ -175,29 +238,37 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
&auth.signature, &auth.signature,
)?; )?;
info!("Daemon {} connected to receive brain messages", pubkey); info!("Daemon {} connected to receive brain messages", pubkey);
// A surreql query that listens live for changes:
// live select * from vm_contract where meta::id(id) in "3zRxiGRnf46vd3zAEmpa".."3zRxiGRnf46vd3zAEmpb";
//
// And the same from Rust:
// let mut stream = db
// .select("vm_contract")
// .range("3zRxiGRnf46vd3zAEmpa".."3zRxiGRnf46vd3zAEmpb")
// .live()
// .await?;
//
// fn handle(result: Result<Notification<VmContract>, surrealdb::Error>) {
// println!("Received notification: {:?}", result);
// }
//
// while let Some(result) = stream.next().await {
// handle(result);
// }
//
// let (tx, rx) = mpsc::channel(6); let (tx, rx) = mpsc::channel(6);
//self.data.add_daemon_tx(&pubkey, tx); {
//let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg)); let pubkey = pubkey.clone();
//Ok(Response::new(Box::pin(output_stream) as Self::BrainMessagesStream)) let tx = tx.clone();
tokio::spawn(async move {
match db::listen_for_node::<db::DeletedVm>(&pubkey, tx).await {
Ok(()) => log::info!("db::VmContract::listen_for_node ended for {pubkey}"),
Err(e) => {
log::warn!("db::VmContract::listen_for_node errored for {pubkey}: {e}")
}
};
});
}
{
let pubkey = pubkey.clone();
let tx = tx.clone();
tokio::spawn(async move {
let _ = db::listen_for_node::<db::NewVmReq>(&pubkey, tx.clone()).await;
});
}
{
let pubkey = pubkey.clone();
let tx = tx.clone();
tokio::spawn(async move {
let _ = db::listen_for_node::<db::UpdateVmReq>(&pubkey, tx.clone()).await;
});
}
let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg.into()));
Ok(Response::new(Box::pin(output_stream) as Self::BrainMessagesStream))
} }
async fn daemon_messages( async fn daemon_messages(
@ -267,7 +338,7 @@ impl BrainGeneralCli for BrainGeneralCliForReal {
async fn report_node(&self, req: Request<ReportNodeReq>) -> Result<Response<Empty>, Status> { async fn report_node(&self, req: Request<ReportNodeReq>) -> Result<Response<Empty>, Status> {
let req = check_sig_from_req(req)?; let req = check_sig_from_req(req)?;
let (account, node) = match db::VmContractWithNode::get_by_uuid(&req.contract).await? { let (account, node) = match db::ActiveVmWithNode::get_by_uuid(&req.contract).await? {
Some(vm_contract) Some(vm_contract)
if vm_contract.admin.key().to_string() == req.admin_pubkey if vm_contract.admin.key().to_string() == req.admin_pubkey
&& vm_contract.vm_node.id.key().to_string() == req.node_pubkey => && vm_contract.vm_node.id.key().to_string() == req.node_pubkey =>
@ -499,7 +570,7 @@ impl BrainVmCli for BrainVmCliForReal {
); );
let mut contracts = Vec::new(); let mut contracts = Vec::new();
if !req.uuid.is_empty() { if !req.uuid.is_empty() {
if let Some(specific_contract) = db::VmContractWithNode::get_by_uuid(&req.uuid).await? { if let Some(specific_contract) = db::ActiveVmWithNode::get_by_uuid(&req.uuid).await? {
if specific_contract.admin.key().to_string() == req.wallet { if specific_contract.admin.key().to_string() == req.wallet {
contracts.push(specific_contract.into()); contracts.push(specific_contract.into());
} }
@ -507,12 +578,11 @@ impl BrainVmCli for BrainVmCliForReal {
} }
} else { } else {
if req.as_operator { if req.as_operator {
contracts.append( contracts
&mut db::VmContractWithNode::list_by_operator(&req.wallet).await?.into(), .append(&mut db::ActiveVmWithNode::list_by_operator(&req.wallet).await?.into());
);
} else { } else {
contracts contracts
.append(&mut db::VmContractWithNode::list_by_admin(&req.wallet).await?.into()); .append(&mut db::ActiveVmWithNode::list_by_admin(&req.wallet).await?.into());
} }
} }
let (tx, rx) = mpsc::channel(6); let (tx, rx) = mpsc::channel(6);