Compare commits

..

3 Commits

Author SHA1 Message Date
f050e58176
change NewVmResp to allow the CLI to build params 2024-12-28 04:17:48 +02:00
b6e7fa0311
added capability to update node resources 2024-12-26 23:57:32 +02:00
20e1ff64fa added support for vm updates (#1)
Co-authored-by: Ramil_Algayev <pro.remred@gmail.com>
Co-authored-by: ghe0 <gheorghe@gheo.tech>
Reviewed-on: #1
Co-authored-by: ramrem <ralgayev@detee.ltd>
Co-committed-by: ramrem <ralgayev@detee.ltd>
2024-12-25 23:00:15 +00:00
4 changed files with 137 additions and 108 deletions

@ -11,10 +11,9 @@ message NodePubkey {
message RegisterNodeReq { message RegisterNodeReq {
string node_pubkey = 1; string node_pubkey = 1;
string owner_pubkey = 2; string owner_pubkey = 2;
string ip = 3;
} }
message NodeRes { message NodeResourceReq {
string node_pubkey = 1; string node_pubkey = 1;
uint32 avail_ports = 2; uint32 avail_ports = 2;
uint32 avail_ipv4 = 3; uint32 avail_ipv4 = 3;
@ -44,7 +43,6 @@ message NewVMReq {
message UpdateVMReq { message UpdateVMReq {
string uuid = 1; string uuid = 1;
string node_pubkey = 2;
uint32 disk_size_gb = 3; uint32 disk_size_gb = 3;
uint32 vcpus = 4; uint32 vcpus = 4;
uint32 memory_mb = 5; uint32 memory_mb = 5;
@ -81,12 +79,21 @@ message ListVMContractsReq {
string node_pubkey = 2; string node_pubkey = 2;
} }
message NewVmRespIP {
uint32 nic_index = 1;
string address = 2;
string mask = 3;
string gateway = 4;
}
message NewVMResp { message NewVMResp {
string uuid = 1; string uuid = 1;
repeated uint32 exposed_ports = 2; repeated uint32 exposed_ports = 2;
string public_ipv4 = 3; string ovmf_hash = 5;
string public_ipv6 = 4; // This is needed to allow the CLI to build the kernel params from known data.
string error = 5; // The CLI will use the kernel params to get the measurement.
repeated NewVmRespIP ips = 6;
string error = 7;
} }
message DeleteVMReq { message DeleteVMReq {
@ -95,7 +102,7 @@ message DeleteVMReq {
service BrainDaemonService { service BrainDaemonService {
rpc RegisterNode (RegisterNodeReq) returns (Empty); rpc RegisterNode (RegisterNodeReq) returns (Empty);
rpc NodeResUpdate (stream NodeRes) returns (Empty); rpc SendNodeResources (stream NodeResourceReq) returns (Empty);
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq); rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
rpc SendNewVMResp (stream NewVMResp) returns (Empty); rpc SendNewVMResp (stream NewVMResp) returns (Empty);
rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq); rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
@ -117,10 +124,11 @@ message NodeFilters {
message NodeListResp { message NodeListResp {
string node_pubkey = 1; string node_pubkey = 1;
string country = 2; string country = 2;
string city = 3; string region = 3;
string ip = 4; // required for latency test string city = 4;
uint32 server_rating = 5; string ip = 5; // required for latency test
uint32 provider_rating = 6; uint32 server_rating = 6;
uint32 provider_rating = 7;
} }
service BrainCliService { service BrainCliService {

@ -5,8 +5,8 @@ pub mod brain {
use anyhow::Result; use anyhow::Result;
use brain::{ use brain::{
brain_cli_service_client::BrainCliServiceClient, DeleteVmReq, ListVmContractsReq, brain_cli_service_client::BrainCliServiceClient, DeleteVmReq, ListVmContractsReq, NewVmReq,
NewVmReq, NodeFilters, NodeListResp, VmContract, UpdateVmReq, NodeFilters, NodeListResp, UpdateVmReq, VmContract,
}; };
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::{debug, info, warn}; use log::{debug, info, warn};
@ -37,8 +37,8 @@ async fn get_node_list(mut client: BrainCliServiceClient<Channel>) -> Result<Vec
let mut grpc_stream = client let mut grpc_stream = client
.list_nodes(NodeFilters { .list_nodes(NodeFilters {
free_ports: 0, free_ports: 0,
offers_ipv4: true, offers_ipv4: false,
offers_ipv6: true, offers_ipv6: false,
vcpus: 0, vcpus: 0,
memory_mb: 0, memory_mb: 0,
storage_gb: 0, storage_gb: 0,
@ -84,12 +84,12 @@ async fn submit_vm_request(
info!("Creating VM {req:?}"); info!("Creating VM {req:?}");
let result = client.create_vm_contract(req).await; let result = client.create_vm_contract(req).await;
match result { match result {
Ok(confirmation) => { Ok(resp) => {
let confirmation = confirmation.into_inner(); let resp = resp.into_inner();
if confirmation.error.is_empty() { if resp.error.is_empty() {
info!("Got VM confirmation: {confirmation:?}"); info!("Got NewVMResp: {resp:?}");
} else { } else {
warn!("Got VM confirmation error: {}", confirmation.error); warn!("Got new VM error: {}", resp.error);
}; };
} }
Err(e) => log::error!("Could not create vm: {e:?}"), Err(e) => log::error!("Could not create vm: {e:?}"),
@ -137,12 +137,10 @@ async fn delete_vm(mut client: BrainCliServiceClient<Channel>, uuid: &str) -> Re
async fn update_vm_request( async fn update_vm_request(
mut client: BrainCliServiceClient<Channel>, mut client: BrainCliServiceClient<Channel>,
node_pubkey: &str, uuid: &str,
uuid: &str,
) -> Result<()> { ) -> Result<()> {
let req = UpdateVmReq { let req = UpdateVmReq {
uuid: uuid.to_string(), uuid: uuid.to_string(),
node_pubkey: node_pubkey.to_string(),
vcpus: 4, vcpus: 4,
memory_mb: 4096, memory_mb: 4096,
disk_size_gb: 40, disk_size_gb: 40,
@ -154,12 +152,12 @@ async fn update_vm_request(
info!("Updating VM {req:?}"); info!("Updating VM {req:?}");
let result = client.update_vm(req).await; let result = client.update_vm(req).await;
match result { match result {
Ok(confirmation) => { Ok(resp) => {
let confirmation = confirmation.into_inner(); let resp = resp.into_inner();
if confirmation.error.is_empty() { if resp.error.is_empty() {
info!("Got VM update confirmation: {confirmation:?}"); info!("Got VM update response: {resp:?}");
} else { } else {
warn!("Got VM update confirmation error: {}", confirmation.error); warn!("Got VM update error: {}", resp.error);
}; };
} }
Err(e) => log::error!("Could not update vm: {e:?}"), Err(e) => log::error!("Could not update vm: {e:?}"),
@ -186,16 +184,13 @@ async fn main() -> Result<()> {
} }
} }
let nodes = get_node_list(client.clone()).await?; let contracts = list_contracts(client.clone()).await?;
for node in nodes { for contract in contracts {
let contracts = list_contracts(client.clone()).await?; if let Err(e) = update_vm_request(client.clone(), &contract.uuid).await {
for contract in contracts { log::error!(
if let Err(e) = update_vm_request(client.clone(), &node.node_pubkey, &contract.uuid).await { "Received error when updating VM on node {}: {e:?}",
log::error!( &contract.node_pubkey
"Received error when updating VM on node {}: {e:?}", );
&node.node_pubkey
);
}
} }
} }

@ -11,10 +11,9 @@ message NodePubkey {
message RegisterNodeReq { message RegisterNodeReq {
string node_pubkey = 1; string node_pubkey = 1;
string owner_pubkey = 2; string owner_pubkey = 2;
string ip = 3;
} }
message NodeRes { message NodeResourceReq {
string node_pubkey = 1; string node_pubkey = 1;
uint32 avail_ports = 2; uint32 avail_ports = 2;
uint32 avail_ipv4 = 3; uint32 avail_ipv4 = 3;
@ -44,7 +43,6 @@ message NewVMReq {
message UpdateVMReq { message UpdateVMReq {
string uuid = 1; string uuid = 1;
string node_pubkey = 2;
uint32 disk_size_gb = 3; uint32 disk_size_gb = 3;
uint32 vcpus = 4; uint32 vcpus = 4;
uint32 memory_mb = 5; uint32 memory_mb = 5;
@ -81,12 +79,21 @@ message ListVMContractsReq {
string node_pubkey = 2; string node_pubkey = 2;
} }
message NewVmRespIP {
uint32 nic_index = 1;
string address = 2;
string mask = 3;
string gateway = 4;
}
message NewVMResp { message NewVMResp {
string uuid = 1; string uuid = 1;
repeated uint32 exposed_ports = 2; repeated uint32 exposed_ports = 2;
string public_ipv4 = 3; string ovmf_hash = 5;
string public_ipv6 = 4; // This is needed to allow the CLI to build the kernel params from known data.
string error = 5; // The CLI will use the kernel params to get the measurement.
repeated NewVmRespIP ips = 6;
string error = 7;
} }
message DeleteVMReq { message DeleteVMReq {
@ -95,7 +102,7 @@ message DeleteVMReq {
service BrainDaemonService { service BrainDaemonService {
rpc RegisterNode (RegisterNodeReq) returns (Empty); rpc RegisterNode (RegisterNodeReq) returns (Empty);
rpc NodeResUpdate (stream NodeRes) returns (Empty); rpc SendNodeResources (stream NodeResourceReq) returns (Empty);
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq); rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
rpc SendNewVMResp (stream NewVMResp) returns (Empty); rpc SendNewVMResp (stream NewVMResp) returns (Empty);
rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq); rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
@ -117,10 +124,11 @@ message NodeFilters {
message NodeListResp { message NodeListResp {
string node_pubkey = 1; string node_pubkey = 1;
string country = 2; string country = 2;
string city = 3; string region = 3;
string ip = 4; // required for latency test string city = 4;
uint32 server_rating = 5; string ip = 5; // required for latency test
uint32 provider_rating = 6; uint32 server_rating = 6;
uint32 provider_rating = 7;
} }
service BrainCliService { service BrainCliService {
@ -130,4 +138,3 @@ service BrainCliService {
rpc DeleteVM (DeleteVMReq) returns (Empty); rpc DeleteVM (DeleteVMReq) returns (Empty);
rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp); rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp);
} }

@ -5,8 +5,8 @@ pub mod brain {
use anyhow::Result; use anyhow::Result;
use brain::{ use brain::{
brain_daemon_service_client::BrainDaemonServiceClient, DeleteVmReq, NewVmResp, brain_daemon_service_client::BrainDaemonServiceClient, DeleteVmReq, NewVmReq, NewVmResp,
NewVmReq, NodePubkey, RegisterNodeReq, UpdateVmReq, UpdateVmResp, NodeRes, NewVmRespIp, NodePubkey, NodeResourceReq, RegisterNodeReq, UpdateVmReq, UpdateVmResp,
}; };
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
@ -65,46 +65,32 @@ async fn send_newvm_resp(
Ok(()) Ok(())
} }
async fn send_node_resources(
mut client: BrainDaemonServiceClient<Channel>,
rx: Receiver<NodeResourceReq>,
) -> Result<()> {
debug!("starting send_newvm_resp stream");
let rx_stream = ReceiverStream::new(rx);
client.send_node_resources(rx_stream).await?;
debug!("send_newvm_resp is about to exit");
Ok(())
}
async fn register_node(mut client: BrainDaemonServiceClient<Channel>) { async fn register_node(mut client: BrainDaemonServiceClient<Channel>) {
debug!("Starting node registration..."); debug!("Starting node registration...");
let req = RegisterNodeReq { let req = RegisterNodeReq {
node_pubkey: SECURE_PUBLIC_KEY.clone(), node_pubkey: SECURE_PUBLIC_KEY.clone(),
owner_pubkey: "IamTheOwnerOf".to_string() + &SECURE_PUBLIC_KEY, owner_pubkey: "IamTheOwnerOf".to_string() + &SECURE_PUBLIC_KEY,
ip: "10.0.10.1".to_string(),
}; };
match client.register_node(req).await { match client.register_node(req).await {
Ok(_) => info!( Ok(_) => info!(
"Registered as 10.0.10.1 {}", "Registered as 10.0.10.1 from Bruma/Cyrodiil with ID {}",
SECURE_PUBLIC_KEY.clone() SECURE_PUBLIC_KEY.clone()
), ),
Err(e) => error!("Could not register node data: {e:?}"), Err(e) => error!("Could not register node data: {e:?}"),
}; };
} }
async fn update_node_res(
mut _client: BrainDaemonServiceClient<Channel>,
tx: Sender<NodeRes>,
) -> Result<()> {
debug!("Starting node registration...");
let req = NodeRes {
node_pubkey: SECURE_PUBLIC_KEY.clone(),
avail_ports: 10000,
avail_ipv4: 10,
avail_ipv6: 100_000,
avail_vcpus: 16,
avail_memory_mb: 20_000,
avail_storage_gb: 700,
max_ports_per_vm: 5,
};
if let Err(e) = tx.send(req).await {
error!("Failed to send NodeRes: {e}");
} else {
info!("NodeRes sent successfully");
}
Ok(())
}
async fn listen_for_deleted_vms( async fn listen_for_deleted_vms(
mut client: BrainDaemonServiceClient<Channel>, mut client: BrainDaemonServiceClient<Channel>,
tx: Sender<DeleteVmReq>, tx: Sender<DeleteVmReq>,
@ -157,17 +143,17 @@ async fn listen_for_update_vm_reqs(
async fn handle_update_vm_requests( async fn handle_update_vm_requests(
mut req: Receiver<UpdateVmReq>, mut req: Receiver<UpdateVmReq>,
resp: Sender<UpdateVmResp> resp_chan: Sender<UpdateVmResp>,
) { ) {
info!("Started to handle update vm requests."); info!("Started to handle update vm requests.");
while let Some(update_vm) = req.recv().await { while let Some(update_vm) = req.recv().await {
let confirmation = UpdateVmResp { let update_vm_resp = UpdateVmResp {
uuid: update_vm.uuid, uuid: update_vm.uuid,
error: "".to_string(), error: "".to_string(),
}; };
info!("Sending UpdateVmResp: {confirmation:?}"); info!("Sending UpdateVmResp: {update_vm_resp:?}");
let _ = resp.send(confirmation).await; let _ = resp_chan.send(update_vm_resp).await;
}; }
warn!("update vm request handler is ending"); warn!("update vm request handler is ending");
} }
@ -182,7 +168,11 @@ async fn send_updatevm_resp(
Ok(()) Ok(())
} }
async fn handle_vm_requests(mut req: Receiver<NewVmReq>, resp: Sender<NewVmResp>) { async fn handle_vm_requests(
mut req: Receiver<NewVmReq>,
resp: Sender<NewVmResp>,
resource_tx: Sender<NodeResourceReq>,
) {
info!("Started to handle vm requests. 1 out of 5 requests will return error."); info!("Started to handle vm requests. 1 out of 5 requests will return error.");
let mut i = 0; let mut i = 0;
while let Some(new_vm) = req.recv().await { while let Some(new_vm) = req.recv().await {
@ -190,34 +180,53 @@ async fn handle_vm_requests(mut req: Receiver<NewVmReq>, resp: Sender<NewVmResp>
true => Vec::new(), true => Vec::new(),
false => vec![20321, 20415, 25912], false => vec![20321, 20415, 25912],
}; };
let public_ipv4 = match new_vm.public_ipv4 { let mut ips = Vec::new();
true => "10.0.100.5".to_string(), ips.push(NewVmRespIp {
false => String::new(), nic_index: 0,
}; address: "190.0.100.5".to_string(),
let public_ipv6 = match new_vm.public_ipv6 { gateway: "190.0.100.1".to_string(),
true => " 2a02:2f2d:d301:3100:afe8:a85e:54a0:dd28".to_string(), mask: "24".to_string(),
false => String::new(), });
}; ips.push(NewVmRespIp {
nic_index: 0,
address: "2a02:2f2d:d301:3100:afe8:a85e:54a0:dd28".to_string(),
gateway: "2a02:2f2d:d301:3100::1".to_string(),
mask: "64".to_string(),
});
if i != 3 { if i != 3 {
let confirmation = NewVmResp{ let confirmation = NewVmResp {
uuid: new_vm.uuid, uuid: new_vm.uuid,
exposed_ports, exposed_ports,
public_ipv4, ovmf_hash: "YouAreNotGettingHacked".to_string(),
public_ipv6, ips,
error: String::new(), error: String::new(),
}; };
info!("Sending NewVmConfirmation: {confirmation:?}"); info!("Sending NewVmConfirmation: {confirmation:?}");
let _ = resp.send(confirmation).await; let _ = resp.send(confirmation).await;
info!("Sending NodeResourceReq");
} else { } else {
let confirmation = NewVmResp { let confirmation = NewVmResp {
uuid: new_vm.uuid, uuid: new_vm.uuid,
exposed_ports: Vec::new(), exposed_ports: Vec::new(),
public_ipv4: String::new(), ovmf_hash: "YouAreNotGettingHacked".to_string(),
public_ipv6: String::new(), ips: Vec::new(),
error: "No.".to_string(), error: "No.".to_string(),
}; };
info!("Sending error for NewVmConfirmation: {confirmation:?}"); info!("Sending error for NewVmConfirmation: {confirmation:?}");
let _ = resp.send(confirmation).await; let _ = resp.send(confirmation).await;
info!("Sending updated about used resources.");
let _ = resource_tx
.send(NodeResourceReq {
node_pubkey: SECURE_PUBLIC_KEY.clone(),
avail_ipv4: 5,
avail_ipv6: 5,
avail_memory_mb: 5000,
avail_vcpus: 32,
avail_storage_gb: 100,
avail_ports: 5000,
max_ports_per_vm: 5,
})
.await;
} }
i += 1; i += 1;
if i == 5 { if i == 5 {
@ -232,21 +241,31 @@ async fn connect_and_run() -> Result<()> {
let mut streaming_tasks = JoinSet::new(); let mut streaming_tasks = JoinSet::new();
register_node(client.clone()).await; register_node(client.clone()).await;
let update_node_res_client = client.clone();
let (node_tx, _node_rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(update_node_res(update_node_res_client, node_tx));
let newvm_client = client.clone(); let newvm_client = client.clone();
let (tx, newvm_rx) = tokio::sync::mpsc::channel(6); let (tx, newvm_rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(listen_for_new_vm_reqs(newvm_client, tx)); streaming_tasks.spawn(listen_for_new_vm_reqs(newvm_client, tx));
let confirm_client = client.clone(); let confirm_client = client.clone();
let (confirm_tx, rx) = tokio::sync::mpsc::channel(6); let resource_client = client.clone();
streaming_tasks.spawn(send_newvm_resp(confirm_client, rx)); let (resource_tx, resource_rx) = tokio::sync::mpsc::channel(6);
let _ = resource_tx
.send(NodeResourceReq {
node_pubkey: SECURE_PUBLIC_KEY.clone(),
avail_ipv4: 10,
avail_ipv6: 10,
avail_memory_mb: 2500,
avail_vcpus: 60,
avail_storage_gb: 200,
avail_ports: 5002,
max_ports_per_vm: 5,
})
.await;
let (confirm_tx, confirm_rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(send_newvm_resp(confirm_client, confirm_rx));
streaming_tasks.spawn(send_node_resources(resource_client, resource_rx));
tokio::spawn(async move { tokio::spawn(async move {
handle_vm_requests(newvm_rx, confirm_tx).await; handle_vm_requests(newvm_rx, confirm_tx, resource_tx).await;
}); });
let updatevm_client = client.clone(); let updatevm_client = client.clone();
@ -266,7 +285,7 @@ async fn connect_and_run() -> Result<()> {
streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx)); streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx));
let task_output = streaming_tasks.join_next().await; let task_output = streaming_tasks.join_next().await;
warn!("One stream exited: {task_output:?} {streaming_tasks:?}"); warn!("One stream exited: {task_output:?}");
Ok(()) Ok(())
} }
@ -287,4 +306,4 @@ async fn main() {
.init(); .init();
info!("Hello! My name is {}", SECURE_PUBLIC_KEY.clone()); info!("Hello! My name is {}", SECURE_PUBLIC_KEY.clone());
connection_wrapper().await; connection_wrapper().await;
} }