From 0215be3acd4ea4706b0095fac74b777287899780 Mon Sep 17 00:00:00 2001 From: ramrem Date: Wed, 25 Dec 2024 22:59:18 +0000 Subject: [PATCH] add support for VM updates Co-authored-by: Ramil_Algayev Co-authored-by: ghe0 Reviewed-on: https://gitea.detee.cloud/ghe0/brain-mock/pulls/1 Co-authored-by: ramrem Co-committed-by: ramrem --- Cargo.lock | 162 ++++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + brain.proto | 42 ++++++++++---- src/data.rs | 133 ++++++++++++++++++++++++++++++++++++------ src/grpc.rs | 109 +++++++++++++++++++++++++++++------ 5 files changed, 402 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 97a4195..2957b24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.6.18" @@ -204,6 +219,7 @@ checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" name = "brain-mock" version = "0.1.0" dependencies = [ + "chrono", "dashmap", "env_logger", "log", @@ -216,6 +232,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "bumpalo" +version = "3.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" + [[package]] name = "byteorder" version = "1.5.0" @@ -228,18 +250,47 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" +[[package]] +name = "cc" +version = "1.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31a0499c1dc64f458ad13872de75c0eb7e3fdb0e67964610c914b034fc5956e" +dependencies = [ + "shlex", +] + [[package]] name = "cfg-if" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets", +] + [[package]] name = "colorchoice" version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -527,6 +578,29 @@ dependencies = [ "tracing", ] +[[package]] +name = "iana-time-zone" +version = "0.1.61" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "indexmap" version = "1.9.3" @@ -568,6 +642,16 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" +[[package]] +name = "js-sys" +version = "0.3.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6717b6b5b077764fb5966237269cb3c64edddde4b14ce42647430a78ced9e7b7" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + [[package]] name = "libc" version = "0.2.168" @@ -640,6 +724,15 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "object" version = "0.36.5" @@ -924,6 +1017,12 @@ dependencies = [ "syn", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "slab" version = "0.4.9" @@ -1193,6 +1292,69 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a474f6281d1d70c17ae7aa6a613c87fce69a127e2624002df63dcb39d6cf6396" +dependencies = [ + "cfg-if", + "once_cell", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f89bb38646b4f81674e8f5c3fb81b562be1fd936d84320f3264486418519c79" +dependencies = [ + "bumpalo", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cc6181fd9a7492eef6fef1f33961e3695e4579b9872a6f7c83aee556666d4fe" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6" + +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index 4a78dd4..20866c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +chrono = "0.4.39" dashmap = "6.1.0" env_logger = "0.11.6" log = "0.4.22" diff --git a/brain.proto b/brain.proto index 05d8e4e..9a9e84f 100644 --- a/brain.proto +++ b/brain.proto @@ -8,7 +8,7 @@ message NodePubkey { string node_pubkey = 1; } -message RegisterNodeRequest { +message RegisterNodeReq { string node_pubkey = 1; string owner_pubkey = 2; string ip = 3; @@ -23,7 +23,7 @@ message RegisterNodeRequest { uint32 max_ports_per_vm = 12; } -message NewVMRequest { +message NewVMReq { string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID string hostname = 2; string admin_pubkey = 3; @@ -40,6 +40,22 @@ message NewVMRequest { string dtrfs_sha = 14; } +message UpdateVMReq { + string uuid = 1; + uint32 disk_size_gb = 3; + uint32 vcpus = 4; + uint32 memory_mb = 5; + string kernel_url = 6; + string kernel_sha = 7; + string dtrfs_url = 8; + string dtrfs_sha = 9; +} + +message UpdateVMResp { + string uuid = 1; + string error = 3; +} + message VMContract { string uuid = 1; string hostname = 2; @@ -54,6 +70,7 @@ message VMContract { string kernel_sha = 11; string dtrfs_sha = 12; string created_at = 13; + string updated_at = 14; } message ListVMContractsReq { @@ -61,7 +78,7 @@ message ListVMContractsReq { string node_pubkey = 2; } -message NewVMConfirmation { +message NewVMResp { string uuid = 1; repeated uint32 exposed_ports = 2; string public_ipv4 = 3; @@ -69,18 +86,21 @@ message NewVMConfirmation { string error = 5; } -message DeletedVMUpdate { +message DeleteVMReq { string uuid = 1; } service BrainDaemonService { - rpc RegisterNode (RegisterNodeRequest) returns (Empty); - rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest); - rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); - rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate); + rpc RegisterNode (RegisterNodeReq) returns (Empty); + rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq); + rpc SendNewVMResp (stream NewVMResp) returns (Empty); + rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); + rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); + rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); } + message NodeFilters { uint32 free_ports = 1; bool offers_ipv4 = 2; @@ -101,9 +121,9 @@ message NodeListResp { } service BrainCliService { - rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation); + rpc CreateVMContract (NewVMReq) returns (NewVMResp); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListNodes (NodeFilters) returns (stream NodeListResp); - rpc DeleteVM (DeletedVMUpdate) returns (Empty); + rpc DeleteVM (DeleteVMReq) returns (Empty); + rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp); } - diff --git a/src/data.rs b/src/data.rs index 639fdd1..a04f2d3 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,5 +1,6 @@ #![allow(dead_code)] use crate::grpc::brain as grpc; +use chrono::Utc; use dashmap::DashMap; use log::{debug, info, warn}; use std::sync::RwLock; @@ -22,8 +23,8 @@ pub struct Node { pub max_ports_per_vm: u32, } -impl From for Node { - fn from(node: grpc::RegisterNodeRequest) -> Self { +impl From for Node { + fn from(node: grpc::RegisterNodeReq) -> Self { Node { public_key: node.node_pubkey, owner_key: node.owner_pubkey, @@ -69,6 +70,7 @@ pub struct Contract { pub kernel_sha: String, pub dtrfs_sha: String, pub created_at: String, + pub updated_at: String, } impl Into for Contract { @@ -87,6 +89,7 @@ impl Into for Contract { kernel_sha: self.kernel_sha, dtrfs_sha: self.dtrfs_sha, created_at: self.created_at, + updated_at: self.updated_at, } } } @@ -95,9 +98,11 @@ impl Into for Contract { pub struct BrainData { nodes: RwLock>, contracts: RwLock>, - tmp_vmrequests: DashMap)>, - daemon_deletevm_tx: DashMap>, - daemon_newvm_tx: DashMap>, + tmp_newvm_reqs: DashMap)>, + tmp_updatevm_reqs: DashMap)>, + daemon_deletevm_tx: DashMap>, + daemon_newvm_tx: DashMap>, + daemon_updatevm_tx: DashMap>, } #[derive(Debug)] @@ -112,13 +117,15 @@ impl BrainData { Self { nodes: RwLock::new(Vec::new()), contracts: RwLock::new(Vec::new()), - tmp_vmrequests: DashMap::new(), + tmp_newvm_reqs: DashMap::new(), + tmp_updatevm_reqs: DashMap::new(), daemon_deletevm_tx: DashMap::new(), daemon_newvm_tx: DashMap::new(), + daemon_updatevm_tx: DashMap::new(), } } - pub fn insert_node(&self, node: grpc::RegisterNodeRequest) { + pub fn insert_node(&self, node: grpc::RegisterNodeReq) { info!("Registering node {node:?}"); let mut nodes = self.nodes.write().unwrap(); for n in nodes.iter_mut() { @@ -131,11 +138,17 @@ impl BrainData { nodes.push(node.into()); } - pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender) { + pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender) { self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx); } - pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) { + pub fn add_daemon_updatevm_tx(&self, node_pubkey: &str, tx: Sender) { + log::debug!("Adding daemon updatevm tx for node_pubkey: {}", node_pubkey); + self.daemon_updatevm_tx.insert(node_pubkey.to_string(), tx); + info!("Added daemon TX for {}", node_pubkey); + } + + pub async fn delete_vm(&self, delete_vm: grpc::DeleteVmReq) { if let Some(contract) = self.find_contract_by_uuid(&delete_vm.uuid) { info!("Found vm {}. Deleting...", delete_vm.uuid); if let Some(daemon_tx) = self.daemon_deletevm_tx.get(&contract.node_pubkey) { @@ -155,14 +168,14 @@ impl BrainData { } } - pub async fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender) { - self.tmp_vmrequests + pub async fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender) { + self.tmp_newvm_reqs .retain(|_, req| req.0.node_pubkey != node_pubkey); self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx); } - pub async fn submit_vmconfirmation(&self, confirmation: grpc::NewVmConfirmation) { - let newvmreq = match self.tmp_vmrequests.remove(&confirmation.uuid) { + pub async fn submit_vmconfirmation(&self, confirmation: grpc::NewVmResp) { + let newvmreq = match self.tmp_newvm_reqs.remove(&confirmation.uuid) { Some((_, r)) => r, None => { log::error!( @@ -188,7 +201,8 @@ impl BrainData { exposed_ports: confirmation.exposed_ports, public_ipv4: confirmation.public_ipv4, public_ipv6: confirmation.public_ipv6, - created_at: format!("{:?}", std::time::SystemTime::now()), + created_at: Utc::now().to_rfc3339(), + updated_at: String::new(), hostname: newvmreq.0.hostname, admin_pubkey: newvmreq.0.admin_pubkey, node_pubkey: newvmreq.0.node_pubkey, @@ -202,16 +216,44 @@ impl BrainData { self.contracts.write().unwrap().push(contract); } + pub async fn submit_updatevm_resp(&self, resp: grpc::UpdateVmResp) { + let updatevmreq = match self.tmp_updatevm_reqs.remove(&resp.uuid) { + Some((_, r)) => r, + None => { + log::error!( + "Received confirmation for ghost UpdateVMRequest {}", + resp.uuid + ); + return; + } + }; + if let Err(e) = updatevmreq.1.send(resp.clone()) { + log::error!("CLI RX dropped before receiving UpdateVMResp {resp:?}. Error is: {e:?}"); + } + if resp.error != "" { + return; + } + let mut contracts = self.contracts.write().unwrap(); + if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == resp.uuid) { + contract.disk_size_gb = updatevmreq.0.disk_size_gb; + contract.vcpus = updatevmreq.0.vcpus; + contract.memory_mb = updatevmreq.0.memory_mb; + contract.kernel_sha = updatevmreq.0.kernel_sha; + contract.dtrfs_sha = updatevmreq.0.dtrfs_sha; + contract.updated_at = Utc::now().to_rfc3339(); + } + } + pub async fn submit_newvmrequest( &self, - mut req: grpc::NewVmRequest, - tx: OneshotSender, + mut req: grpc::NewVmReq, + tx: OneshotSender, ) { let uuid = uuid::Uuid::new_v4().to_string(); req.uuid = uuid.clone(); info!("Inserting new vm request in memory: {req:?}"); - self.tmp_vmrequests + self.tmp_newvm_reqs .insert(req.uuid.clone(), (req.clone(), tx)); if let Some(server_tx) = self.daemon_newvm_tx.get(&req.node_pubkey) { debug!( @@ -225,7 +267,7 @@ impl BrainData { "Daemon {} RX dropped before sending update. Cleaning memory...", req.node_pubkey ); - self.submit_vmconfirmation(grpc::NewVmConfirmation { + self.submit_vmconfirmation(grpc::NewVmResp { error: "Daemon is offline.".to_string(), uuid, exposed_ports: Vec::new(), @@ -237,6 +279,61 @@ impl BrainData { } } + pub async fn submit_updatevm_req( + &self, + req: grpc::UpdateVmReq, + tx: OneshotSender, + ) { + let uuid = req.uuid.clone(); + info!("Inserting new vm update request in memory: {req:?}"); + let node_pubkey = match self.find_contract_by_uuid(&req.uuid) { + Some(contract) => contract.node_pubkey, + None => { + log::warn!( + "Received UpdateVMReq for a contract that does not exist: {}", + req.uuid + ); + let _ = tx.send(grpc::UpdateVmResp { + uuid, + error: "Contract does not exist.".to_string(), + }); + return; + } + }; + self.tmp_updatevm_reqs + .insert(req.uuid.clone(), (req.clone(), tx)); + if let Some(server_tx) = self.daemon_updatevm_tx.get(&node_pubkey) { + debug!( + "Found daemon TX for {}. Sending updateVMReq {}", + node_pubkey, req.uuid + ); + match server_tx.send(req.clone()).await { + Ok(_) => { + debug!("Successfully sent updateVMReq to {}", node_pubkey); + return; + } + Err(e) => { + warn!( + "Failed to send updateVMReq to {}: {}. Cleaning memory...", + node_pubkey, e + ); + self.submit_updatevm_resp(grpc::UpdateVmResp { + uuid, + error: "Daemon is offline.".to_string(), + }) + .await; + } + } + } else { + warn!("No daemon TX found for {}", node_pubkey); + self.submit_updatevm_resp(grpc::UpdateVmResp { + uuid, + error: "Daemon is offline.".to_string(), + }) + .await; + } + } + pub fn insert_contract(&self, contract: Contract) { let mut contracts = self.contracts.write().unwrap(); contracts.push(contract); diff --git a/src/grpc.rs b/src/grpc.rs index 72d2f69..6714051 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -41,13 +41,13 @@ impl BrainCliMock { impl BrainDaemonService for BrainDaemonMock { async fn register_node( &self, - req: Request, + req: Request, ) -> Result, Status> { self.data.insert_node(req.into_inner()); Ok(Response::new(Empty {})) } - type GetNewVMReqsStream = Pin> + Send>>; + type GetNewVMReqsStream = Pin> + Send>>; async fn get_new_vm_reqs( &self, @@ -67,7 +67,7 @@ impl BrainDaemonService for BrainDaemonMock { debug!("Sending NewVMRequest to {}: {newvmreq:?}", req.node_pubkey); if let Err(e) = grpc_tx.send(Ok(newvmreq)).await { warn!("Could not send NewVMRequest to {}: {e:?}", req.node_pubkey); - data.submit_vmconfirmation(NewVmConfirmation { + data.submit_vmconfirmation(NewVmResp { error: "Daemon not connected.".to_string(), uuid, ..Default::default() @@ -83,11 +83,11 @@ impl BrainDaemonService for BrainDaemonMock { )) } - async fn send_vm_confirmations( + async fn send_new_vm_resp( &self, - req: Request>, + req: Request>, ) -> Result, Status> { - debug!("Some node connected to stream NewVmConfirmation"); + debug!("Some node connected to stream NewVMResp"); let mut confirmations = req.into_inner(); while let Some(confirmation) = confirmations.next().await { match confirmation { @@ -96,22 +96,21 @@ impl BrainDaemonService for BrainDaemonMock { self.data.submit_vmconfirmation(c).await; } Err(e) => { - log::warn!("Daemon disconnected from Streaming: {e:?}") + log::warn!("Daemon disconnected from Streaming: {e:?}") } } } Ok(Response::new(Empty {})) } - type DeletedVMUpdatesStream = - Pin> + Send>>; + type GetDeleteVMReqStream = Pin> + Send>>; - async fn deleted_vm_updates( + async fn get_delete_vm_req( &self, req: Request, - ) -> Result, Status> { + ) -> Result, Status> { let node_pubkey = req.into_inner().node_pubkey; - info!("Daemon {node_pubkey} requested DeletedVMUpdatesStream"); + info!("Daemon {node_pubkey} requested GetDeleteVMReqStream"); let (grpc_tx, grpc_rx) = mpsc::channel(6); let (data_tx, mut data_rx) = mpsc::channel(6); self.data @@ -135,7 +134,7 @@ impl BrainDaemonService for BrainDaemonMock { }); let output_stream = ReceiverStream::new(grpc_rx); Ok(Response::new( - Box::pin(output_stream) as Self::DeletedVMUpdatesStream + Box::pin(output_stream) as Self::GetDeleteVMReqStream )) } @@ -159,14 +158,73 @@ impl BrainDaemonService for BrainDaemonMock { Box::pin(output_stream) as Self::ListVMContractsStream )) } + + type GetUpdateVMReqStream = Pin> + Send>>; + + async fn get_update_vm_req( + &self, + req: Request, + ) -> Result, Status> { + let req = req.into_inner(); + info!("Daemon {} requested GetUpdateVMStream", req.node_pubkey); + let (grpc_tx, grpc_rx) = mpsc::channel(6); + let (data_tx, mut data_rx) = mpsc::channel(6); + self.data + .add_daemon_updatevm_tx(&req.node_pubkey, data_tx); + let data = self.data.clone(); + tokio::spawn(async move { + while let Some(updatevmreq) = data_rx.recv().await { + debug!( + "Sending UpdateVMRequest to {}: {updatevmreq:?}", + req.node_pubkey + ); + if let Err(e) = grpc_tx.send(Ok(updatevmreq.clone())).await { + warn!( + "Could not send UpdateVMRequest to {}: {e:?}", + req.node_pubkey + ); + data.submit_updatevm_resp(UpdateVmResp { + error: "Daemon not connected.".to_string(), + uuid: updatevmreq.uuid, + }) + .await; + break; + } + } + }); + let output_stream = ReceiverStream::new(grpc_rx); + Ok(Response::new( + Box::pin(output_stream) as Self::GetUpdateVMReqStream + )) + } + + async fn send_update_vm_resp( + &self, + req: Request>, + ) -> Result, Status> { + debug!("Some node connected to stream NewVMResp"); + let mut resp_stream = req.into_inner(); + while let Some(update_vm_resp) = resp_stream.next().await { + match update_vm_resp { + Ok(c) => { + info!("Received confirmation from daemon: {c:?}"); + self.data.submit_updatevm_resp(c).await; + } + Err(e) => { + log::warn!("Daemon disconnected from Streaming: {e:?}") + } + } + } + Ok(Response::new(Empty {})) + } } #[tonic::async_trait] impl BrainCliService for BrainCliMock { async fn create_vm_contract( &self, - req: Request, - ) -> Result, Status> { + req: Request, + ) -> Result, Status> { let req = req.into_inner(); info!("New VM requested via CLI: {req:?}"); let admin_pubkey = req.admin_pubkey.clone(); @@ -186,6 +244,25 @@ impl BrainCliService for BrainCliMock { } } + async fn update_vm(&self, req: Request) -> Result, Status> { + let req = req.into_inner(); + info!("Update VM requested via CLI: {req:?}"); + let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); + self.data.submit_updatevm_req(req, oneshot_tx).await; + match oneshot_rx.await { + Ok(response) => { + info!("Sending UpdateVMResp: {response:?}"); + Ok(Response::new(response)) + } + Err(e) => { + log::error!("Something weird happened. Reached error {e:?}"); + Err(Status::unknown( + "Request failed due to unknown error. Please try again or contact the DeTEE devs team.", + )) + } + } + } + type ListVMContractsStream = Pin> + Send>>; async fn list_vm_contracts( &self, @@ -226,7 +303,7 @@ impl BrainCliService for BrainCliMock { )) } - async fn delete_vm(&self, req: Request) -> Result, Status> { + async fn delete_vm(&self, req: Request) -> Result, Status> { let req = req.into_inner(); info!("Unknown CLI requested to delete vm {}", req.uuid); self.data.delete_vm(req).await;