Compare commits

...

13 Commits

Author SHA1 Message Date
Ramil_Algayev
9aea6ee55f I need help 2024-12-26 23:18:47 +04:00
Ramil_Algayev
b7c2211d32 very smoll tiny fix to be consistent 2024-12-26 02:04:13 +04:00
Ramil_Algayev
29e5de8424 updated for brain 2024-12-26 01:55:37 +04:00
Ramil_Algayev
5713152d03 updated the namings for clients 2024-12-26 01:40:09 +04:00
Ramil_Algayev
faeacf67eb letting brain to do the timestampping and removing chrono 2024-12-26 01:17:10 +04:00
Ramil_Algayev
6d8cf4f1e0 some name changes 2024-12-26 01:14:52 +04:00
Ramil_Algayev
dc8977e89b this issue made me almost change my career, I need more sleep prob 2024-12-26 00:35:25 +04:00
Ramil_Algayev
1d0b7157c7 updating logging for update 2024-12-25 01:51:14 +04:00
Ramil_Algayev
d43fdc06a1 created update functionality, :P 2024-12-25 00:50:46 +04:00
Ramil_Algayev
a3740a1e11 created update functionality fr this time, I swear 2024-12-24 23:29:34 +04:00
Ramil_Algayev
c0e5acbf24 created update functionality?? sequel 2024-12-24 19:07:18 +04:00
Ramil_Algayev
5e916803b3 created update functionality?? 2024-12-24 03:21:04 +04:00
Ramil_Algayev
959b5c32c9 created update functionality 2024-12-23 17:39:54 +04:00
4 changed files with 244 additions and 69 deletions

@ -8,22 +8,24 @@ message NodePubkey {
string node_pubkey = 1; string node_pubkey = 1;
} }
message RegisterNodeRequest { message RegisterNodeReq {
string node_pubkey = 1; string node_pubkey = 1;
string owner_pubkey = 2; string owner_pubkey = 2;
string ip = 3; string ip = 3;
string country = 4;
string city = 5;
uint32 avail_ports = 6;
uint32 avail_ipv4 = 7;
uint32 avail_ipv6 = 8;
uint32 avail_vcpus = 9;
uint32 avail_memory_mb = 10;
uint32 avail_storage_gb = 11;
uint32 max_ports_per_vm = 12;
} }
message NewVMRequest { message NodeRes {
string node_pubkey = 1;
uint32 avail_ports = 2;
uint32 avail_ipv4 = 3;
uint32 avail_ipv6 = 4;
uint32 avail_vcpus = 5;
uint32 avail_memory_mb = 6;
uint32 avail_storage_gb = 7;
uint32 max_ports_per_vm = 8;
}
message NewVMReq {
string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID
string hostname = 2; string hostname = 2;
string admin_pubkey = 3; string admin_pubkey = 3;
@ -40,6 +42,23 @@ message NewVMRequest {
string dtrfs_sha = 14; string dtrfs_sha = 14;
} }
message UpdateVMReq {
string uuid = 1;
string node_pubkey = 2;
uint32 disk_size_gb = 3;
uint32 vcpus = 4;
uint32 memory_mb = 5;
string kernel_url = 6;
string kernel_sha = 7;
string dtrfs_url = 8;
string dtrfs_sha = 9;
}
message UpdateVMResp {
string uuid = 1;
string error = 3;
}
message VMContract { message VMContract {
string uuid = 1; string uuid = 1;
string hostname = 2; string hostname = 2;
@ -54,6 +73,7 @@ message VMContract {
string kernel_sha = 11; string kernel_sha = 11;
string dtrfs_sha = 12; string dtrfs_sha = 12;
string created_at = 13; string created_at = 13;
string updated_at = 14;
} }
message ListVMContractsReq { message ListVMContractsReq {
@ -61,7 +81,7 @@ message ListVMContractsReq {
string node_pubkey = 2; string node_pubkey = 2;
} }
message NewVMConfirmation { message NewVMResp {
string uuid = 1; string uuid = 1;
repeated uint32 exposed_ports = 2; repeated uint32 exposed_ports = 2;
string public_ipv4 = 3; string public_ipv4 = 3;
@ -69,16 +89,19 @@ message NewVMConfirmation {
string error = 5; string error = 5;
} }
message DeletedVMUpdate { message DeleteVMReq {
string uuid = 1; string uuid = 1;
} }
service BrainDaemonService { service BrainDaemonService {
rpc RegisterNode (RegisterNodeRequest) returns (Empty); rpc RegisterNode (RegisterNodeReq) returns (Empty);
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest); rpc NodeResUpdate (stream NodeRes) returns (Empty);
rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate); rpc SendNewVMResp (stream NewVMResp) returns (Empty);
rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq);
rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
} }
message NodeFilters { message NodeFilters {
@ -101,9 +124,9 @@ message NodeListResp {
} }
service BrainCliService { service BrainCliService {
rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation); rpc CreateVMContract (NewVMReq) returns (NewVMResp);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc ListNodes (NodeFilters) returns (stream NodeListResp); rpc ListNodes (NodeFilters) returns (stream NodeListResp);
rpc DeleteVM (DeletedVMUpdate) returns (Empty); rpc DeleteVM (DeleteVMReq) returns (Empty);
rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp);
} }

@ -5,8 +5,8 @@ pub mod brain {
use anyhow::Result; use anyhow::Result;
use brain::{ use brain::{
brain_cli_service_client::BrainCliServiceClient, DeletedVmUpdate, ListVmContractsReq, brain_cli_service_client::BrainCliServiceClient, DeleteVmReq, ListVmContractsReq,
NewVmRequest, NodeFilters, NodeListResp, VmContract, NewVmReq, NodeFilters, NodeListResp, VmContract, UpdateVmReq,
}; };
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::{debug, info, warn}; use log::{debug, info, warn};
@ -65,7 +65,7 @@ async fn submit_vm_request(
mut client: BrainCliServiceClient<Channel>, mut client: BrainCliServiceClient<Channel>,
node_pubkey: &str, node_pubkey: &str,
) -> Result<()> { ) -> Result<()> {
let req = NewVmRequest { let req = NewVmReq {
uuid: String::new(), uuid: String::new(),
admin_pubkey: SECURE_PUBLIC_KEY.clone(), admin_pubkey: SECURE_PUBLIC_KEY.clone(),
node_pubkey: node_pubkey.to_string(), node_pubkey: node_pubkey.to_string(),
@ -123,7 +123,7 @@ async fn list_contracts(mut client: BrainCliServiceClient<Channel>) -> Result<Ve
} }
async fn delete_vm(mut client: BrainCliServiceClient<Channel>, uuid: &str) -> Result<()> { async fn delete_vm(mut client: BrainCliServiceClient<Channel>, uuid: &str) -> Result<()> {
let req = DeletedVmUpdate { let req = DeleteVmReq {
uuid: uuid.to_string(), uuid: uuid.to_string(),
}; };
info!("Creating VM {req:?}"); info!("Creating VM {req:?}");
@ -135,6 +135,38 @@ async fn delete_vm(mut client: BrainCliServiceClient<Channel>, uuid: &str) -> Re
Ok(()) Ok(())
} }
async fn update_vm_request(
mut client: BrainCliServiceClient<Channel>,
node_pubkey: &str,
uuid: &str,
) -> Result<()> {
let req = UpdateVmReq {
uuid: uuid.to_string(),
node_pubkey: node_pubkey.to_string(),
vcpus: 4,
memory_mb: 4096,
disk_size_gb: 40,
kernel_url: "thisIsMyNewURL".to_string(),
kernel_sha: "thisIsMyNewSha".to_string(),
dtrfs_url: "thisIsMyNewURL".to_string(),
dtrfs_sha: "thisIsMyNewSha".to_string(),
};
info!("Updating VM {req:?}");
let result = client.update_vm(req).await;
match result {
Ok(confirmation) => {
let confirmation = confirmation.into_inner();
if confirmation.error.is_empty() {
info!("Got VM update confirmation: {confirmation:?}");
} else {
warn!("Got VM update confirmation error: {}", confirmation.error);
};
}
Err(e) => log::error!("Could not update vm: {e:?}"),
};
Ok(())
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
env_logger::builder() env_logger::builder()
@ -154,6 +186,19 @@ async fn main() -> Result<()> {
} }
} }
let nodes = get_node_list(client.clone()).await?;
for node in nodes {
let contracts = list_contracts(client.clone()).await?;
for contract in contracts {
if let Err(e) = update_vm_request(client.clone(), &node.node_pubkey, &contract.uuid).await {
log::error!(
"Received error when updating VM on node {}: {e:?}",
&node.node_pubkey
);
}
}
}
if std::env::var("DELETE_VMS").is_err() { if std::env::var("DELETE_VMS").is_err() {
return Ok(()); return Ok(());
} }

