Compare commits

...

2 Commits

Author SHA1 Message Date
e62fa91bfa
WIP: adding vm updates
I will overwrite this commit when it's working
2025-05-06 16:43:15 +03:00
4dfaa3f465
fix: type matching on live_vmnode_msgs
generic type matching to table name
update type paths with
some logging in tests
2025-05-06 17:25:42 +05:30
6 changed files with 136 additions and 33 deletions

@ -90,9 +90,9 @@ pub async fn live_vmnode_msgs<
tx: Sender<vm::VmDaemonMsg>, tx: Sender<vm::VmDaemonMsg>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let table_name = match std::any::type_name::<T>() { let table_name = match std::any::type_name::<T>() {
"surreal_brain::db::NewVmReq" => NEW_VM_REQ.to_string(), t if t == std::any::type_name::<crate::db::vm::NewVmReq>() => NEW_VM_REQ.to_string(),
"surreal_brain::db::UpdateVmReq" => UPDATE_VM_REQ.to_string(), t if t == std::any::type_name::<crate::db::vm::UpdateVmReq>() => UPDATE_VM_REQ.to_string(),
"surreal_brain::db::DeletedVm" => DELETED_VM.to_string(), t if t == std::any::type_name::<crate::db::vm::DeletedVm>() => DELETED_VM.to_string(),
wat => { wat => {
log::error!("listen_for_node: T has type {wat}"); log::error!("listen_for_node: T has type {wat}");
String::from("wat") String::from("wat")
@ -109,7 +109,7 @@ pub async fn live_vmnode_msgs<
} }
} }
Err(e) => { Err(e) => {
log::warn!("listen_for_deletion DB stream failed for {node}: {e}"); log::error!("listen_for_{table_name} DB stream failed for {node}: {e}");
return Err(Error::from(e)); return Err(Error::from(e));
} }
} }

