diff --git a/Cargo.lock b/Cargo.lock index 42912f3..9f56629 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1000,7 +1000,7 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain#fb38352e1b47837b14f32d8df5ae7f6b17202aae" +source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain#d0d4622c52efdf74ed6582fbac23a6159986ade3" dependencies = [ "bincode 2.0.1", "prost", @@ -3779,6 +3779,7 @@ dependencies = [ "env_logger", "futures", "log", + "nanoid", "serde", "serde_json", "serde_yaml", diff --git a/Cargo.toml b/Cargo.toml index 96832e2..9d1c65f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ tokio-stream = "0.1.17" log = "0.4.27" env_logger = "0.11.8" thiserror = "2.0.12" +nanoid = "0.4.0" [profile.release] lto = true diff --git a/interim_tables.surql b/interim_tables.surql index 662dea2..d5f831d 100644 --- a/interim_tables.surql +++ b/interim_tables.surql @@ -36,6 +36,7 @@ DEFINE FIELD created_at ON TABLE new_vm_req TYPE datetime; DEFINE FIELD updated_at ON TABLE new_vm_req TYPE datetime; DEFINE FIELD price_per_unit ON TABLE new_vm_req TYPE int; DEFINE FIELD locked_nano ON TABLE new_vm_req TYPE int; +DEFINE FIELD error ON TABLE new_vm_req TYPE string; DEFINE TABLE active_vm TYPE RELATION FROM account TO vm_node SCHEMAFULL; DEFINE FIELD hostname ON TABLE active_vm TYPE string; diff --git a/src/db.rs b/src/db.rs index 39f41e0..77d9e72 100644 --- a/src/db.rs +++ b/src/db.rs @@ -18,6 +18,13 @@ pub const NEW_VM_REQ: &str = "new_vm_req"; pub const UPDATE_VM_REQ: &str = "update_vm_req"; pub const DELETED_VM: &str = "deleted_vm"; +pub const ID_ALPHABET: [char; 62] = [ + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', + 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', + 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', + 'V', 'W', 'X', 'Y', 'Z', +]; + #[derive(thiserror::Error, Debug)] pub enum Error { #[error("Internal DB error: {0}")] @@ -34,6 +41,17 @@ pub async fn init() -> surrealdb::Result<()> { Ok(()) } +pub async fn upsert_record( + table: &str, + id: &str, + my_record: SomeRecord, +) -> Result<(), Error> { + #[derive(Deserialize)] + struct Wrapper {} + let _: Option = DB.create((table, id)).content(my_record).await?; + Ok(()) +} + pub async fn migration0(old_data: &old_brain::BrainData) -> surrealdb::Result<()> { let accounts: Vec = old_data.into(); let vm_nodes: Vec = old_data.into(); @@ -85,6 +103,21 @@ impl Account { } } +impl Account { + pub async fn is_banned_by_node(user: &str, node: &str) -> Result { + let ban: Option = DB + .query(format!( + "(select operator->ban[0] as ban + from vm_node:{node} + where operator->ban->account contains account:{user} + ).ban;" + )) + .await? + .take(0)?; + Ok(ban.is_some()) + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct VmNode { pub id: RecordId, @@ -104,6 +137,24 @@ pub struct VmNode { pub offline_minutes: u64, } +#[derive(Serialize)] +pub struct VmNodeResources { + pub avail_mem_mb: u32, + pub avail_vcpus: u32, + pub avail_storage_gbs: u32, + pub avail_ipv4: u32, + pub avail_ipv6: u32, + pub avail_ports: u32, + pub max_ports_per_vm: u32, +} + +impl VmNodeResources { + pub async fn merge(self, node_id: &str) -> Result<(), Error> { + let _: Option = DB.update((VM_NODE, node_id)).merge(self).await?; + Ok(()) + } +} + impl VmNode { pub async fn register(self) -> Result<(), Error> { let _: Option = DB.upsert(self.id.clone()).content(self).await?; @@ -176,6 +227,82 @@ pub struct NewVmReq { pub created_at: Datetime, pub price_per_unit: u64, pub locked_nano: u64, + pub error: String, +} + +impl NewVmReq { + pub async fn submit_error(id: &str, error: String) -> Result<(), Error> { + #[derive(Serialize)] + struct NewVmError { + error: String, + } + let _: Option = DB.update((NEW_VM_REQ, id)).merge(NewVmError { error }).await?; + Ok(()) + } + + pub async fn submit(self) -> Result<(), Error> { + let _: Option = DB.create(self.id.clone()).content(self).await?; + Ok(()) + } +} + +/// first string is the vm_id +pub enum NewVmResp { + // TODO: find a more elegant way to do this than importing gRPC in the DB module + // https://en.wikipedia.org/wiki/Dependency_inversion_principle + Args(String, detee_shared::snp::pb::vm_proto::MeasurementArgs), + Error(String, String), +} + +impl NewVmResp { + pub async fn listen(vm_id: &str) -> Result { + let mut resp = DB + .query(format!("live select * from {NEW_VM_REQ} where id = {NEW_VM_REQ}:{vm_id};")) + .query(format!( + "live select * from measurement_args where id = measurement_args:{vm_id};" + )) + .await?; + let mut live_stream1 = resp.stream::>(0)?; + let mut live_stream2 = + resp.stream::>(1)?; + + loop { + tokio::select! { + new_vm_req_notif = live_stream1.next() => { + if let Some(new_vm_req_notif) = new_vm_req_notif { + match new_vm_req_notif { + Ok(new_vm_req_notif) => { + match new_vm_req_notif.action { + surrealdb::Action::Update => { + if !new_vm_req_notif.data.error.is_empty() { + return Ok(Self::Error(vm_id.to_string(), new_vm_req_notif.data.error)); + } + }, + _ => {} + }; + }, + Err(e) => return Err(e.into()), + } + } + } + args_notif = live_stream2.next() => { + if let Some(args_notif) = args_notif { + match args_notif { + Ok(args_notif) => { + match args_notif.action { + surrealdb::Action::Create => { + return Ok(Self::Args(vm_id.to_string(), args_notif.data)); + }, + _ => {} + }; + }, + Err(e) => return Err(e.into()), + } + } + } + } + } + } } #[derive(Debug, Serialize, Deserialize)] @@ -219,7 +346,9 @@ pub struct UpdateVmReq { pub locked_nano: u64, } -pub async fn listen_for_node + std::marker::Unpin + for<'de> Deserialize<'de>>( +pub async fn listen_for_node< + T: Into + std::marker::Unpin + for<'de> Deserialize<'de>, +>( node: &str, tx: Sender, ) -> Result<(), Error> { @@ -230,7 +359,7 @@ pub async fn listen_for_node + std::marker::Unpin + wat => { log::error!("listen_for_node: T has type {wat}"); String::from("wat") - }, + } }; let mut resp = DB.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?; @@ -346,7 +475,6 @@ impl ActiveVm { pub fn price_per_minute(&self) -> u64 { self.total_units() * self.price_per_unit } - } #[derive(Debug, Serialize, Deserialize)] diff --git a/src/grpc.rs b/src/grpc.rs index 29366f5..9588635 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -13,9 +13,11 @@ use detee_shared::{ *, }, }; +use nanoid::nanoid; use log::info; use std::pin::Pin; +use surrealdb::RecordId; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::{Stream, StreamExt}; @@ -29,6 +31,31 @@ impl From for AccountBalance { } } +impl From for db::NewVmReq { + fn from(new_vm_req: NewVmReq) -> Self { + Self { + id: RecordId::from((db::NEW_VM_REQ, nanoid!(40, &db::ID_ALPHABET))), + hostname: new_vm_req.hostname, + admin: RecordId::from((db::ACCOUNT, new_vm_req.admin_pubkey)), + vm_node: RecordId::from((db::VM_NODE, new_vm_req.node_pubkey)), + extra_ports: new_vm_req.extra_ports, + public_ipv4: new_vm_req.public_ipv4, + public_ipv6: new_vm_req.public_ipv6, + disk_size_gb: new_vm_req.disk_size_gb, + vcpus: new_vm_req.vcpus, + memory_mb: new_vm_req.memory_mb, + kernel_url: new_vm_req.kernel_url, + kernel_sha: new_vm_req.kernel_sha, + dtrfs_url: new_vm_req.dtrfs_url, + dtrfs_sha: new_vm_req.dtrfs_sha, + price_per_unit: new_vm_req.price_per_unit, + locked_nano: new_vm_req.locked_nano, + created_at: surrealdb::sql::Datetime::default(), + error: String::new(), + } + } +} + impl From for NewVmReq { fn from(new_vm_req: db::NewVmReq) -> Self { Self { @@ -52,6 +79,19 @@ impl From for NewVmReq { } } +impl From for NewVmResp { + fn from(resp: db::NewVmResp) -> Self { + match resp { + // TODO: This will require a small architecture change to pass MeasurementArgs from + // Daemon to CLI + db::NewVmResp::Args(uuid, args) => { + NewVmResp { uuid, error: String::new(), args: Some(args) } + } + db::NewVmResp::Error(uuid, error) => NewVmResp { uuid, error, args: None }, + } + } +} + impl From for UpdateVmReq { fn from(update_vm_req: db::UpdateVmReq) -> Self { Self { @@ -181,6 +221,20 @@ impl From for AppNodeListResp { } } +impl From for db::VmNodeResources { + fn from(res: VmNodeResources) -> Self { + Self { + avail_mem_mb: res.avail_memory_mb, + avail_vcpus: res.avail_vcpus, + avail_storage_gbs: res.avail_storage_gb, + avail_ipv4: res.avail_ipv4, + avail_ipv6: res.avail_ipv6, + avail_ports: res.avail_ports, + max_ports_per_vm: res.max_ports_per_vm, + } + } +} + struct BrainVmDaemonForReal {} #[tonic::async_trait] @@ -273,52 +327,54 @@ impl BrainVmDaemon for BrainVmDaemonForReal { async fn daemon_messages( &self, - _req: Request>, + req: Request>, ) -> Result, Status> { - todo!(); - // let mut req_stream = req.into_inner(); - // let pubkey: String; - // if let Some(Ok(msg)) = req_stream.next().await { - // log::debug!("demon_messages received the following auth message: {:?}", msg.msg); - // if let Some(vm_daemon_message::Msg::Auth(auth)) = msg.msg { - // pubkey = auth.pubkey.clone(); - // check_sig_from_parts( - // &pubkey, - // &auth.timestamp, - // &format!("{:?}", auth.contracts), - // &auth.signature, - // )?; - // } else { - // return Err(Status::unauthenticated( - // "Could not authenticate the daemon: could not extract auth signature", - // )); - // } - // } else { - // return Err(Status::unauthenticated("Could not authenticate the daemon")); - // } + let mut req_stream = req.into_inner(); + let pubkey: String; + if let Some(Ok(msg)) = req_stream.next().await { + log::debug!("demon_messages received the following auth message: {:?}", msg.msg); + if let Some(vm_daemon_message::Msg::Auth(auth)) = msg.msg { + pubkey = auth.pubkey.clone(); + check_sig_from_parts( + &pubkey, + &auth.timestamp, + &format!("{:?}", auth.contracts), + &auth.signature, + )?; + } else { + return Err(Status::unauthenticated( + "Could not authenticate the daemon: could not extract auth signature", + )); + } + } else { + return Err(Status::unauthenticated("Could not authenticate the daemon")); + } - // // info!("Received a message from daemon {pubkey}: {daemon_message:?}"); - // while let Some(daemon_message) = req_stream.next().await { - // match daemon_message { - // Ok(msg) => match msg.msg { - // Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => { - // self.data.submit_newvm_resp(new_vm_resp).await; - // } - // Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => { - // self.data.submit_updatevm_resp(update_vm_resp).await; - // } - // Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => { - // self.data.submit_node_resources(node_resources); - // } - // _ => {} - // }, - // Err(e) => { - // log::warn!("Daemon disconnected: {e:?}"); - // self.data.del_daemon_tx(&pubkey); - // } - // } - // } - // Ok(Response::new(Empty {})) + while let Some(daemon_message) = req_stream.next().await { + match daemon_message { + Ok(msg) => match msg.msg { + Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => { + if !new_vm_resp.error.is_empty() { + } else { + db::upsert_record("measurement_args", &new_vm_resp.uuid, new_vm_resp.args).await?; + } + } + Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => { + todo!(); + // self.data.submit_updatevm_resp(update_vm_resp).await; + } + Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => { + let node_resources: db::VmNodeResources = node_resources.into(); + node_resources.merge(&pubkey).await?; + } + _ => {} + }, + Err(e) => { + log::warn!("Daemon disconnected: {e:?}"); + } + } + } + Ok(Response::new(Empty {})) } } @@ -495,30 +551,27 @@ impl BrainVmCli for BrainVmCliForReal { async fn new_vm(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; info!("New VM requested via CLI: {req:?}"); - todo!(); - // if self - // .data - // .is_user_banned_by_node(&req.admin_pubkey, &req.node_pubkey) - // { - // return Err(Status::permission_denied( - // "This operator banned you. What did you do?", - // )); - // } - // let admin_pubkey = req.admin_pubkey.clone(); - // let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); - // self.data.submit_newvm_req(req, oneshot_tx).await; - // match oneshot_rx.await { - // Ok(response) => { - // info!("Sending VM confirmation to {admin_pubkey}: {response:?}"); - // Ok(Response::new(response)) - // } - // Err(e) => { - // log::error!("Something weird happened. Reached error {e:?}"); - // Err(Status::unknown( - // "Request failed due to unknown error. Please try again or contact the DeTEE devs team.", - // )) - // } - // } + if db::Account::is_banned_by_node(&req.admin_pubkey, &req.node_pubkey).await? { + return Err(Status::permission_denied("This operator banned you. What did you do?")); + } + + let new_vm_req: db::NewVmReq = req.into(); + let id = new_vm_req.id.key().to_string(); + let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); + tokio::spawn(async move { + let _ = oneshot_tx.send(db::NewVmResp::listen(&id).await); + }); + new_vm_req.submit().await?; + + match oneshot_rx.await { + Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())), + Err(e) => { + log::error!("Something weird happened. Reached error {e:?}"); + Err(Status::unknown( + "Request failed due to unknown error. Please try again or contact the DeTEE devs team.", + )) + } + } } async fn update_vm(&self, req: Request) -> Result, Status> {