@ -8,22 +8,24 @@ message NodePubkey {
string node_pubkey = 1; string node_pubkey = 1;
} }
message RegisterNodeRequest { message RegisterNodeReq {
string node_pubkey = 1; string node_pubkey = 1;
string owner_pubkey = 2; string owner_pubkey = 2;
string ip = 3; string ip = 3;
string country = 4;
string city = 5;
uint32 avail_ports = 6;
uint32 avail_ipv4 = 7;
uint32 avail_ipv6 = 8;
uint32 avail_vcpus = 9;
uint32 avail_memory_mb = 10;
uint32 avail_storage_gb = 11;
uint32 max_ports_per_vm = 12;
} }
message NewVMRequest { message NodeRes {
string node_pubkey = 1;
uint32 avail_ports = 2;
uint32 avail_ipv4 = 3;
uint32 avail_ipv6 = 4;
uint32 avail_vcpus = 5;
uint32 avail_memory_mb = 6;
uint32 avail_storage_gb = 7;
uint32 max_ports_per_vm = 8;
}
message NewVMReq {
string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID
string hostname = 2; string hostname = 2;
string admin_pubkey = 3; string admin_pubkey = 3;
@ -40,6 +42,23 @@ message NewVMRequest {
string dtrfs_sha = 14; string dtrfs_sha = 14;
} }
message UpdateVMReq {
string uuid = 1;
string node_pubkey = 2;
uint32 disk_size_gb = 3;
uint32 vcpus = 4;
uint32 memory_mb = 5;
string kernel_url = 6;
string kernel_sha = 7;
string dtrfs_url = 8;
string dtrfs_sha = 9;
}
message UpdateVMResp {
string uuid = 1;
string error = 3;
}
message VMContract { message VMContract {
string uuid = 1; string uuid = 1;
string hostname = 2; string hostname = 2;
@ -54,6 +73,7 @@ message VMContract {
string kernel_sha = 11; string kernel_sha = 11;
string dtrfs_sha = 12; string dtrfs_sha = 12;
string created_at = 13; string created_at = 13;
string updated_at = 14;
} }
message ListVMContractsReq { message ListVMContractsReq {
@ -61,7 +81,7 @@ message ListVMContractsReq {
string node_pubkey = 2; string node_pubkey = 2;
} }
message NewVMConfirmation { message NewVMResp {
string uuid = 1; string uuid = 1;
repeated uint32 exposed_ports = 2; repeated uint32 exposed_ports = 2;
string public_ipv4 = 3; string public_ipv4 = 3;
@ -69,16 +89,19 @@ message NewVMConfirmation {
string error = 5; string error = 5;
} }
message DeletedVMUpdate { message DeleteVMReq {
string uuid = 1; string uuid = 1;
} }
service BrainDaemonService { service BrainDaemonService {
rpc RegisterNode (RegisterNodeRequest) returns (Empty); rpc RegisterNode (RegisterNodeReq) returns (Empty);
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest); rpc NodeResUpdate (stream NodeRes) returns (Empty);
rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate); rpc SendNewVMResp (stream NewVMResp) returns (Empty);
rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq);
rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
} }
message NodeFilters { message NodeFilters {
@ -101,9 +124,10 @@ message NodeListResp {
} }
service BrainCliService { service BrainCliService {
rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation); rpc CreateVMContract (NewVMReq) returns (NewVMResp);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc ListNodes (NodeFilters) returns (stream NodeListResp); rpc ListNodes (NodeFilters) returns (stream NodeListResp);
rpc DeleteVM (DeletedVMUpdate) returns (Empty); rpc DeleteVM (DeleteVMReq) returns (Empty);
rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp);
} }

@ -5,8 +5,8 @@ pub mod brain {
use anyhow::Result; use anyhow::Result;
use brain::{ use brain::{
brain_daemon_service_client::BrainDaemonServiceClient, DeletedVmUpdate, NewVmConfirmation, brain_daemon_service_client::BrainDaemonServiceClient, DeleteVmReq, NewVmResp,
NewVmRequest, NodePubkey, RegisterNodeRequest, NewVmReq, NodePubkey, RegisterNodeReq, UpdateVmReq, UpdateVmResp, NodeRes,
}; };
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
@ -31,7 +31,7 @@ fn generate_random_string() -> String {
async fn listen_for_new_vm_reqs( async fn listen_for_new_vm_reqs(
mut client: BrainDaemonServiceClient<Channel>, mut client: BrainDaemonServiceClient<Channel>,
tx: Sender<NewVmRequest>, tx: Sender<NewVmReq>,
) -> Result<()> { ) -> Result<()> {
debug!("starting listen_for_new_vm_reqs"); debug!("starting listen_for_new_vm_reqs");
let node_pubkey = SECURE_PUBLIC_KEY.clone(); let node_pubkey = SECURE_PUBLIC_KEY.clone();
@ -54,25 +54,40 @@ async fn listen_for_new_vm_reqs(
Ok(()) Ok(())
} }
async fn send_confirmations( async fn send_newvm_resp(
mut client: BrainDaemonServiceClient<Channel>, mut client: BrainDaemonServiceClient<Channel>,
rx: Receiver<NewVmConfirmation>, rx: Receiver<NewVmResp>,
) -> Result<()> { ) -> Result<()> {
debug!("starting send_confirmations stream"); debug!("starting send_newvm_resp stream");
let rx_stream = ReceiverStream::new(rx); let rx_stream = ReceiverStream::new(rx);
client.send_vm_confirmations(rx_stream).await?; client.send_new_vm_resp(rx_stream).await?;
debug!("send_confirmations is about to exit"); debug!("send_newvm_resp is about to exit");
Ok(()) Ok(())
} }
async fn register_node(mut client: BrainDaemonServiceClient<Channel>) { async fn register_node(mut client: BrainDaemonServiceClient<Channel>) {
debug!("Starting node registration..."); debug!("Starting node registration...");
let req = RegisterNodeRequest { let req = RegisterNodeReq {
node_pubkey: SECURE_PUBLIC_KEY.clone(), node_pubkey: SECURE_PUBLIC_KEY.clone(),
owner_pubkey: "IamTheOwnerOf".to_string() + &SECURE_PUBLIC_KEY, owner_pubkey: "IamTheOwnerOf".to_string() + &SECURE_PUBLIC_KEY,
ip: "10.0.10.1".to_string(), ip: "10.0.10.1".to_string(),
country: "Cyrodiil".to_string(), };
city: "Bruma".to_string(), match client.register_node(req).await {
Ok(_) => info!(
"Registered as 10.0.10.1 {}",
SECURE_PUBLIC_KEY.clone()
),
Err(e) => error!("Could not register node data: {e:?}"),
};
}
async fn update_node_res(
mut _client: BrainDaemonServiceClient<Channel>,
tx: Sender<NodeRes>,
) -> Result<()> {
debug!("Starting node registration...");
let req = NodeRes {
node_pubkey: SECURE_PUBLIC_KEY.clone(),
avail_ports: 10000, avail_ports: 10000,
avail_ipv4: 10, avail_ipv4: 10,
avail_ipv6: 100_000, avail_ipv6: 100_000,
@ -81,23 +96,23 @@ async fn register_node(mut client: BrainDaemonServiceClient<Channel>) {
avail_storage_gb: 700, avail_storage_gb: 700,
max_ports_per_vm: 5, max_ports_per_vm: 5,
}; };
match client.register_node(req).await {
Ok(_) => info!( if let Err(e) = tx.send(req).await {
"Registered as 10.0.10.1 from Bruma/Cyrodiil with ID {}", error!("Failed to send NodeRes: {e}");
SECURE_PUBLIC_KEY.clone() } else {
), info!("NodeRes sent successfully");
Err(e) => error!("Could not register node data: {e:?}"), }
}; Ok(())
} }
async fn listen_for_deleted_vms( async fn listen_for_deleted_vms(
mut client: BrainDaemonServiceClient<Channel>, mut client: BrainDaemonServiceClient<Channel>,
tx: Sender<DeletedVmUpdate>, tx: Sender<DeleteVmReq>,
) -> Result<()> { ) -> Result<()> {
debug!("starting listen_for_new_vm_reqs"); debug!("starting listen_for_new_vm_reqs");
let node_pubkey = SECURE_PUBLIC_KEY.clone(); let node_pubkey = SECURE_PUBLIC_KEY.clone();
let mut grpc_stream = client let mut grpc_stream = client
.deleted_vm_updates(NodePubkey { node_pubkey }) .get_delete_vm_req(NodePubkey { node_pubkey })
.await? .await?
.into_inner(); .into_inner();
while let Some(stream_update) = grpc_stream.next().await { while let Some(stream_update) = grpc_stream.next().await {
@ -115,7 +130,59 @@ async fn listen_for_deleted_vms(
Ok(()) Ok(())
} }
async fn handle_vm_requests(mut req: Receiver<NewVmRequest>, resp: Sender<NewVmConfirmation>) { async fn listen_for_update_vm_reqs(
mut client: BrainDaemonServiceClient<Channel>,
tx: Sender<UpdateVmReq>,
) -> Result<()> {
debug!("starting listen_for_update_vm_reqs");
let node_pubkey = SECURE_PUBLIC_KEY.clone();
let mut grpc_stream = client
.get_update_vm_req(NodePubkey { node_pubkey })
.await?
.into_inner();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(req) => {
info!("Received update vm request: {req:?}");
let _ = tx.send(req).await;
}
Err(e) => {
warn!("Brain disconnected from listen_for_update_vm_reqs: {e}");
}
}
}
debug!("listen_for_update_vm_reqs is about to exit");
Ok(())
}
async fn handle_update_vm_requests(
mut req: Receiver<UpdateVmReq>,
resp: Sender<UpdateVmResp>
) {
info!("Started to handle update vm requests.");
while let Some(update_vm) = req.recv().await {
let confirmation = UpdateVmResp {
uuid: update_vm.uuid,
error: "".to_string(),
};
info!("Sending UpdateVmResp: {confirmation:?}");
let _ = resp.send(confirmation).await;
};
warn!("update vm request handler is ending");
}
async fn send_updatevm_resp(
mut client: BrainDaemonServiceClient<Channel>,
rx: Receiver<UpdateVmResp>,
) -> Result<()> {
debug!("starting send_updatevm_resp stream");
let rx_stream = ReceiverStream::new(rx);
client.send_update_vm_resp(rx_stream).await?;
debug!("send_updatevm_resp is about to exit");
Ok(())
}
async fn handle_vm_requests(mut req: Receiver<NewVmReq>, resp: Sender<NewVmResp>) {
info!("Started to handle vm requests. 1 out of 5 requests will return error."); info!("Started to handle vm requests. 1 out of 5 requests will return error.");
let mut i = 0; let mut i = 0;
while let Some(new_vm) = req.recv().await { while let Some(new_vm) = req.recv().await {
@ -132,7 +199,7 @@ async fn handle_vm_requests(mut req: Receiver<NewVmRequest>, resp: Sender<NewVmC
false => String::new(), false => String::new(),
}; };
if i != 3 { if i != 3 {
let confirmation = NewVmConfirmation { let confirmation = NewVmResp{
uuid: new_vm.uuid, uuid: new_vm.uuid,
exposed_ports, exposed_ports,
public_ipv4, public_ipv4,
@ -142,7 +209,7 @@ async fn handle_vm_requests(mut req: Receiver<NewVmRequest>, resp: Sender<NewVmC
info!("Sending NewVmConfirmation: {confirmation:?}"); info!("Sending NewVmConfirmation: {confirmation:?}");
let _ = resp.send(confirmation).await; let _ = resp.send(confirmation).await;
} else { } else {
let confirmation = NewVmConfirmation { let confirmation = NewVmResp {
uuid: new_vm.uuid, uuid: new_vm.uuid,
exposed_ports: Vec::new(), exposed_ports: Vec::new(),
public_ipv4: String::new(), public_ipv4: String::new(),
@ -165,6 +232,10 @@ async fn connect_and_run() -> Result<()> {
let mut streaming_tasks = JoinSet::new(); let mut streaming_tasks = JoinSet::new();
register_node(client.clone()).await; register_node(client.clone()).await;
let update_node_res_client = client.clone();
let (node_tx, _node_rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(update_node_res(update_node_res_client, node_tx));
let newvm_client = client.clone(); let newvm_client = client.clone();
let (tx, newvm_rx) = tokio::sync::mpsc::channel(6); let (tx, newvm_rx) = tokio::sync::mpsc::channel(6);
@ -172,18 +243,30 @@ async fn connect_and_run() -> Result<()> {
let confirm_client = client.clone(); let confirm_client = client.clone();
let (confirm_tx, rx) = tokio::sync::mpsc::channel(6); let (confirm_tx, rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(send_confirmations(confirm_client, rx)); streaming_tasks.spawn(send_newvm_resp(confirm_client, rx));
tokio::spawn(async move { tokio::spawn(async move {
handle_vm_requests(newvm_rx, confirm_tx).await; handle_vm_requests(newvm_rx, confirm_tx).await;
}); });
let updatevm_client = client.clone();
let (tx, updatevm_rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(listen_for_update_vm_reqs(updatevm_client, tx));
let resp_client = client.clone();
let (resp_tx, rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(send_updatevm_resp(resp_client, rx));
tokio::spawn(async move {
handle_update_vm_requests(updatevm_rx, resp_tx).await;
});
let deletevms_client = client.clone(); let deletevms_client = client.clone();
let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6); let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx)); streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx));
let task_output = streaming_tasks.join_next().await; let task_output = streaming_tasks.join_next().await;
warn!("One stream exited: {task_output:?}"); warn!("One stream exited: {task_output:?} {streaming_tasks:?}");
Ok(()) Ok(())
} }
@ -204,4 +287,4 @@ async fn main() {
.init(); .init();
info!("Hello! My name is {}", SECURE_PUBLIC_KEY.clone()); info!("Hello! My name is {}", SECURE_PUBLIC_KEY.clone());
connection_wrapper().await; connection_wrapper().await;
} }