added capability to update node resources

This commit is contained in:
ghe0 2024-12-26 23:57:32 +02:00
parent 20e1ff64fa
commit b6e7fa0311
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
4 changed files with 90 additions and 54 deletions

@ -11,16 +11,17 @@ message NodePubkey {
message RegisterNodeReq {
string node_pubkey = 1;
string owner_pubkey = 2;
string ip = 3;
string country = 4;
string city = 5;
uint32 avail_ports = 6;
uint32 avail_ipv4 = 7;
uint32 avail_ipv6 = 8;
uint32 avail_vcpus = 9;
uint32 avail_memory_mb = 10;
uint32 avail_storage_gb = 11;
uint32 max_ports_per_vm = 12;
}
message NodeResourceReq {
string node_pubkey = 1;
uint32 avail_ports = 2;
uint32 avail_ipv4 = 3;
uint32 avail_ipv6 = 4;
uint32 avail_vcpus = 5;
uint32 avail_memory_mb = 6;
uint32 avail_storage_gb = 7;
uint32 max_ports_per_vm = 8;
}
message NewVMReq {
@ -92,6 +93,7 @@ message DeleteVMReq {
service BrainDaemonService {
rpc RegisterNode (RegisterNodeReq) returns (Empty);
rpc SendNodeResources (stream NodeResourceReq) returns (Empty);
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
rpc SendNewVMResp (stream NewVMResp) returns (Empty);
rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
@ -100,7 +102,6 @@ service BrainDaemonService {
rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
}
message NodeFilters {
uint32 free_ports = 1;
bool offers_ipv4 = 2;
@ -114,10 +115,11 @@ message NodeFilters {
message NodeListResp {
string node_pubkey = 1;
string country = 2;
string city = 3;
string ip = 4; // required for latency test
uint32 server_rating = 5;
uint32 provider_rating = 6;
string region = 3;
string city = 4;
string ip = 5; // required for latency test
uint32 server_rating = 6;
uint32 provider_rating = 7;
}
service BrainCliService {

@ -37,8 +37,8 @@ async fn get_node_list(mut client: BrainCliServiceClient<Channel>) -> Result<Vec
let mut grpc_stream = client
.list_nodes(NodeFilters {
free_ports: 0,
offers_ipv4: true,
offers_ipv6: true,
offers_ipv4: false,
offers_ipv6: false,
vcpus: 0,
memory_mb: 0,
storage_gb: 0,

@ -11,16 +11,17 @@ message NodePubkey {
message RegisterNodeReq {
string node_pubkey = 1;
string owner_pubkey = 2;
string ip = 3;
string country = 4;
string city = 5;
uint32 avail_ports = 6;
uint32 avail_ipv4 = 7;
uint32 avail_ipv6 = 8;
uint32 avail_vcpus = 9;
uint32 avail_memory_mb = 10;
uint32 avail_storage_gb = 11;
uint32 max_ports_per_vm = 12;
}
message NodeResourceReq {
string node_pubkey = 1;
uint32 avail_ports = 2;
uint32 avail_ipv4 = 3;
uint32 avail_ipv6 = 4;
uint32 avail_vcpus = 5;
uint32 avail_memory_mb = 6;
uint32 avail_storage_gb = 7;
uint32 max_ports_per_vm = 8;
}
message NewVMReq {
@ -92,6 +93,7 @@ message DeleteVMReq {
service BrainDaemonService {
rpc RegisterNode (RegisterNodeReq) returns (Empty);
rpc SendNodeResources (stream NodeResourceReq) returns (Empty);
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
rpc SendNewVMResp (stream NewVMResp) returns (Empty);
rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
@ -100,7 +102,6 @@ service BrainDaemonService {
rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
}
message NodeFilters {
uint32 free_ports = 1;
bool offers_ipv4 = 2;
@ -114,10 +115,11 @@ message NodeFilters {
message NodeListResp {
string node_pubkey = 1;
string country = 2;
string city = 3;
string ip = 4; // required for latency test
uint32 server_rating = 5;
uint32 provider_rating = 6;
string region = 3;
string city = 4;
string ip = 5; // required for latency test
uint32 server_rating = 6;
uint32 provider_rating = 7;
}
service BrainCliService {

@ -5,8 +5,8 @@ pub mod brain {
use anyhow::Result;
use brain::{
brain_daemon_service_client::BrainDaemonServiceClient, DeleteVmReq, NewVmResp,
NewVmReq, NodePubkey, RegisterNodeReq, UpdateVmReq, UpdateVmResp,
brain_daemon_service_client::BrainDaemonServiceClient, DeleteVmReq, NewVmReq, NewVmResp,
NodePubkey, NodeResourceReq, RegisterNodeReq, UpdateVmReq, UpdateVmResp,
};
use lazy_static::lazy_static;
use log::{debug, error, info, warn};
@ -65,21 +65,22 @@ async fn send_newvm_resp(
Ok(())
}
async fn send_node_resources(
mut client: BrainDaemonServiceClient<Channel>,
rx: Receiver<NodeResourceReq>,
) -> Result<()> {
debug!("starting send_newvm_resp stream");
let rx_stream = ReceiverStream::new(rx);
client.send_node_resources(rx_stream).await?;
debug!("send_newvm_resp is about to exit");
Ok(())
}
async fn register_node(mut client: BrainDaemonServiceClient<Channel>) {
debug!("Starting node registration...");
let req = RegisterNodeReq {
node_pubkey: SECURE_PUBLIC_KEY.clone(),
owner_pubkey: "IamTheOwnerOf".to_string() + &SECURE_PUBLIC_KEY,
ip: "10.0.10.1".to_string(),
country: "Cyrodiil".to_string(),
city: "Bruma".to_string(),
avail_ports: 10000,
avail_ipv4: 10,
avail_ipv6: 100_000,
avail_vcpus: 16,
avail_memory_mb: 20_000,
avail_storage_gb: 700,
max_ports_per_vm: 5,
};
match client.register_node(req).await {
Ok(_) => info!(
@ -142,8 +143,8 @@ async fn listen_for_update_vm_reqs(
async fn handle_update_vm_requests(
mut req: Receiver<UpdateVmReq>,
resp_chan: Sender<UpdateVmResp>
) {
resp_chan: Sender<UpdateVmResp>,
) {
info!("Started to handle update vm requests.");
while let Some(update_vm) = req.recv().await {
let update_vm_resp = UpdateVmResp {
@ -152,7 +153,7 @@ async fn handle_update_vm_requests(
};
info!("Sending UpdateVmResp: {update_vm_resp:?}");
let _ = resp_chan.send(update_vm_resp).await;
};
}
warn!("update vm request handler is ending");
}
@ -167,7 +168,11 @@ async fn send_updatevm_resp(
Ok(())
}
async fn handle_vm_requests(mut req: Receiver<NewVmReq>, resp: Sender<NewVmResp>) {
async fn handle_vm_requests(
mut req: Receiver<NewVmReq>,
resp: Sender<NewVmResp>,
resource_tx: Sender<NodeResourceReq>,
) {
info!("Started to handle vm requests. 1 out of 5 requests will return error.");
let mut i = 0;
while let Some(new_vm) = req.recv().await {
@ -184,7 +189,7 @@ async fn handle_vm_requests(mut req: Receiver<NewVmReq>, resp: Sender<NewVmResp>
false => String::new(),
};
if i != 3 {
let confirmation = NewVmResp{
let confirmation = NewVmResp {
uuid: new_vm.uuid,
exposed_ports,
public_ipv4,
@ -193,6 +198,7 @@ async fn handle_vm_requests(mut req: Receiver<NewVmReq>, resp: Sender<NewVmResp>
};
info!("Sending NewVmConfirmation: {confirmation:?}");
let _ = resp.send(confirmation).await;
info!("Sending NodeResourceReq");
} else {
let confirmation = NewVmResp {
uuid: new_vm.uuid,
@ -203,6 +209,19 @@ async fn handle_vm_requests(mut req: Receiver<NewVmReq>, resp: Sender<NewVmResp>
};
info!("Sending error for NewVmConfirmation: {confirmation:?}");
let _ = resp.send(confirmation).await;
info!("Sending updated about used resources.");
let _ = resource_tx
.send(NodeResourceReq {
node_pubkey: SECURE_PUBLIC_KEY.clone(),
avail_ipv4: 5,
avail_ipv6: 5,
avail_memory_mb: 5000,
avail_vcpus: 32,
avail_storage_gb: 100,
avail_ports: 5000,
max_ports_per_vm: 5,
})
.await;
}
i += 1;
if i == 5 {
@ -223,11 +242,25 @@ async fn connect_and_run() -> Result<()> {
streaming_tasks.spawn(listen_for_new_vm_reqs(newvm_client, tx));
let confirm_client = client.clone();
let (confirm_tx, rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(send_newvm_resp(confirm_client, rx));
let resource_client = client.clone();
let (resource_tx, resource_rx) = tokio::sync::mpsc::channel(6);
let _ = resource_tx
.send(NodeResourceReq {
node_pubkey: SECURE_PUBLIC_KEY.clone(),
avail_ipv4: 10,
avail_ipv6: 10,
avail_memory_mb: 2500,
avail_vcpus: 60,
avail_storage_gb: 200,
avail_ports: 5002,
max_ports_per_vm: 5,
})
.await;
let (confirm_tx, confirm_rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(send_newvm_resp(confirm_client, confirm_rx));
streaming_tasks.spawn(send_node_resources(resource_client, resource_rx));
tokio::spawn(async move {
handle_vm_requests(newvm_rx, confirm_tx).await;
handle_vm_requests(newvm_rx, confirm_tx, resource_tx).await;
});
let updatevm_client = client.clone();
@ -242,7 +275,6 @@ async fn connect_and_run() -> Result<()> {
handle_update_vm_requests(updatevm_rx, resp_tx).await;
});
let deletevms_client = client.clone();
let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx));