From b14c590d013608b43d86a1a8eba490042d0344fa Mon Sep 17 00:00:00 2001 From: ghe0 Date: Fri, 20 Dec 2024 21:56:34 +0200 Subject: [PATCH] added code to process new vm request --- Cargo.lock | 75 +++++++++++++++++++++++++++ Cargo.toml | 2 + brain.proto | 14 ++++-- src/data.rs | 142 ++++++++++++++++++++++++++++++++++++++-------------- src/grpc.rs | 84 ++++++++++++++++++++----------- 5 files changed, 247 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7d92707..e45c7ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -155,12 +155,14 @@ checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" name = "brain-mock" version = "0.1.0" dependencies = [ + "dashmap", "prost", "prost-types", "tokio", "tokio-stream", "tonic", "tonic-build", + "uuid", ] [[package]] @@ -181,6 +183,26 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "either" version = "1.13.0" @@ -302,6 +324,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.2" @@ -460,6 +488,16 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.22" @@ -525,6 +563,19 @@ version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -692,6 +743,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redox_syscall" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" +dependencies = [ + "bitflags", +] + [[package]] name = "regex" version = "1.11.1" @@ -746,6 +806,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.216" @@ -1005,6 +1071,15 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" +[[package]] +name = "uuid" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +dependencies = [ + "getrandom", +] + [[package]] name = "want" version = "0.3.1" diff --git a/Cargo.toml b/Cargo.toml index 7e93393..fae1b2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,11 +4,13 @@ version = "0.1.0" edition = "2021" [dependencies] +dashmap = "6.1.0" prost = "0.13.4" prost-types = "0.13.4" tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] } tokio-stream = "0.1.17" tonic = "0.12" +uuid = { version = "1.11.0", features = ["v4"] } [build-dependencies] tonic-build = "0.12" diff --git a/brain.proto b/brain.proto index 9509be3..c1b14b9 100644 --- a/brain.proto +++ b/brain.proto @@ -4,6 +4,10 @@ package brain; message Empty { } +message NodePubkey { + string node_pubkey = 1; +} + message RegisterNodeRequest { string node_pubkey = 1; string country = 2; @@ -55,9 +59,10 @@ message ListVMContractsReq { } message NewVMConfirmation { - repeated uint32 exposed_ports = 1; - string public_ipv4 = 2; - string public_ipv6 = 3; + string uuid = 1; + repeated uint32 exposed_ports = 2; + string public_ipv4 = 3; + string public_ipv6 = 4; } message DeletedVMUpdate { @@ -66,7 +71,8 @@ message DeletedVMUpdate { service BrainDaemonService { rpc RegisterNode (RegisterNodeRequest) returns (Empty); - rpc NewVMUpdates (stream NewVMConfirmation) returns (stream NewVMRequest); + rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest); + rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); rpc DeletedVMUpdates (Empty) returns (stream DeletedVMUpdate); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); } diff --git a/src/data.rs b/src/data.rs index 72f4276..f2daf29 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,11 +1,10 @@ #![allow(dead_code)] +use crate::grpc::brain as grpc; +use dashmap::DashMap; use std::sync::Arc; -use tokio::sync::RwLock; - -pub struct BrainData { - pub nodes: RwLock>, - pub contracts: RwLock>, -} +use std::sync::RwLock; +use tokio::sync::mpsc::Sender; +use tokio::sync::oneshot::Sender as OneshotSender; #[derive(Eq, Hash, PartialEq, Clone)] pub struct Node { @@ -24,9 +23,9 @@ pub struct Node { pub max_ports_per_vm: u32, } -impl Into for Node { - fn into(self) -> crate::grpc::brain::NodeListResp { - crate::grpc::brain::NodeListResp { +impl Into for Node { + fn into(self) -> grpc::NodeListResp { + grpc::NodeListResp { node_pubkey: self.public_key, country: self.country, city: self.city, @@ -54,9 +53,9 @@ pub struct Contract { pub created_at: String, } -impl Into for Contract { - fn into(self) -> crate::grpc::brain::VmContract { - crate::grpc::brain::VmContract { +impl Into for Contract { + fn into(self) -> grpc::VmContract { + grpc::VmContract { uuid: self.uuid, hostname: self.hostname, admin_pubkey: self.admin_pubkey, @@ -74,39 +73,108 @@ impl Into for Contract { } } -impl BrainData { - pub fn new() -> Arc { - Arc::new(Self { - nodes: RwLock::new(Vec::new()), - contracts: RwLock::new(Vec::new()), - }) - } +#[derive(Default)] +pub struct BrainData { + nodes: RwLock>, + contracts: RwLock>, + tmp_vmrequests: DashMap, + cli_vmcontract_tx: DashMap>, + daemon_deletevm_tx: DashMap>, + daemon_newvm_tx: DashMap>, +} +pub struct GuardCliTx { + brain_data: Arc, + admin_pubkey: String, +} + +impl Drop for GuardCliTx { + fn drop(&mut self) { + self.brain_data.del_cli_vmcontract_tx(&self.admin_pubkey); + } +} + +impl BrainData { pub async fn insert_node(&self, node: Node) { - let mut nodes = self.nodes.write().await; + let mut nodes = self.nodes.write().unwrap(); nodes.push(node); } - pub async fn insert_contract(&self, contract: Contract) { - let mut contracts = self.contracts.write().await; + pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender) { + self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx); + } + + pub fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender) { + self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx); + } + + pub async fn submit_vmconfirmation(&self, confirmation: grpc::NewVmConfirmation) { + let newvmreq = match self.tmp_vmrequests.remove(&confirmation.uuid) { + Some((_, r)) => r, + None => return, + }; + if let Some((_, client_tx)) = self.cli_vmcontract_tx.remove(&newvmreq.admin_pubkey) { + let _ = client_tx.send(confirmation.clone()); + } + let contract = Contract { + uuid: confirmation.uuid, + exposed_ports: confirmation.exposed_ports, + public_ipv4: confirmation.public_ipv4, + public_ipv6: confirmation.public_ipv6, + created_at: format!("{:?}", std::time::SystemTime::now()), + hostname: newvmreq.hostname, + admin_pubkey: newvmreq.admin_pubkey, + node_pubkey: newvmreq.node_pubkey, + disk_size_gb: newvmreq.disk_size_gb, + vcpus: newvmreq.vcpus, + memory_mb: newvmreq.memory_mb, + kernel_sha: newvmreq.kernel_sha, + dtrfs_sha: newvmreq.dtrfs_sha, + }; + self.contracts.write().unwrap().push(contract); + } + + pub async fn submit_newvmrequest( + self: Arc, + mut req: grpc::NewVmRequest, + tx: OneshotSender, + ) -> Option { + req.uuid = uuid::Uuid::new_v4().to_string(); + self.tmp_vmrequests.insert(req.uuid.clone(), req.clone()); + self.cli_vmcontract_tx + .insert(req.admin_pubkey.to_string(), tx); + if let Some(server_tx) = self.clone().daemon_newvm_tx.get(&req.node_pubkey) { + if server_tx.send(req.clone()).await.is_err() { + return None; + } + } + Some(GuardCliTx { + brain_data: self, + admin_pubkey: req.admin_pubkey.to_string(), + }) + } + + pub fn del_cli_vmcontract_tx(&self, admin_pubkey: &str) { + self.cli_vmcontract_tx.remove(admin_pubkey); + } + + pub fn insert_contract(&self, contract: Contract) { + let mut contracts = self.contracts.write().unwrap(); contracts.push(contract); } - pub async fn find_nodes_by_pubkey(&self, public_key: &str) -> Option { - let nodes = self.nodes.read().await; + pub fn find_nodes_by_pubkey(&self, public_key: &str) -> Option { + let nodes = self.nodes.read().unwrap(); nodes.iter().cloned().find(|n| n.public_key == public_key) } - pub async fn find_nodes_by_owner_key(&self, owner_key: &str) -> Option { - let nodes = self.nodes.read().await; + pub fn find_ns_by_owner_key(&self, owner_key: &str) -> Option { + let nodes = self.nodes.read().unwrap(); nodes.iter().cloned().find(|n| n.owner_key == owner_key) } - pub async fn find_nodes_by_filters( - &self, - filters: &crate::grpc::brain::NodeFilters, - ) -> Vec { - let nodes = self.nodes.read().await; + pub fn find_nodes_by_filters(&self, filters: &crate::grpc::brain::NodeFilters) -> Vec { + let nodes = self.nodes.read().unwrap(); nodes .iter() .cloned() @@ -121,13 +189,13 @@ impl BrainData { .collect() } - pub async fn find_contract_by_uuid(&self, uuid: &str) -> Option { - let contracts = self.contracts.read().await; + pub fn find_contract_by_uuid(&self, uuid: &str) -> Option { + let contracts = self.contracts.read().unwrap(); contracts.iter().cloned().find(|c| c.uuid == uuid) } - pub async fn find_contracts_by_admin_pubkey(&self, admin_pubkey: &str) -> Vec { - let contracts = self.contracts.read().await; + pub fn find_contracts_by_admin_pubkey(&self, admin_pubkey: &str) -> Vec { + let contracts = self.contracts.read().unwrap(); contracts .iter() .cloned() @@ -135,8 +203,8 @@ impl BrainData { .collect() } - pub async fn find_contracts_by_node_pubkey(&self, node_pubkey: &str) -> Vec { - let contracts = self.contracts.read().await; + pub fn find_contracts_by_node_pubkey(&self, node_pubkey: &str) -> Vec { + let contracts = self.contracts.read().unwrap(); contracts .iter() .cloned() diff --git a/src/grpc.rs b/src/grpc.rs index 7684327..5ca5bd4 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -11,7 +11,7 @@ use brain::*; use std::pin::Pin; use std::sync::Arc; use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, Stream}; +use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; struct BrainDaemonMock { @@ -30,13 +30,38 @@ impl BrainDaemonService for BrainDaemonMock { todo!() } - type NewVMUpdatesStream = Pin> + Send>>; + type GetNewVMReqsStream = Pin> + Send>>; - async fn new_vm_updates( + async fn get_new_vm_reqs( &self, - _req: Request>, - ) -> Result, Status> { - todo!() + req: Request, + ) -> Result, Status> { + let (grpc_tx, grpc_rx) = mpsc::channel(6); + let (data_tx, mut data_rx) = mpsc::channel(6); + self.data + .add_daemon_newvm_tx(&req.into_inner().node_pubkey, data_tx); + tokio::spawn(async move { + while let Some(newvmreq) = data_rx.recv().await { + let _ = grpc_tx.send(Ok(newvmreq)).await; + } + }); + let output_stream = ReceiverStream::new(grpc_rx); + Ok(Response::new( + Box::pin(output_stream) as Self::GetNewVMReqsStream + )) + } + + async fn send_vm_confirmations( + &self, + req: Request>, + ) -> Result, Status> { + let mut confirmations = req.into_inner(); + while let Some(confirmation) = confirmations.next().await { + if let Ok(confirmation) = confirmation { + self.data.submit_vmconfirmation(confirmation).await; + } + } + Ok(Response::new(Empty {})) } type DeletedVMUpdatesStream = @@ -56,16 +81,11 @@ impl BrainDaemonService for BrainDaemonMock { req: Request, ) -> Result, Status> { let req = req.into_inner(); - let contracts = self - .data - .find_contracts_by_admin_pubkey(&req.node_pubkey) - .await; - let (tx, rx) = mpsc::channel(128); + let contracts = self.data.find_contracts_by_admin_pubkey(&req.node_pubkey); + let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for contract in contracts { - let _ = tx - .send(Ok(contract.into())) - .await; + let _ = tx.send(Ok(contract.into())).await; } }); @@ -80,9 +100,22 @@ impl BrainDaemonService for BrainDaemonMock { impl BrainCliService for BrainCliMock { async fn create_vm_contract( &self, - _req: Request, + req: Request, ) -> Result, Status> { - todo!(); + let req = req.into_inner(); + let (engine_tx, engine_rx) = tokio::sync::oneshot::channel(); + let dropper = self.data.clone().submit_newvmrequest(req, engine_tx).await; + if dropper.is_none() { + return Err(Status::unavailable( + "The node you picked is currently offline.", + )); + } + if let Ok(response) = engine_rx.await { + return Ok(Response::new(response)); + } + Err(Status::unknown( + "Request failed due to unknown error. Please try again or contact the DeTEE devs team.", + )) } type ListVMContractsStream = Pin> + Send>>; @@ -91,16 +124,11 @@ impl BrainCliService for BrainCliMock { req: Request, ) -> Result, Status> { let req = req.into_inner(); - let contracts = self - .data - .find_contracts_by_admin_pubkey(&req.admin_pubkey) - .await; - let (tx, rx) = mpsc::channel(128); + let contracts = self.data.find_contracts_by_admin_pubkey(&req.admin_pubkey); + let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for contract in contracts { - let _ = tx - .send(Ok(contract.into())) - .await; + let _ = tx.send(Ok(contract.into())).await; } }); @@ -116,13 +144,11 @@ impl BrainCliService for BrainCliMock { req: Request, ) -> Result, tonic::Status> { let req = req.into_inner(); - let nodes = self.data.find_nodes_by_filters(&req).await; - let (tx, rx) = mpsc::channel(128); + let nodes = self.data.find_nodes_by_filters(&req); + let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for node in nodes { - let _ = tx - .send(Ok(node.into())) - .await; + let _ = tx.send(Ok(node.into())).await; } });