@ -2,7 +2,7 @@ use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
use super::Error; use super::Error;
use crate::constants::{ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, VM_NODE}; use crate::constants::{ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE};
use crate::db::general::Report; use crate::db::general::Report;
use crate::old_brain; use crate::old_brain;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -269,6 +269,28 @@ pub struct ActiveVm {
} }
impl ActiveVm { impl ActiveVm {
/// total hardware units of this VM
fn total_units(&self) -> u64 {
// TODO: Optimize this based on price of hardware.
// I tried, but this can be done better.
// Storage cost should also be based on tier
(self.vcpus as u64 * 10)
+ ((self.memory_mb + 256) as u64 / 200)
+ (self.disk_size_gb as u64 / 10)
+ (!self.public_ipv4.is_empty() as u64 * 10)
}
/// Returns price per minute in nanoLP
pub fn price_per_minute(&self) -> u64 {
self.total_units() * self.price_per_unit
}
pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> {
let contract: Option<Self> =
db.query(format!("select * from {ACTIVE_VM}:{uuid};")).await?.take(0)?;
Ok(contract)
}
pub async fn activate( pub async fn activate(
db: &Surreal<Client>, db: &Surreal<Client>,
id: &str, id: &str,
@ -327,6 +349,25 @@ impl ActiveVm {
NewVmReq::delete(db, id).await?; NewVmReq::delete(db, id).await?;
Ok(()) Ok(())
} }
pub async fn change_hostname(
db: &Surreal<Client>,
id: &str,
new_hostname: &str,
) -> Result<bool, Error> {
let contract: Option<Self> = db
.query(format!(
"UPDATE {ACTIVE_VM}:{id} SET hostname = '{new_hostname}' RETURN BEFORE;"
))
.await?
.take(0)?;
if let Some(contract) = contract {
if contract.hostname != new_hostname {
return Ok(true);
}
}
Ok(false)
}
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
@ -344,8 +385,35 @@ pub struct UpdateVmReq {
pub kernel_sha: String, pub kernel_sha: String,
pub kernel_url: String, pub kernel_url: String,
pub created_at: Datetime, pub created_at: Datetime,
pub price_per_unit: u64, pub error: String,
pub locked_nano: u64, }
impl UpdateVmReq {
/// returns None if VM does not exist
/// returns Some(false) if hw update is not needed
/// returns Some(true) if hw update is needed and got submitted
/// returns error if something happened with the DB
pub async fn request_hw_update(mut self, db: &Surreal<Client>) -> Result<Option<bool>, Error> {
let contract = ActiveVm::get_by_uuid(db, &self.id.key().to_string()).await?;
if contract.is_none() {
return Ok(None);
}
let contract = contract.unwrap();
// this is needed cause TryFrom does not support await
self.vm_node = contract.vm_node;
if !((self.vcpus != 0 && contract.vcpus != self.vcpus)
|| (self.memory_mb != 0 && contract.memory_mb != self.memory_mb)
|| (!self.dtrfs_sha.is_empty() && contract.dtrfs_sha != self.dtrfs_sha)
|| (self.disk_size_gb != 0 && contract.disk_size_gb != self.disk_size_gb))
{
return Ok(Some(false));
}
let _: Vec<Self> = db.insert(UPDATE_VM_REQ).relation(self).await?;
Ok(Some(true))
}
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
@ -431,24 +499,6 @@ impl DeletedVm {
} }
} }
impl ActiveVm {
/// total hardware units of this VM
fn total_units(&self) -> u64 {
// TODO: Optimize this based on price of hardware.
// I tried, but this can be done better.
// Storage cost should also be based on tier
(self.vcpus as u64 * 10)
+ ((self.memory_mb + 256) as u64 / 200)
+ (self.disk_size_gb as u64 / 10)
+ (!self.public_ipv4.is_empty() as u64 * 10)
}
/// Returns price per minute in nanoLP
pub fn price_per_minute(&self) -> u64 {
self.total_units() * self.price_per_unit
}
}
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct ActiveVmWithNode { pub struct ActiveVmWithNode {
pub id: RecordId, pub id: RecordId,

@ -74,6 +74,26 @@ impl From<db::NewVmResp> for NewVmResp {
} }
} }
impl From<UpdateVmReq> for db::UpdateVmReq {
fn from(new_vm_req: UpdateVmReq) -> Self {
Self {
id: RecordId::from((NEW_VM_REQ, nanoid!(40, &ID_ALPHABET))),
admin: RecordId::from((ACCOUNT, new_vm_req.admin_pubkey)),
// vm_node gets modified later, and only if the db::UpdateVmReq is required
vm_node: RecordId::from((VM_NODE, String::new())),
disk_size_gb: new_vm_req.disk_size_gb,
vcpus: new_vm_req.vcpus,
memory_mb: new_vm_req.memory_mb,
kernel_url: new_vm_req.kernel_url,
kernel_sha: new_vm_req.kernel_sha,
dtrfs_url: new_vm_req.dtrfs_url,
dtrfs_sha: new_vm_req.dtrfs_sha,
created_at: surrealdb::sql::Datetime::default(),
error: String::new(),
}
}
}
impl From<db::UpdateVmReq> for UpdateVmReq { impl From<db::UpdateVmReq> for UpdateVmReq {
fn from(update_vm_req: db::UpdateVmReq) -> Self { fn from(update_vm_req: db::UpdateVmReq) -> Self {
Self { Self {

@ -207,7 +207,9 @@ impl BrainVmCli for VmCliServer {
async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> { async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> {
let req = check_sig_from_req(req)?; let req = check_sig_from_req(req)?;
info!("New VM requested via CLI: {req:?}"); info!("New VM requested via CLI: {req:?}");
if db::general::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { if db::general::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey)
.await?
{
return Err(Status::permission_denied("This operator banned you. What did you do?")); return Err(Status::permission_denied("This operator banned you. What did you do?"));
} }
@ -223,12 +225,14 @@ impl BrainVmCli for VmCliServer {
new_vm_req.submit(&self.db).await?; new_vm_req.submit(&self.db).await?;
match oneshot_rx.await { match oneshot_rx.await {
Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded("Request failed due to timeout. Please try again later or contact the DeTEE devs team.")), Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
"Network timeout. Please try again later or contact the DeTEE devs team.",
)),
Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())), Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())),
Err(e) => { Err(e) => {
log::error!("Something weird happened. Reached error {e:?}"); log::error!("Something weird happened. Reached error {e:?}");
Err(Status::unknown( Err(Status::unknown(
"Request failed due to unknown error. Please try again or contact the DeTEE devs team.", "Unknown error. Please try again or contact the DeTEE devs team.",
)) ))
} }
} }
@ -237,7 +241,34 @@ impl BrainVmCli for VmCliServer {
async fn update_vm(&self, req: Request<UpdateVmReq>) -> Result<Response<UpdateVmResp>, Status> { async fn update_vm(&self, req: Request<UpdateVmReq>) -> Result<Response<UpdateVmResp>, Status> {
let req = check_sig_from_req(req)?; let req = check_sig_from_req(req)?;
info!("Update VM requested via CLI: {req:?}"); info!("Update VM requested via CLI: {req:?}");
todo!();
let db_req: db::UpdateVmReq = req.clone().into();
let hw_change_submitted = db_req.request_hw_update(&self.db).await?;
if hw_change_submitted.is_none() {
return Ok(Response::new(UpdateVmResp {
uuid: req.uuid.clone(),
error: "VM Contract does not exist.".to_string(),
args: None,
}));
}
let hw_change_submitted = hw_change_submitted.unwrap();
let mut hostname_changed = false;
if !req.hostname.is_empty() {
hostname_changed =
db::ActiveVm::change_hostname(&self.db, &req.uuid, &req.hostname).await?;
}
if !hw_change_submitted && !hostname_changed {
return Ok(Response::new(UpdateVmResp {
uuid: req.uuid.clone(),
error: "No modification required".to_string(),
args: None,
}));
}
todo!("process update response");
// let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); // let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
// self.data.submit_updatevm_req(req, oneshot_tx).await; // self.data.submit_updatevm_req(req, oneshot_tx).await;
// match oneshot_rx.await { // match oneshot_rx.await {

@ -5,10 +5,8 @@ use dotenv::dotenv;
use hyper_util::rt::TokioIo; use hyper_util::rt::TokioIo;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use surreal_brain::grpc::{ use surreal_brain::grpc::general::GeneralCliServer;
general::GeneralCliServer, use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer};
vm::{VmCliServer, VmDaemonServer},
};
use surrealdb::engine::remote::ws::Client; use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal; use surrealdb::Surreal;
use tokio::io::DuplexStream; use tokio::io::DuplexStream;

@ -37,6 +37,7 @@ pub async fn register_vm_node(
key: Key, key: Key,
operator_wallet: String, operator_wallet: String,
) -> Vec<vm_proto::VmContract> { ) -> Vec<vm_proto::VmContract> {
log::info!("Registering vm_node: {}", key.pubkey);
let node_pubkey = key.pubkey.clone(); let node_pubkey = key.pubkey.clone();
let req = RegisterVmNodeReq { let req = RegisterVmNodeReq {
@ -71,6 +72,7 @@ pub async fn daemon_listener(
key: Key, key: Key,
tx: mpsc::Sender<vm_proto::BrainVmMessage>, tx: mpsc::Sender<vm_proto::BrainVmMessage>,
) { ) {
log::info!("listening vm_daemon");
let mut grpc_stream = let mut grpc_stream =
client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner(); client.brain_messages(key.sign_stream_auth(vec![]).unwrap()).await.unwrap().into_inner();
@ -86,6 +88,7 @@ pub async fn daemon_msg_sender(
tx: mpsc::Sender<vm_proto::VmDaemonMessage>, tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
rx: mpsc::Receiver<vm_proto::VmDaemonMessage>, rx: mpsc::Receiver<vm_proto::VmDaemonMessage>,
) { ) {
log::info!("sender vm_daemon");
let rx_stream = ReceiverStream::new(rx); let rx_stream = ReceiverStream::new(rx);
tx.send(vm_proto::VmDaemonMessage { tx.send(vm_proto::VmDaemonMessage {
msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())), msg: Some(vm_proto::vm_daemon_message::Msg::Auth(key.sign_stream_auth(vec![]).unwrap())),
@ -99,6 +102,7 @@ pub async fn daemon_engine(
tx: mpsc::Sender<vm_proto::VmDaemonMessage>, tx: mpsc::Sender<vm_proto::VmDaemonMessage>,
mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>, mut rx: mpsc::Receiver<vm_proto::BrainVmMessage>,
) { ) {
log::info!("daemon engine vm_daemon");
while let Some(brain_msg) = rx.recv().await { while let Some(brain_msg) = rx.recv().await {
match brain_msg.msg { match brain_msg.msg {
Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => { Some(vm_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => {