From 47eb811a6e1a8e584ce9a01af7023271acd07e90 Mon Sep 17 00:00:00 2001 From: ghe0 Date: Sun, 22 Dec 2024 18:16:06 +0200 Subject: [PATCH] added node registration and handling new vm reqs --- daemon-mock/src/main.rs | 117 +++++++++++++++++++++++++++++++++++----- 1 file changed, 104 insertions(+), 13 deletions(-) diff --git a/daemon-mock/src/main.rs b/daemon-mock/src/main.rs index 5eea940..e5303a2 100644 --- a/daemon-mock/src/main.rs +++ b/daemon-mock/src/main.rs @@ -5,17 +5,17 @@ pub mod brain { use anyhow::Result; use brain::{ - brain_daemon_service_client::BrainDaemonServiceClient, NewVmConfirmation, NewVmRequest, DeletedVmUpdate, - NodePubkey, + brain_daemon_service_client::BrainDaemonServiceClient, DeletedVmUpdate, NewVmConfirmation, + NewVmRequest, NodePubkey, RegisterNodeRequest, }; -use log::{error, debug, warn, info}; +use lazy_static::lazy_static; +use log::{debug, error, info, warn}; +use rand::{distributions::Alphanumeric, thread_rng, Rng}; use tokio::time::{sleep, Duration}; use tokio::{sync::mpsc::Receiver, sync::mpsc::Sender, task::JoinSet}; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; use tonic::transport::Channel; -use lazy_static::lazy_static; -use rand::{distributions::Alphanumeric, thread_rng, Rng}; lazy_static! { static ref SECURE_PUBLIC_KEY: String = generate_random_string(); @@ -39,9 +39,16 @@ async fn listen_for_new_vm_reqs( .get_new_vm_reqs(NodePubkey { node_pubkey }) .await? .into_inner(); - while let Some(newvmreq) = grpc_stream.next().await { - info!("Received new vm request: {newvmreq:?}"); - let _ = tx.send(newvmreq?).await; + while let Some(stream_update) = grpc_stream.next().await { + match stream_update { + Ok(req) => { + info!("Received new vm request: {req:?}"); + let _ = tx.send(req).await; + } + Err(e) => { + warn!("Brain disconnected from listen_for_new_vm_reqs: {e}"); + } + } } debug!("listen_for_new_vm_reqs is about to exit"); Ok(()) @@ -58,6 +65,31 @@ async fn send_confirmations( Ok(()) } +async fn register_node(mut client: BrainDaemonServiceClient) { + debug!("Starting node registration..."); + let req = RegisterNodeRequest { + node_pubkey: SECURE_PUBLIC_KEY.clone(), + owner_pubkey: "IamTheOwnerOf".to_string() + &SECURE_PUBLIC_KEY, + ip: "10.0.10.1".to_string(), + country: "Cyrodiil".to_string(), + city: "Bruma".to_string(), + avail_ports: 10000, + avail_ipv4: 10, + avail_ipv6: 100_000, + avail_vcpus: 16, + avail_memory_mb: 20_000, + avail_storage_gb: 700, + max_ports_per_vm: 5, + }; + match client.register_node(req).await { + Ok(_) => info!( + "Registered as 10.0.10.1 from Bruma/Cyrodiil with ID {}", + SECURE_PUBLIC_KEY.clone() + ), + Err(e) => error!("Could not register node data: {e:?}"), + }; +} + async fn listen_for_deleted_vms( mut client: BrainDaemonServiceClient, tx: Sender, @@ -68,26 +100,85 @@ async fn listen_for_deleted_vms( .deleted_vm_updates(NodePubkey { node_pubkey }) .await? .into_inner(); - while let Some(deleted_vm) = grpc_stream.next().await { - info!("Received deleted vm: {deleted_vm:?}"); - let _ = tx.send(deleted_vm?).await; + while let Some(stream_update) = grpc_stream.next().await { + match stream_update { + Ok(req) => { + info!("Received delete vm request: {req:?}"); + let _ = tx.send(req).await; + } + Err(e) => { + warn!("Brain disconnected from listen_for_deleted_vms: {e}"); + } + } } debug!("listen_for_new_vm_reqs is about to exit"); Ok(()) } +async fn handle_vm_requests(mut req: Receiver, resp: Sender) { + info!("Started to handle vm requests. 1 out of 5 requests will return error."); + let mut i = 0; + while let Some(new_vm) = req.recv().await { + let success = i == 3; + let exposed_ports = match new_vm.public_ipv4 { + true => Vec::new(), + false => vec![20321, 20415, 25912], + }; + let public_ipv4 = match new_vm.public_ipv4 { + true => "10.0.100.5".to_string(), + false => String::new(), + }; + let public_ipv6 = match new_vm.public_ipv6 { + true => " 2a02:2f2d:d301:3100:afe8:a85e:54a0:dd28".to_string(), + false => String::new(), + }; + if success { + let confirmation = NewVmConfirmation { + uuid: new_vm.uuid, + exposed_ports, + public_ipv4, + public_ipv6, + error: String::new(), + }; + info!("Sending NewVmConfirmation: {confirmation:?}"); + let _ = resp.send(confirmation).await; + } else { + let confirmation = NewVmConfirmation { + uuid: new_vm.uuid, + exposed_ports: Vec::new(), + public_ipv4: String::new(), + public_ipv6: String::new(), + error: "No.".to_string(), + }; + info!("Sending error for NewVmConfirmation: {confirmation:?}"); + let _ = resp.send(confirmation).await; + } + i += 1; + if i == 5 { + i = 0; + } + } + warn!("vm request handler is ending"); +} + async fn connect_and_run() -> Result<()> { let client = BrainDaemonServiceClient::connect("http://[::1]:31337").await?; let mut streaming_tasks = JoinSet::new(); + register_node(client.clone()).await; + let newvm_client = client.clone(); - let (tx, _newvm_rx) = tokio::sync::mpsc::channel(6); + let (tx, newvm_rx) = tokio::sync::mpsc::channel(6); streaming_tasks.spawn(listen_for_new_vm_reqs(newvm_client, tx)); let confirm_client = client.clone(); - let (_confirm_tx, rx) = tokio::sync::mpsc::channel(6); + let (confirm_tx, rx) = tokio::sync::mpsc::channel(6); streaming_tasks.spawn(send_confirmations(confirm_client, rx)); + tokio::spawn(async move { + handle_vm_requests(newvm_rx, confirm_tx).await; + }); + let deletevms_client = client.clone(); let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6); streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx));