diff --git a/daemon-mock/Cargo.lock b/daemon-mock/Cargo.lock index 2ffd18a..572553c 100644 --- a/daemon-mock/Cargo.lock +++ b/daemon-mock/Cargo.lock @@ -230,9 +230,11 @@ version = "0.1.0" dependencies = [ "anyhow", "env_logger", + "lazy_static", "log", "prost", "prost-types", + "rand", "tokio", "tokio-stream", "tonic", @@ -541,6 +543,12 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.169" diff --git a/daemon-mock/Cargo.toml b/daemon-mock/Cargo.toml index 3a43022..955420f 100644 --- a/daemon-mock/Cargo.toml +++ b/daemon-mock/Cargo.toml @@ -6,9 +6,11 @@ edition = "2021" [dependencies] anyhow = "1.0.94" env_logger = "0.11.6" +lazy_static = "1.5.0" log = "0.4.22" prost = "0.13.4" prost-types = "0.13.4" +rand = "0.8.5" tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] } tokio-stream = "0.1.17" tonic = "0.12" diff --git a/daemon-mock/src/main.rs b/daemon-mock/src/main.rs index 89df882..5eea940 100644 --- a/daemon-mock/src/main.rs +++ b/daemon-mock/src/main.rs @@ -5,7 +5,7 @@ pub mod brain { use anyhow::Result; use brain::{ - brain_daemon_service_client::BrainDaemonServiceClient, NewVmConfirmation, NewVmRequest, + brain_daemon_service_client::BrainDaemonServiceClient, NewVmConfirmation, NewVmRequest, DeletedVmUpdate, NodePubkey, }; use log::{error, debug, warn, info}; @@ -14,12 +14,27 @@ use tokio::{sync::mpsc::Receiver, sync::mpsc::Sender, task::JoinSet}; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; use tonic::transport::Channel; +use lazy_static::lazy_static; +use rand::{distributions::Alphanumeric, thread_rng, Rng}; + +lazy_static! { + static ref SECURE_PUBLIC_KEY: String = generate_random_string(); +} + +fn generate_random_string() -> String { + let rng = thread_rng(); + rng.sample_iter(&Alphanumeric) + .take(16) // Adjust the length as needed + .map(char::from) + .collect() +} async fn listen_for_new_vm_reqs( mut client: BrainDaemonServiceClient, tx: Sender, ) -> Result<()> { - let node_pubkey = "somePublicKey".to_string(); + debug!("starting listen_for_new_vm_reqs"); + let node_pubkey = SECURE_PUBLIC_KEY.clone(); let mut grpc_stream = client .get_new_vm_reqs(NodePubkey { node_pubkey }) .await? @@ -36,12 +51,31 @@ async fn send_confirmations( mut client: BrainDaemonServiceClient, rx: Receiver, ) -> Result<()> { + debug!("starting send_confirmations stream"); let rx_stream = ReceiverStream::new(rx); client.send_vm_confirmations(rx_stream).await?; debug!("send_confirmations is about to exit"); Ok(()) } +async fn listen_for_deleted_vms( + mut client: BrainDaemonServiceClient, + tx: Sender, +) -> Result<()> { + debug!("starting listen_for_new_vm_reqs"); + let node_pubkey = SECURE_PUBLIC_KEY.clone(); + let mut grpc_stream = client + .deleted_vm_updates(NodePubkey { node_pubkey }) + .await? + .into_inner(); + while let Some(deleted_vm) = grpc_stream.next().await { + info!("Received deleted vm: {deleted_vm:?}"); + let _ = tx.send(deleted_vm?).await; + } + debug!("listen_for_new_vm_reqs is about to exit"); + Ok(()) +} + async fn connect_and_run() -> Result<()> { let client = BrainDaemonServiceClient::connect("http://[::1]:31337").await?; let mut streaming_tasks = JoinSet::new(); @@ -54,6 +88,10 @@ async fn connect_and_run() -> Result<()> { let (_confirm_tx, rx) = tokio::sync::mpsc::channel(6); streaming_tasks.spawn(send_confirmations(confirm_client, rx)); + let deletevms_client = client.clone(); + let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6); + streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx)); + let task_output = streaming_tasks.join_next().await; warn!("One stream exited: {task_output:?}"); Ok(()) @@ -74,5 +112,6 @@ async fn main() { env_logger::builder() .filter_level(log::LevelFilter::Debug) .init(); + info!("Hello! My name is {}", SECURE_PUBLIC_KEY.clone()); connection_wrapper().await; }