added global var and delete_vm stream

This commit is contained in:
ghe0 2024-12-22 16:54:23 +02:00
parent e48b9b0fef
commit f0e2e8b8bc
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
3 changed files with 51 additions and 2 deletions

@ -230,9 +230,11 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"env_logger", "env_logger",
"lazy_static",
"log", "log",
"prost", "prost",
"prost-types", "prost-types",
"rand",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tonic", "tonic",
@ -541,6 +543,12 @@ version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674"
[[package]]
name = "lazy_static"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.169" version = "0.2.169"

@ -6,9 +6,11 @@ edition = "2021"
[dependencies] [dependencies]
anyhow = "1.0.94" anyhow = "1.0.94"
env_logger = "0.11.6" env_logger = "0.11.6"
lazy_static = "1.5.0"
log = "0.4.22" log = "0.4.22"
prost = "0.13.4" prost = "0.13.4"
prost-types = "0.13.4" prost-types = "0.13.4"
rand = "0.8.5"
tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] } tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1.17" tokio-stream = "0.1.17"
tonic = "0.12" tonic = "0.12"

@ -5,7 +5,7 @@ pub mod brain {
use anyhow::Result; use anyhow::Result;
use brain::{ use brain::{
brain_daemon_service_client::BrainDaemonServiceClient, NewVmConfirmation, NewVmRequest, brain_daemon_service_client::BrainDaemonServiceClient, NewVmConfirmation, NewVmRequest, DeletedVmUpdate,
NodePubkey, NodePubkey,
}; };
use log::{error, debug, warn, info}; use log::{error, debug, warn, info};
@ -14,12 +14,27 @@ use tokio::{sync::mpsc::Receiver, sync::mpsc::Sender, task::JoinSet};
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tonic::transport::Channel; use tonic::transport::Channel;
use lazy_static::lazy_static;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
lazy_static! {
static ref SECURE_PUBLIC_KEY: String = generate_random_string();
}
fn generate_random_string() -> String {
let rng = thread_rng();
rng.sample_iter(&Alphanumeric)
.take(16) // Adjust the length as needed
.map(char::from)
.collect()
}
async fn listen_for_new_vm_reqs( async fn listen_for_new_vm_reqs(
mut client: BrainDaemonServiceClient<Channel>, mut client: BrainDaemonServiceClient<Channel>,
tx: Sender<NewVmRequest>, tx: Sender<NewVmRequest>,
) -> Result<()> { ) -> Result<()> {
let node_pubkey = "somePublicKey".to_string(); debug!("starting listen_for_new_vm_reqs");
let node_pubkey = SECURE_PUBLIC_KEY.clone();
let mut grpc_stream = client let mut grpc_stream = client
.get_new_vm_reqs(NodePubkey { node_pubkey }) .get_new_vm_reqs(NodePubkey { node_pubkey })
.await? .await?
@ -36,12 +51,31 @@ async fn send_confirmations(
mut client: BrainDaemonServiceClient<Channel>, mut client: BrainDaemonServiceClient<Channel>,
rx: Receiver<NewVmConfirmation>, rx: Receiver<NewVmConfirmation>,
) -> Result<()> { ) -> Result<()> {
debug!("starting send_confirmations stream");
let rx_stream = ReceiverStream::new(rx); let rx_stream = ReceiverStream::new(rx);
client.send_vm_confirmations(rx_stream).await?; client.send_vm_confirmations(rx_stream).await?;
debug!("send_confirmations is about to exit"); debug!("send_confirmations is about to exit");
Ok(()) Ok(())
} }
async fn listen_for_deleted_vms(
mut client: BrainDaemonServiceClient<Channel>,
tx: Sender<DeletedVmUpdate>,
) -> Result<()> {
debug!("starting listen_for_new_vm_reqs");
let node_pubkey = SECURE_PUBLIC_KEY.clone();
let mut grpc_stream = client
.deleted_vm_updates(NodePubkey { node_pubkey })
.await?
.into_inner();
while let Some(deleted_vm) = grpc_stream.next().await {
info!("Received deleted vm: {deleted_vm:?}");
let _ = tx.send(deleted_vm?).await;
}
debug!("listen_for_new_vm_reqs is about to exit");
Ok(())
}
async fn connect_and_run() -> Result<()> { async fn connect_and_run() -> Result<()> {
let client = BrainDaemonServiceClient::connect("http://[::1]:31337").await?; let client = BrainDaemonServiceClient::connect("http://[::1]:31337").await?;
let mut streaming_tasks = JoinSet::new(); let mut streaming_tasks = JoinSet::new();
@ -54,6 +88,10 @@ async fn connect_and_run() -> Result<()> {
let (_confirm_tx, rx) = tokio::sync::mpsc::channel(6); let (_confirm_tx, rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(send_confirmations(confirm_client, rx)); streaming_tasks.spawn(send_confirmations(confirm_client, rx));
let deletevms_client = client.clone();
let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx));
let task_output = streaming_tasks.join_next().await; let task_output = streaming_tasks.join_next().await;
warn!("One stream exited: {task_output:?}"); warn!("One stream exited: {task_output:?}");
Ok(()) Ok(())
@ -74,5 +112,6 @@ async fn main() {
env_logger::builder() env_logger::builder()
.filter_level(log::LevelFilter::Debug) .filter_level(log::LevelFilter::Debug)
.init(); .init();
info!("Hello! My name is {}", SECURE_PUBLIC_KEY.clone());
connection_wrapper().await; connection_wrapper().await;
} }