updater #1
@ -8,7 +8,7 @@ message NodePubkey {
|
|||||||
string node_pubkey = 1;
|
string node_pubkey = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
message RegisterNodeRequest {
|
message RegisterNodeReq {
|
||||||
string node_pubkey = 1;
|
string node_pubkey = 1;
|
||||||
string owner_pubkey = 2;
|
string owner_pubkey = 2;
|
||||||
string ip = 3;
|
string ip = 3;
|
||||||
@ -23,7 +23,7 @@ message RegisterNodeRequest {
|
|||||||
uint32 max_ports_per_vm = 12;
|
uint32 max_ports_per_vm = 12;
|
||||||
}
|
}
|
||||||
|
|
||||||
message NewVMRequest {
|
message NewVMReq {
|
||||||
string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID
|
string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID
|
||||||
string hostname = 2;
|
string hostname = 2;
|
||||||
string admin_pubkey = 3;
|
string admin_pubkey = 3;
|
||||||
@ -40,6 +40,22 @@ message NewVMRequest {
|
|||||||
string dtrfs_sha = 14;
|
string dtrfs_sha = 14;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message UpdateVMReq {
|
||||||
|
string uuid = 1;
|
||||||
|
uint32 disk_size_gb = 3;
|
||||||
|
uint32 vcpus = 4;
|
||||||
|
uint32 memory_mb = 5;
|
||||||
|
string kernel_url = 6;
|
||||||
|
string kernel_sha = 7;
|
||||||
|
string dtrfs_url = 8;
|
||||||
|
string dtrfs_sha = 9;
|
||||||
|
}
|
||||||
|
|
||||||
|
message UpdateVMResp {
|
||||||
|
string uuid = 1;
|
||||||
|
string error = 3;
|
||||||
|
}
|
||||||
|
|
||||||
message VMContract {
|
message VMContract {
|
||||||
string uuid = 1;
|
string uuid = 1;
|
||||||
string hostname = 2;
|
string hostname = 2;
|
||||||
@ -54,6 +70,7 @@ message VMContract {
|
|||||||
string kernel_sha = 11;
|
string kernel_sha = 11;
|
||||||
string dtrfs_sha = 12;
|
string dtrfs_sha = 12;
|
||||||
string created_at = 13;
|
string created_at = 13;
|
||||||
|
string updated_at = 14;
|
||||||
}
|
}
|
||||||
|
|
||||||
message ListVMContractsReq {
|
message ListVMContractsReq {
|
||||||
@ -61,7 +78,7 @@ message ListVMContractsReq {
|
|||||||
string node_pubkey = 2;
|
string node_pubkey = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
message NewVMConfirmation {
|
message NewVMResp {
|
||||||
string uuid = 1;
|
string uuid = 1;
|
||||||
repeated uint32 exposed_ports = 2;
|
repeated uint32 exposed_ports = 2;
|
||||||
string public_ipv4 = 3;
|
string public_ipv4 = 3;
|
||||||
@ -69,18 +86,21 @@ message NewVMConfirmation {
|
|||||||
string error = 5;
|
string error = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
message DeletedVMUpdate {
|
message DeleteVMReq {
|
||||||
string uuid = 1;
|
string uuid = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
service BrainDaemonService {
|
service BrainDaemonService {
|
||||||
rpc RegisterNode (RegisterNodeRequest) returns (Empty);
|
rpc RegisterNode (RegisterNodeReq) returns (Empty);
|
||||||
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest);
|
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
|
||||||
rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty);
|
rpc SendNewVMResp (stream NewVMResp) returns (Empty);
|
||||||
rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate);
|
rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
|
||||||
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
|
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
|
||||||
|
rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq);
|
||||||
|
rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
message NodeFilters {
|
message NodeFilters {
|
||||||
uint32 free_ports = 1;
|
uint32 free_ports = 1;
|
||||||
bool offers_ipv4 = 2;
|
bool offers_ipv4 = 2;
|
||||||
@ -101,9 +121,9 @@ message NodeListResp {
|
|||||||
}
|
}
|
||||||
|
|
||||||
service BrainCliService {
|
service BrainCliService {
|
||||||
rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation);
|
rpc CreateVMContract (NewVMReq) returns (NewVMResp);
|
||||||
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
|
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
|
||||||
rpc ListNodes (NodeFilters) returns (stream NodeListResp);
|
rpc ListNodes (NodeFilters) returns (stream NodeListResp);
|
||||||
rpc DeleteVM (DeletedVMUpdate) returns (Empty);
|
rpc DeleteVM (DeleteVMReq) returns (Empty);
|
||||||
|
rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,8 +5,8 @@ pub mod brain {
|
|||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use brain::{
|
use brain::{
|
||||||
brain_cli_service_client::BrainCliServiceClient, DeletedVmUpdate, ListVmContractsReq,
|
brain_cli_service_client::BrainCliServiceClient, DeleteVmReq, ListVmContractsReq, NewVmReq,
|
||||||
NewVmRequest, NodeFilters, NodeListResp, VmContract,
|
NodeFilters, NodeListResp, UpdateVmReq, VmContract,
|
||||||
};
|
};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use log::{debug, info, warn};
|
use log::{debug, info, warn};
|
||||||
@ -65,7 +65,7 @@ async fn submit_vm_request(
|
|||||||
mut client: BrainCliServiceClient<Channel>,
|
mut client: BrainCliServiceClient<Channel>,
|
||||||
node_pubkey: &str,
|
node_pubkey: &str,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let req = NewVmRequest {
|
let req = NewVmReq {
|
||||||
uuid: String::new(),
|
uuid: String::new(),
|
||||||
admin_pubkey: SECURE_PUBLIC_KEY.clone(),
|
admin_pubkey: SECURE_PUBLIC_KEY.clone(),
|
||||||
node_pubkey: node_pubkey.to_string(),
|
node_pubkey: node_pubkey.to_string(),
|
||||||
@ -84,12 +84,12 @@ async fn submit_vm_request(
|
|||||||
info!("Creating VM {req:?}");
|
info!("Creating VM {req:?}");
|
||||||
let result = client.create_vm_contract(req).await;
|
let result = client.create_vm_contract(req).await;
|
||||||
match result {
|
match result {
|
||||||
Ok(confirmation) => {
|
Ok(resp) => {
|
||||||
let confirmation = confirmation.into_inner();
|
let resp = resp.into_inner();
|
||||||
if confirmation.error.is_empty() {
|
if resp.error.is_empty() {
|
||||||
info!("Got VM confirmation: {confirmation:?}");
|
info!("Got NewVMResp: {resp:?}");
|
||||||
} else {
|
} else {
|
||||||
warn!("Got VM confirmation error: {}", confirmation.error);
|
warn!("Got new VM error: {}", resp.error);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
Err(e) => log::error!("Could not create vm: {e:?}"),
|
Err(e) => log::error!("Could not create vm: {e:?}"),
|
||||||
@ -123,7 +123,7 @@ async fn list_contracts(mut client: BrainCliServiceClient<Channel>) -> Result<Ve
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn delete_vm(mut client: BrainCliServiceClient<Channel>, uuid: &str) -> Result<()> {
|
async fn delete_vm(mut client: BrainCliServiceClient<Channel>, uuid: &str) -> Result<()> {
|
||||||
let req = DeletedVmUpdate {
|
let req = DeleteVmReq {
|
||||||
uuid: uuid.to_string(),
|
uuid: uuid.to_string(),
|
||||||
};
|
};
|
||||||
info!("Creating VM {req:?}");
|
info!("Creating VM {req:?}");
|
||||||
@ -135,6 +135,36 @@ async fn delete_vm(mut client: BrainCliServiceClient<Channel>, uuid: &str) -> Re
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn update_vm_request(
|
||||||
|
mut client: BrainCliServiceClient<Channel>,
|
||||||
|
uuid: &str,
|
||||||
|
) -> Result<()> {
|
||||||
|
let req = UpdateVmReq {
|
||||||
|
uuid: uuid.to_string(),
|
||||||
|
vcpus: 4,
|
||||||
|
memory_mb: 4096,
|
||||||
|
disk_size_gb: 40,
|
||||||
|
kernel_url: "thisIsMyNewURL".to_string(),
|
||||||
|
kernel_sha: "thisIsMyNewSha".to_string(),
|
||||||
|
dtrfs_url: "thisIsMyNewURL".to_string(),
|
||||||
|
dtrfs_sha: "thisIsMyNewSha".to_string(),
|
||||||
|
};
|
||||||
|
info!("Updating VM {req:?}");
|
||||||
|
let result = client.update_vm(req).await;
|
||||||
|
match result {
|
||||||
|
Ok(resp) => {
|
||||||
|
let resp = resp.into_inner();
|
||||||
|
if resp.error.is_empty() {
|
||||||
|
info!("Got VM update response: {resp:?}");
|
||||||
|
} else {
|
||||||
|
warn!("Got VM update error: {}", resp.error);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
Err(e) => log::error!("Could not update vm: {e:?}"),
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
env_logger::builder()
|
env_logger::builder()
|
||||||
@ -154,6 +184,16 @@ async fn main() -> Result<()> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let contracts = list_contracts(client.clone()).await?;
|
||||||
|
for contract in contracts {
|
||||||
|
if let Err(e) = update_vm_request(client.clone(), &contract.uuid).await {
|
||||||
|
log::error!(
|
||||||
|
"Received error when updating VM on node {}: {e:?}",
|
||||||
|
&contract.node_pubkey
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if std::env::var("DELETE_VMS").is_err() {
|
if std::env::var("DELETE_VMS").is_err() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
@ -8,7 +8,7 @@ message NodePubkey {
|
|||||||
string node_pubkey = 1;
|
string node_pubkey = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
message RegisterNodeRequest {
|
message RegisterNodeReq {
|
||||||
string node_pubkey = 1;
|
string node_pubkey = 1;
|
||||||
string owner_pubkey = 2;
|
string owner_pubkey = 2;
|
||||||
string ip = 3;
|
string ip = 3;
|
||||||
@ -23,7 +23,7 @@ message RegisterNodeRequest {
|
|||||||
uint32 max_ports_per_vm = 12;
|
uint32 max_ports_per_vm = 12;
|
||||||
}
|
}
|
||||||
|
|
||||||
message NewVMRequest {
|
message NewVMReq {
|
||||||
string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID
|
string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID
|
||||||
string hostname = 2;
|
string hostname = 2;
|
||||||
string admin_pubkey = 3;
|
string admin_pubkey = 3;
|
||||||
@ -40,6 +40,22 @@ message NewVMRequest {
|
|||||||
string dtrfs_sha = 14;
|
string dtrfs_sha = 14;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message UpdateVMReq {
|
||||||
|
string uuid = 1;
|
||||||
|
uint32 disk_size_gb = 3;
|
||||||
|
uint32 vcpus = 4;
|
||||||
|
uint32 memory_mb = 5;
|
||||||
|
string kernel_url = 6;
|
||||||
|
string kernel_sha = 7;
|
||||||
|
string dtrfs_url = 8;
|
||||||
|
string dtrfs_sha = 9;
|
||||||
|
}
|
||||||
|
|
||||||
|
message UpdateVMResp {
|
||||||
|
string uuid = 1;
|
||||||
|
string error = 3;
|
||||||
|
}
|
||||||
|
|
||||||
message VMContract {
|
message VMContract {
|
||||||
string uuid = 1;
|
string uuid = 1;
|
||||||
string hostname = 2;
|
string hostname = 2;
|
||||||
@ -54,6 +70,7 @@ message VMContract {
|
|||||||
string kernel_sha = 11;
|
string kernel_sha = 11;
|
||||||
string dtrfs_sha = 12;
|
string dtrfs_sha = 12;
|
||||||
string created_at = 13;
|
string created_at = 13;
|
||||||
|
string updated_at = 14;
|
||||||
}
|
}
|
||||||
|
|
||||||
message ListVMContractsReq {
|
message ListVMContractsReq {
|
||||||
@ -61,7 +78,7 @@ message ListVMContractsReq {
|
|||||||
string node_pubkey = 2;
|
string node_pubkey = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
message NewVMConfirmation {
|
message NewVMResp {
|
||||||
string uuid = 1;
|
string uuid = 1;
|
||||||
repeated uint32 exposed_ports = 2;
|
repeated uint32 exposed_ports = 2;
|
||||||
string public_ipv4 = 3;
|
string public_ipv4 = 3;
|
||||||
@ -69,18 +86,21 @@ message NewVMConfirmation {
|
|||||||
string error = 5;
|
string error = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
message DeletedVMUpdate {
|
message DeleteVMReq {
|
||||||
string uuid = 1;
|
string uuid = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
service BrainDaemonService {
|
service BrainDaemonService {
|
||||||
rpc RegisterNode (RegisterNodeRequest) returns (Empty);
|
rpc RegisterNode (RegisterNodeReq) returns (Empty);
|
||||||
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest);
|
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
|
||||||
rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty);
|
rpc SendNewVMResp (stream NewVMResp) returns (Empty);
|
||||||
rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate);
|
rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
|
||||||
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
|
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
|
||||||
|
rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq);
|
||||||
|
rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
message NodeFilters {
|
message NodeFilters {
|
||||||
uint32 free_ports = 1;
|
uint32 free_ports = 1;
|
||||||
bool offers_ipv4 = 2;
|
bool offers_ipv4 = 2;
|
||||||
@ -101,9 +121,9 @@ message NodeListResp {
|
|||||||
}
|
}
|
||||||
|
|
||||||
service BrainCliService {
|
service BrainCliService {
|
||||||
rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation);
|
rpc CreateVMContract (NewVMReq) returns (NewVMResp);
|
||||||
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
|
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
|
||||||
rpc ListNodes (NodeFilters) returns (stream NodeListResp);
|
rpc ListNodes (NodeFilters) returns (stream NodeListResp);
|
||||||
rpc DeleteVM (DeletedVMUpdate) returns (Empty);
|
rpc DeleteVM (DeleteVMReq) returns (Empty);
|
||||||
|
rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,8 +5,8 @@ pub mod brain {
|
|||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use brain::{
|
use brain::{
|
||||||
brain_daemon_service_client::BrainDaemonServiceClient, DeletedVmUpdate, NewVmConfirmation,
|
brain_daemon_service_client::BrainDaemonServiceClient, DeleteVmReq, NewVmResp,
|
||||||
NewVmRequest, NodePubkey, RegisterNodeRequest,
|
NewVmReq, NodePubkey, RegisterNodeReq, UpdateVmReq, UpdateVmResp,
|
||||||
};
|
};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use log::{debug, error, info, warn};
|
use log::{debug, error, info, warn};
|
||||||
@ -31,7 +31,7 @@ fn generate_random_string() -> String {
|
|||||||
|
|
||||||
async fn listen_for_new_vm_reqs(
|
async fn listen_for_new_vm_reqs(
|
||||||
mut client: BrainDaemonServiceClient<Channel>,
|
mut client: BrainDaemonServiceClient<Channel>,
|
||||||
tx: Sender<NewVmRequest>,
|
tx: Sender<NewVmReq>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
debug!("starting listen_for_new_vm_reqs");
|
debug!("starting listen_for_new_vm_reqs");
|
||||||
let node_pubkey = SECURE_PUBLIC_KEY.clone();
|
let node_pubkey = SECURE_PUBLIC_KEY.clone();
|
||||||
@ -54,20 +54,20 @@ async fn listen_for_new_vm_reqs(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_confirmations(
|
async fn send_newvm_resp(
|
||||||
mut client: BrainDaemonServiceClient<Channel>,
|
mut client: BrainDaemonServiceClient<Channel>,
|
||||||
rx: Receiver<NewVmConfirmation>,
|
rx: Receiver<NewVmResp>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
debug!("starting send_confirmations stream");
|
debug!("starting send_newvm_resp stream");
|
||||||
let rx_stream = ReceiverStream::new(rx);
|
let rx_stream = ReceiverStream::new(rx);
|
||||||
client.send_vm_confirmations(rx_stream).await?;
|
client.send_new_vm_resp(rx_stream).await?;
|
||||||
debug!("send_confirmations is about to exit");
|
debug!("send_newvm_resp is about to exit");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn register_node(mut client: BrainDaemonServiceClient<Channel>) {
|
async fn register_node(mut client: BrainDaemonServiceClient<Channel>) {
|
||||||
debug!("Starting node registration...");
|
debug!("Starting node registration...");
|
||||||
let req = RegisterNodeRequest {
|
let req = RegisterNodeReq {
|
||||||
node_pubkey: SECURE_PUBLIC_KEY.clone(),
|
node_pubkey: SECURE_PUBLIC_KEY.clone(),
|
||||||
owner_pubkey: "IamTheOwnerOf".to_string() + &SECURE_PUBLIC_KEY,
|
owner_pubkey: "IamTheOwnerOf".to_string() + &SECURE_PUBLIC_KEY,
|
||||||
ip: "10.0.10.1".to_string(),
|
ip: "10.0.10.1".to_string(),
|
||||||
@ -92,12 +92,12 @@ async fn register_node(mut client: BrainDaemonServiceClient<Channel>) {
|
|||||||
|
|
||||||
async fn listen_for_deleted_vms(
|
async fn listen_for_deleted_vms(
|
||||||
mut client: BrainDaemonServiceClient<Channel>,
|
mut client: BrainDaemonServiceClient<Channel>,
|
||||||
tx: Sender<DeletedVmUpdate>,
|
tx: Sender<DeleteVmReq>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
debug!("starting listen_for_new_vm_reqs");
|
debug!("starting listen_for_new_vm_reqs");
|
||||||
let node_pubkey = SECURE_PUBLIC_KEY.clone();
|
let node_pubkey = SECURE_PUBLIC_KEY.clone();
|
||||||
let mut grpc_stream = client
|
let mut grpc_stream = client
|
||||||
.deleted_vm_updates(NodePubkey { node_pubkey })
|
.get_delete_vm_req(NodePubkey { node_pubkey })
|
||||||
.await?
|
.await?
|
||||||
.into_inner();
|
.into_inner();
|
||||||
while let Some(stream_update) = grpc_stream.next().await {
|
while let Some(stream_update) = grpc_stream.next().await {
|
||||||
@ -115,7 +115,59 @@ async fn listen_for_deleted_vms(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_vm_requests(mut req: Receiver<NewVmRequest>, resp: Sender<NewVmConfirmation>) {
|
async fn listen_for_update_vm_reqs(
|
||||||
|
mut client: BrainDaemonServiceClient<Channel>,
|
||||||
|
tx: Sender<UpdateVmReq>,
|
||||||
|
) -> Result<()> {
|
||||||
|
debug!("starting listen_for_update_vm_reqs");
|
||||||
|
let node_pubkey = SECURE_PUBLIC_KEY.clone();
|
||||||
|
let mut grpc_stream = client
|
||||||
|
.get_update_vm_req(NodePubkey { node_pubkey })
|
||||||
|
.await?
|
||||||
|
.into_inner();
|
||||||
|
while let Some(stream_update) = grpc_stream.next().await {
|
||||||
|
match stream_update {
|
||||||
|
Ok(req) => {
|
||||||
|
info!("Received update vm request: {req:?}");
|
||||||
|
let _ = tx.send(req).await;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Brain disconnected from listen_for_update_vm_reqs: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
debug!("listen_for_update_vm_reqs is about to exit");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_update_vm_requests(
|
||||||
|
mut req: Receiver<UpdateVmReq>,
|
||||||
|
resp_chan: Sender<UpdateVmResp>
|
||||||
|
) {
|
||||||
|
info!("Started to handle update vm requests.");
|
||||||
|
while let Some(update_vm) = req.recv().await {
|
||||||
|
let update_vm_resp = UpdateVmResp {
|
||||||
|
uuid: update_vm.uuid,
|
||||||
|
error: "".to_string(),
|
||||||
|
};
|
||||||
|
info!("Sending UpdateVmResp: {update_vm_resp:?}");
|
||||||
|
let _ = resp_chan.send(update_vm_resp).await;
|
||||||
|
};
|
||||||
|
warn!("update vm request handler is ending");
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_updatevm_resp(
|
||||||
|
mut client: BrainDaemonServiceClient<Channel>,
|
||||||
|
rx: Receiver<UpdateVmResp>,
|
||||||
|
) -> Result<()> {
|
||||||
|
debug!("starting send_updatevm_resp stream");
|
||||||
|
let rx_stream = ReceiverStream::new(rx);
|
||||||
|
client.send_update_vm_resp(rx_stream).await?;
|
||||||
|
debug!("send_updatevm_resp is about to exit");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_vm_requests(mut req: Receiver<NewVmReq>, resp: Sender<NewVmResp>) {
|
||||||
info!("Started to handle vm requests. 1 out of 5 requests will return error.");
|
info!("Started to handle vm requests. 1 out of 5 requests will return error.");
|
||||||
let mut i = 0;
|
let mut i = 0;
|
||||||
while let Some(new_vm) = req.recv().await {
|
while let Some(new_vm) = req.recv().await {
|
||||||
@ -132,7 +184,7 @@ async fn handle_vm_requests(mut req: Receiver<NewVmRequest>, resp: Sender<NewVmC
|
|||||||
false => String::new(),
|
false => String::new(),
|
||||||
};
|
};
|
||||||
if i != 3 {
|
if i != 3 {
|
||||||
let confirmation = NewVmConfirmation {
|
let confirmation = NewVmResp{
|
||||||
uuid: new_vm.uuid,
|
uuid: new_vm.uuid,
|
||||||
exposed_ports,
|
exposed_ports,
|
||||||
public_ipv4,
|
public_ipv4,
|
||||||
@ -142,7 +194,7 @@ async fn handle_vm_requests(mut req: Receiver<NewVmRequest>, resp: Sender<NewVmC
|
|||||||
info!("Sending NewVmConfirmation: {confirmation:?}");
|
info!("Sending NewVmConfirmation: {confirmation:?}");
|
||||||
let _ = resp.send(confirmation).await;
|
let _ = resp.send(confirmation).await;
|
||||||
} else {
|
} else {
|
||||||
let confirmation = NewVmConfirmation {
|
let confirmation = NewVmResp {
|
||||||
uuid: new_vm.uuid,
|
uuid: new_vm.uuid,
|
||||||
exposed_ports: Vec::new(),
|
exposed_ports: Vec::new(),
|
||||||
public_ipv4: String::new(),
|
public_ipv4: String::new(),
|
||||||
@ -172,12 +224,25 @@ async fn connect_and_run() -> Result<()> {
|
|||||||
|
|
||||||
let confirm_client = client.clone();
|
let confirm_client = client.clone();
|
||||||
let (confirm_tx, rx) = tokio::sync::mpsc::channel(6);
|
let (confirm_tx, rx) = tokio::sync::mpsc::channel(6);
|
||||||
streaming_tasks.spawn(send_confirmations(confirm_client, rx));
|
streaming_tasks.spawn(send_newvm_resp(confirm_client, rx));
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
handle_vm_requests(newvm_rx, confirm_tx).await;
|
handle_vm_requests(newvm_rx, confirm_tx).await;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let updatevm_client = client.clone();
|
||||||
|
let (tx, updatevm_rx) = tokio::sync::mpsc::channel(6);
|
||||||
|
streaming_tasks.spawn(listen_for_update_vm_reqs(updatevm_client, tx));
|
||||||
|
|
||||||
|
let resp_client = client.clone();
|
||||||
|
let (resp_tx, rx) = tokio::sync::mpsc::channel(6);
|
||||||
|
streaming_tasks.spawn(send_updatevm_resp(resp_client, rx));
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
handle_update_vm_requests(updatevm_rx, resp_tx).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
let deletevms_client = client.clone();
|
let deletevms_client = client.clone();
|
||||||
let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6);
|
let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6);
|
||||||
streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx));
|
streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx));
|
||||||
|
Loading…
Reference in New Issue
Block a user