From 784414815e2a54aff17753c808a03615bff83d07 Mon Sep 17 00:00:00 2001 From: ghe0 Date: Sun, 27 Apr 2025 03:37:43 +0300 Subject: [PATCH] after creating a VM, save it to active_vm --- src/db.rs | 81 +++++++++++++++++++++++++++++++++++++++++++++++------ src/grpc.rs | 14 +++++++-- 2 files changed, 85 insertions(+), 10 deletions(-) diff --git a/src/db.rs b/src/db.rs index d883645..9d04392 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,6 +1,6 @@ use crate::old_brain; use serde::{Deserialize, Serialize}; -use std::sync::LazyLock; +use std::{str::FromStr, sync::LazyLock}; use surrealdb::{ engine::remote::ws::{Client, Ws}, opt::auth::Root, @@ -218,8 +218,7 @@ impl VmNodeWithReports { query += &format!("&& ip = '{}' ", filters.ip); } query += ";"; - let mut result = - DB.query(query).await?; + let mut result = DB.query(query).await?; let vm_nodes: Vec = result.take(0)?; Ok(vm_nodes) } @@ -274,12 +273,22 @@ pub struct NewVmReq { } impl NewVmReq { + pub async fn get(id: &str) -> Result, Error> { + let new_vm_req: Option = DB.select((NEW_VM_REQ, id)).await?; + Ok(new_vm_req) + } + + pub async fn delete(id: &str) -> Result<(), Error> { + let _: Option = DB.delete((NEW_VM_REQ, id)).await?; + Ok(()) + } + pub async fn submit_error(id: &str, error: String) -> Result<(), Error> { #[derive(Serialize)] struct NewVmError { error: String, } - let _: Option = DB.update((NEW_VM_REQ, id)).merge(NewVmError { error }).await?; + let _: Option = DB.update((NEW_VM_REQ, id)).merge(NewVmError { error }).await?; Ok(()) } @@ -371,6 +380,66 @@ pub struct ActiveVm { pub collected_at: Datetime, } +impl ActiveVm { + pub async fn activate( + id: &str, + args: detee_shared::vm_proto::MeasurementArgs, + ) -> Result<(), Error> { + let new_vm_req = match NewVmReq::get(id).await? { + Some(r) => r, + None => return Ok(()), + }; + + let mut public_ipv4 = String::new(); + let mut public_ipv6 = String::new(); + + for ip in args.ips.iter() { + if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) { + if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() { + public_ipv4 = ipv4_addr.to_string(); + } + continue; + } + if let Ok(ipv6_addr) = std::net::Ipv6Addr::from_str(&ip.address) { + public_ipv6 = ipv6_addr.to_string(); + } + } + + let mut mapped_ports = Vec::new(); + let mut guest_ports= vec![ 22 ]; + guest_ports.append(&mut args.exposed_ports.clone()); + let mut i = 0; + while i < new_vm_req.extra_ports.len() && i < guest_ports.len() { + mapped_ports.push((args.exposed_ports[i], guest_ports[i])); + i += 1; + } + + let active_vm = ActiveVm { + id: RecordId::from((ACTIVE_VM, id)), + admin: new_vm_req.admin, + vm_node: new_vm_req.vm_node, + hostname: new_vm_req.hostname, + mapped_ports, + public_ipv4, + public_ipv6, + disk_size_gb: new_vm_req.disk_size_gb, + vcpus: new_vm_req.vcpus, + memory_mb: new_vm_req.memory_mb, + dtrfs_sha: new_vm_req.dtrfs_sha, + kernel_sha: new_vm_req.kernel_sha, + created_at: new_vm_req.created_at.clone(), + price_per_unit: new_vm_req.price_per_unit, + locked_nano: new_vm_req.locked_nano, + collected_at: new_vm_req.created_at, + }; + + let _: Vec = DB.insert(()).relation(active_vm).await?; + + NewVmReq::delete(id).await?; + Ok(()) + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct UpdateVmReq { pub id: RecordId, @@ -528,7 +597,6 @@ pub struct ActiveVmWithNode { pub admin: RecordId, #[serde(rename = "out")] pub vm_node: VmNode, - pub state: String, pub hostname: String, pub mapped_ports: Vec<(u32, u32)>, pub public_ipv4: String, @@ -539,7 +607,6 @@ pub struct ActiveVmWithNode { pub dtrfs_sha: String, pub kernel_sha: String, pub created_at: Datetime, - pub updated_at: Datetime, pub price_per_unit: u64, pub locked_nano: u64, pub collected_at: Datetime, @@ -648,7 +715,6 @@ pub struct AppContract { admin: RecordId, #[serde(rename = "out")] app_node: RecordId, - state: String, app_name: String, mapped_ports: Vec<(u64, u64)>, host_ipv4: String, @@ -656,7 +722,6 @@ pub struct AppContract { memory_mb: u64, disk_size_gb: u64, created_at: Datetime, - updated_at: Datetime, price_per_unit: u64, locked_nano: u64, collected_at: Datetime, diff --git a/src/grpc.rs b/src/grpc.rs index b0069ba..0ff40ff 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -166,7 +166,10 @@ impl From for VmContract { kernel_sha: db_c.kernel_sha.clone(), nano_per_minute: db_c.price_per_minute(), created_at: db_c.created_at.to_rfc3339(), - updated_at: db_c.updated_at.to_rfc3339(), + // TODO: remove updated_at from the proto + // This will get moved to VM history (users will be able to + // query old contracts, which also shows updates of existing contracts). + updated_at: db_c.created_at.to_rfc3339(), collected_at: db_c.collected_at.to_rfc3339(), } } @@ -354,14 +357,21 @@ impl BrainVmDaemon for BrainVmDaemonForReal { match daemon_message { Ok(msg) => match msg.msg { Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => { + // TODO: move new_vm_req to active_vm + // also handle failure properly if !new_vm_resp.error.is_empty() { + db::NewVmReq::submit_error(&new_vm_resp.uuid, new_vm_resp.error) + .await?; } else { db::upsert_record( "measurement_args", &new_vm_resp.uuid, - new_vm_resp.args, + new_vm_resp.args.clone(), ) .await?; + if let Some(args) = new_vm_resp.args { + db::ActiveVm::activate(&new_vm_resp.uuid, args).await?; + } } } Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => {