#![allow(dead_code)] pub mod brain { tonic::include_proto!("brain"); } use anyhow::Result; use brain::{ brain_daemon_service_client::BrainDaemonServiceClient, DeletedVmUpdate, NewVmConfirmation, NewVmRequest, NodePubkey, RegisterNodeRequest, UpdateVmRequest, UpdateVmResp }; use lazy_static::lazy_static; use log::{debug, error, info, warn}; use rand::{distributions::Alphanumeric, thread_rng, Rng}; use tokio::time::{sleep, Duration}; use tokio::{sync::mpsc::Receiver, sync::mpsc::Sender, task::JoinSet}; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; use tonic::transport::Channel; use chrono; lazy_static! { static ref SECURE_PUBLIC_KEY: String = generate_random_string(); } fn generate_random_string() -> String { let rng = thread_rng(); rng.sample_iter(&Alphanumeric) .take(16) // Adjust the length as needed .map(char::from) .collect() } async fn listen_for_new_vm_reqs( mut client: BrainDaemonServiceClient, tx: Sender, ) -> Result<()> { debug!("starting listen_for_new_vm_reqs"); let node_pubkey = SECURE_PUBLIC_KEY.clone(); let mut grpc_stream = client .get_new_vm_reqs(NodePubkey { node_pubkey }) .await? .into_inner(); while let Some(stream_update) = grpc_stream.next().await { match stream_update { Ok(req) => { info!("Received new vm request: {req:?}"); let _ = tx.send(req).await; } Err(e) => { warn!("Brain disconnected from listen_for_new_vm_reqs: {e}"); } } } debug!("listen_for_new_vm_reqs is about to exit"); Ok(()) } async fn send_confirmations( mut client: BrainDaemonServiceClient, rx: Receiver, ) -> Result<()> { debug!("starting send_confirmations stream"); let rx_stream = ReceiverStream::new(rx); client.send_vm_confirmations(rx_stream).await?; debug!("send_confirmations is about to exit"); Ok(()) } async fn register_node(mut client: BrainDaemonServiceClient) { debug!("Starting node registration..."); let req = RegisterNodeRequest { node_pubkey: SECURE_PUBLIC_KEY.clone(), owner_pubkey: "IamTheOwnerOf".to_string() + &SECURE_PUBLIC_KEY, ip: "10.0.10.1".to_string(), country: "Cyrodiil".to_string(), city: "Bruma".to_string(), avail_ports: 10000, avail_ipv4: 10, avail_ipv6: 100_000, avail_vcpus: 16, avail_memory_mb: 20_000, avail_storage_gb: 700, max_ports_per_vm: 5, }; match client.register_node(req).await { Ok(_) => info!( "Registered as 10.0.10.1 from Bruma/Cyrodiil with ID {}", SECURE_PUBLIC_KEY.clone() ), Err(e) => error!("Could not register node data: {e:?}"), }; } async fn listen_for_deleted_vms( mut client: BrainDaemonServiceClient, tx: Sender, ) -> Result<()> { debug!("starting listen_for_new_vm_reqs"); let node_pubkey = SECURE_PUBLIC_KEY.clone(); let mut grpc_stream = client .deleted_vm_updates(NodePubkey { node_pubkey }) .await? .into_inner(); while let Some(stream_update) = grpc_stream.next().await { match stream_update { Ok(req) => { info!("Received delete vm request: {req:?}"); let _ = tx.send(req).await; } Err(e) => { warn!("Brain disconnected from listen_for_deleted_vms: {e}"); } } } debug!("listen_for_new_vm_reqs is about to exit"); Ok(()) } async fn listen_for_update_vm_reqs( mut client: BrainDaemonServiceClient, tx: Sender, ) -> Result<()> { debug!("starting listen_for_update_vm_reqs"); let node_pubkey = SECURE_PUBLIC_KEY.clone(); let mut grpc_stream = client .update_v_ms(NodePubkey { node_pubkey }) .await? .into_inner(); while let Some(stream_update) = grpc_stream.next().await { match stream_update { Ok(req) => { info!("Received update vm request: {req:?}"); let _ = tx.send(req).await; } Err(e) => { warn!("Brain disconnected from listen_for_update_vm_reqs: {e}"); } } } debug!("listen_for_update_vm_reqs is about to exit"); Ok(()) } async fn handle_update_vm_requests( mut req: Receiver, resp: Sender, ) { info!("Started to handle update vm requests."); while let Some(update_vm) = req.recv().await { let response = UpdateVmResp { uuid: update_vm.uuid.clone(), timestamp: chrono::Utc::now().to_rfc3339(), error: String::new(), }; info!("Sending UpdateVmResp: {response:?}"); let _ = resp.send(response).await; } warn!("update vm request handler is ending"); } async fn handle_vm_requests(mut req: Receiver, resp: Sender) { info!("Started to handle vm requests. 1 out of 5 requests will return error."); let mut i = 0; while let Some(new_vm) = req.recv().await { let exposed_ports = match new_vm.public_ipv4 { true => Vec::new(), false => vec![20321, 20415, 25912], }; let public_ipv4 = match new_vm.public_ipv4 { true => "10.0.100.5".to_string(), false => String::new(), }; let public_ipv6 = match new_vm.public_ipv6 { true => " 2a02:2f2d:d301:3100:afe8:a85e:54a0:dd28".to_string(), false => String::new(), }; if i != 3 { let confirmation = NewVmConfirmation { uuid: new_vm.uuid, exposed_ports, public_ipv4, public_ipv6, error: String::new(), }; info!("Sending NewVmConfirmation: {confirmation:?}"); let _ = resp.send(confirmation).await; } else { let confirmation = NewVmConfirmation { uuid: new_vm.uuid, exposed_ports: Vec::new(), public_ipv4: String::new(), public_ipv6: String::new(), error: "No.".to_string(), }; info!("Sending error for NewVmConfirmation: {confirmation:?}"); let _ = resp.send(confirmation).await; } i += 1; if i == 5 { i = 0; } } warn!("vm request handler is ending"); } async fn connect_and_run() -> Result<()> { let client = BrainDaemonServiceClient::connect("http://[::1]:31337").await?; let mut streaming_tasks = JoinSet::new(); register_node(client.clone()).await; let newvm_client = client.clone(); let (tx, newvm_rx) = tokio::sync::mpsc::channel(6); streaming_tasks.spawn(listen_for_new_vm_reqs(newvm_client, tx)); let confirm_client = client.clone(); let (confirm_tx, rx) = tokio::sync::mpsc::channel(6); streaming_tasks.spawn(send_confirmations(confirm_client, rx)); tokio::spawn(async move { handle_vm_requests(newvm_rx, confirm_tx).await; }); let updatevm_client = client.clone(); let (update_tx, update_rx) = tokio::sync::mpsc::channel(6); streaming_tasks.spawn(listen_for_update_vm_reqs(updatevm_client, update_tx)); let (update_resp_tx, _) = tokio::sync::mpsc::channel(6); tokio::spawn(async move { handle_update_vm_requests(update_rx, update_resp_tx).await; }); let deletevms_client = client.clone(); let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6); streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx)); let task_output = streaming_tasks.join_next().await; warn!("One stream exited: {task_output:?}"); Ok(()) } async fn connection_wrapper() -> ! { loop { info!("Connecting to brain..."); if let Err(e) = connect_and_run().await { error!("The connection broke: {e}"); } sleep(Duration::from_secs(3)).await; } } #[tokio::main] async fn main() { env_logger::builder() .filter_level(log::LevelFilter::Debug) .init(); info!("Hello! My name is {}", SECURE_PUBLIC_KEY.clone()); connection_wrapper().await; }