diff --git a/Cargo.lock b/Cargo.lock index 97a4195..2957b24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.6.18" @@ -204,6 +219,7 @@ checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" name = "brain-mock" version = "0.1.0" dependencies = [ + "chrono", "dashmap", "env_logger", "log", @@ -216,6 +232,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "bumpalo" +version = "3.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" + [[package]] name = "byteorder" version = "1.5.0" @@ -228,18 +250,47 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" +[[package]] +name = "cc" +version = "1.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31a0499c1dc64f458ad13872de75c0eb7e3fdb0e67964610c914b034fc5956e" +dependencies = [ + "shlex", +] + [[package]] name = "cfg-if" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets", +] + [[package]] name = "colorchoice" version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -527,6 +578,29 @@ dependencies = [ "tracing", ] +[[package]] +name = "iana-time-zone" +version = "0.1.61" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "indexmap" version = "1.9.3" @@ -568,6 +642,16 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" +[[package]] +name = "js-sys" +version = "0.3.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6717b6b5b077764fb5966237269cb3c64edddde4b14ce42647430a78ced9e7b7" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + [[package]] name = "libc" version = "0.2.168" @@ -640,6 +724,15 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "object" version = "0.36.5" @@ -924,6 +1017,12 @@ dependencies = [ "syn", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "slab" version = "0.4.9" @@ -1193,6 +1292,69 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a474f6281d1d70c17ae7aa6a613c87fce69a127e2624002df63dcb39d6cf6396" +dependencies = [ + "cfg-if", + "once_cell", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f89bb38646b4f81674e8f5c3fb81b562be1fd936d84320f3264486418519c79" +dependencies = [ + "bumpalo", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cc6181fd9a7492eef6fef1f33961e3695e4579b9872a6f7c83aee556666d4fe" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6" + +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index 4a78dd4..f55837d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] } tokio-stream = "0.1.17" tonic = "0.12" uuid = { version = "1.11.0", features = ["v4"] } +chrono = "0.4.39" [build-dependencies] tonic-build = "0.12" diff --git a/src/data.rs b/src/data.rs index 5951faa..eee5746 100644 --- a/src/data.rs +++ b/src/data.rs @@ -137,7 +137,7 @@ impl BrainData { self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx); } - pub fn add_daemon_updatevm_tx(&self, node_pubkey: &str, _tx: Sender) { + pub async fn add_daemon_updatevm_tx(&self, node_pubkey: &str, _tx: Sender) { self.daemon_updatevm_tx.insert(node_pubkey.to_string(), _tx); } @@ -243,7 +243,7 @@ impl BrainData { } } - pub fn update_vm(&self, req: grpc::UpdateVmRequest) -> Result<(), String> { + pub async fn update_vm(&self, req: grpc::UpdateVmRequest) -> Result<(), String> { let mut contracts = self.contracts.write().unwrap(); if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == req.uuid) { contract.disk_size_gb = req.disk_size_gb; diff --git a/src/grpc.rs b/src/grpc.rs index c0aa4f9..7b9935a 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; +use chrono; pub struct BrainDaemonMock { data: Arc, @@ -164,34 +165,20 @@ impl BrainDaemonService for BrainDaemonMock { async fn get_update_vm( &self, req: Request, - ) -> Result, Status> { + ) -> Result, Status> { let req = req.into_inner(); - info!("Node {} requested GetUpdateVMsStream", req.node_pubkey); + info!("Daemon {} requested GetNewVMReqsStream", req.node_pubkey); let (grpc_tx, grpc_rx) = mpsc::channel(6); let (data_tx, mut data_rx) = mpsc::channel(6); self.data - .clone() - .add_daemon_updatevm_tx(&req.node_pubkey, data_tx); - let data = self.data.clone(); + .add_daemon_updatevm_tx(&req.node_pubkey, data_tx) + .await; tokio::spawn(async move { while let Some(updatevmreq) = data_rx.recv().await { - debug!("Sending UpdateVMRequest to {}: {:?}", req.node_pubkey.clone(), updatevmreq.clone()); - if let Err(e) = grpc_tx.send(Ok(updatevmreq.clone())).await { - warn!("Could not send UpdateVMRequest to {}: {e:?}", req.node_pubkey.clone()); - continue; + debug!("Sending UpdateVMRequest to {}: {updatevmreq:?}", req.node_pubkey); + if let Err(e) = grpc_tx.send(Ok(updatevmreq)).await { + warn!("Could not send UpdateVMRequest to {}: {e:?}", req.node_pubkey); } - data.update_vm(UpdateVmRequest { - uuid: updatevmreq.uuid, - node_pubkey: updatevmreq.node_pubkey, - disk_size_gb: updatevmreq.disk_size_gb, - memory_mb: updatevmreq.memory_mb, - vcpus: updatevmreq.vcpus, - kernel_sha: updatevmreq.kernel_sha, - dtrfs_sha: updatevmreq.dtrfs_sha, - kernel_url: updatevmreq.kernel_url, - dtrfs_url: updatevmreq.dtrfs_url, - }); - break; } }); let output_stream = ReceiverStream::new(grpc_rx); @@ -204,28 +191,19 @@ impl BrainDaemonService for BrainDaemonMock { &self, req: Request>, ) -> Result, Status> { - let req = req.into_inner(); - info!("Received UpdateVMs request"); - - // Perform the update operation - let result = self.data.update_all_vms().await; - - match result { - Ok(_) => { - let response = UpdateVmResp { - success: true, - message: "All VMs updated successfully".to_string(), - }; - Ok(Response::new(response)) - } - Err(e) => { - let response = UpdateVmResp { - success: false, - message: format!("Failed to update VMs: {:?}", e), - }; - Err(Status::internal(response.message)) + debug!("Some node connected to stream UpdateVmResp"); + let mut confirmations = req.into_inner(); + while let Some(confirmation) = confirmations.next().await { + match confirmation { + Ok(c) => { + info!("Received confirmation from daemon: {c:?}") + } + Err(e) => { + log::warn!("Daemon disconnected from Streaming: {e:?}") + } } } + Ok(Response::new(Empty {})) } } @@ -259,10 +237,10 @@ impl BrainCliService for BrainCliMock { req: Request, ) -> Result, Status> { let req = req.into_inner(); - match self.data.update_vm(req.clone()) { + match self.data.update_vm(req.clone()).await { Ok(_) => Ok(Response::new(UpdateVmResp { uuid: req.uuid, - timestamp: format!("{:?}", std::time::SystemTime::now()), + timestamp: chrono::Utc::now().to_rfc3339(), error: "".to_string(), })), Err(e) => Ok(Response::new(UpdateVmResp {