added node registration and handling new vm reqs

This commit is contained in:
ghe0 2024-12-22 18:16:06 +02:00
parent f0e2e8b8bc
commit 47eb811a6e
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4

@ -5,17 +5,17 @@ pub mod brain {
use anyhow::Result;
use brain::{
brain_daemon_service_client::BrainDaemonServiceClient, NewVmConfirmation, NewVmRequest, DeletedVmUpdate,
NodePubkey,
brain_daemon_service_client::BrainDaemonServiceClient, DeletedVmUpdate, NewVmConfirmation,
NewVmRequest, NodePubkey, RegisterNodeRequest,
};
use log::{error, debug, warn, info};
use lazy_static::lazy_static;
use log::{debug, error, info, warn};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use tokio::time::{sleep, Duration};
use tokio::{sync::mpsc::Receiver, sync::mpsc::Sender, task::JoinSet};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
use lazy_static::lazy_static;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
lazy_static! {
static ref SECURE_PUBLIC_KEY: String = generate_random_string();
@ -39,9 +39,16 @@ async fn listen_for_new_vm_reqs(
.get_new_vm_reqs(NodePubkey { node_pubkey })
.await?
.into_inner();
while let Some(newvmreq) = grpc_stream.next().await {
info!("Received new vm request: {newvmreq:?}");
let _ = tx.send(newvmreq?).await;
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(req) => {
info!("Received new vm request: {req:?}");
let _ = tx.send(req).await;
}
Err(e) => {
warn!("Brain disconnected from listen_for_new_vm_reqs: {e}");
}
}
}
debug!("listen_for_new_vm_reqs is about to exit");
Ok(())
@ -58,6 +65,31 @@ async fn send_confirmations(
Ok(())
}
async fn register_node(mut client: BrainDaemonServiceClient<Channel>) {
debug!("Starting node registration...");
let req = RegisterNodeRequest {
node_pubkey: SECURE_PUBLIC_KEY.clone(),
owner_pubkey: "IamTheOwnerOf".to_string() + &SECURE_PUBLIC_KEY,
ip: "10.0.10.1".to_string(),
country: "Cyrodiil".to_string(),
city: "Bruma".to_string(),
avail_ports: 10000,
avail_ipv4: 10,
avail_ipv6: 100_000,
avail_vcpus: 16,
avail_memory_mb: 20_000,
avail_storage_gb: 700,
max_ports_per_vm: 5,
};
match client.register_node(req).await {
Ok(_) => info!(
"Registered as 10.0.10.1 from Bruma/Cyrodiil with ID {}",
SECURE_PUBLIC_KEY.clone()
),
Err(e) => error!("Could not register node data: {e:?}"),
};
}
async fn listen_for_deleted_vms(
mut client: BrainDaemonServiceClient<Channel>,
tx: Sender<DeletedVmUpdate>,
@ -68,26 +100,85 @@ async fn listen_for_deleted_vms(
.deleted_vm_updates(NodePubkey { node_pubkey })
.await?
.into_inner();
while let Some(deleted_vm) = grpc_stream.next().await {
info!("Received deleted vm: {deleted_vm:?}");
let _ = tx.send(deleted_vm?).await;
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(req) => {
info!("Received delete vm request: {req:?}");
let _ = tx.send(req).await;
}
Err(e) => {
warn!("Brain disconnected from listen_for_deleted_vms: {e}");
}
}
}
debug!("listen_for_new_vm_reqs is about to exit");
Ok(())
}
async fn handle_vm_requests(mut req: Receiver<NewVmRequest>, resp: Sender<NewVmConfirmation>) {
info!("Started to handle vm requests. 1 out of 5 requests will return error.");
let mut i = 0;
while let Some(new_vm) = req.recv().await {
let success = i == 3;
let exposed_ports = match new_vm.public_ipv4 {
true => Vec::new(),
false => vec![20321, 20415, 25912],
};
let public_ipv4 = match new_vm.public_ipv4 {
true => "10.0.100.5".to_string(),
false => String::new(),
};
let public_ipv6 = match new_vm.public_ipv6 {
true => " 2a02:2f2d:d301:3100:afe8:a85e:54a0:dd28".to_string(),
false => String::new(),
};
if success {
let confirmation = NewVmConfirmation {
uuid: new_vm.uuid,
exposed_ports,
public_ipv4,
public_ipv6,
error: String::new(),
};
info!("Sending NewVmConfirmation: {confirmation:?}");
let _ = resp.send(confirmation).await;
} else {
let confirmation = NewVmConfirmation {
uuid: new_vm.uuid,
exposed_ports: Vec::new(),
public_ipv4: String::new(),
public_ipv6: String::new(),
error: "No.".to_string(),
};
info!("Sending error for NewVmConfirmation: {confirmation:?}");
let _ = resp.send(confirmation).await;
}
i += 1;
if i == 5 {
i = 0;
}
}
warn!("vm request handler is ending");
}
async fn connect_and_run() -> Result<()> {
let client = BrainDaemonServiceClient::connect("http://[::1]:31337").await?;
let mut streaming_tasks = JoinSet::new();
register_node(client.clone()).await;
let newvm_client = client.clone();
let (tx, _newvm_rx) = tokio::sync::mpsc::channel(6);
let (tx, newvm_rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(listen_for_new_vm_reqs(newvm_client, tx));
let confirm_client = client.clone();
let (_confirm_tx, rx) = tokio::sync::mpsc::channel(6);
let (confirm_tx, rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(send_confirmations(confirm_client, rx));
tokio::spawn(async move {
handle_vm_requests(newvm_rx, confirm_tx).await;
});
let deletevms_client = client.clone();
let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx));