diff --git a/cli-mock/src/main.rs b/cli-mock/src/main.rs index 41ddfac..a30efc4 100644 --- a/cli-mock/src/main.rs +++ b/cli-mock/src/main.rs @@ -6,7 +6,7 @@ pub mod brain { use anyhow::Result; use brain::{ brain_cli_service_client::BrainCliServiceClient, DeletedVmUpdate, ListVmContractsReq, - NewVmRequest, NodeFilters, NodeListResp, VmContract, UpdateVmRequest, UpdateVmResp, + NewVmRequest, NodeFilters, NodeListResp, VmContract, UpdateVmRequest, }; use lazy_static::lazy_static; use log::{debug, info, warn}; @@ -135,11 +135,13 @@ async fn delete_vm(mut client: BrainCliServiceClient, uuid: &str) -> Re Ok(()) } -async fn update_vm(mut client: BrainCliServiceClient, uuid: &str) -> Result { - info!("Updatind VM {uuid}"); +async fn update_vm_request( + mut client: BrainCliServiceClient, + node_pubkey: &str, +) -> Result<()> { let req = UpdateVmRequest { - uuid: uuid.to_string(), - node_pubkey: SECURE_PUBLIC_KEY.to_string(), + uuid: String::new(), + node_pubkey: node_pubkey.to_string(), vcpus: 4, memory_mb: 4096, disk_size_gb: 40, @@ -148,10 +150,20 @@ async fn update_vm(mut client: BrainCliServiceClient, uuid: &str) -> Re dtrfs_url: "thisIsMyNewURL".to_string(), dtrfs_sha: "thisIsMyNewSha".to_string(), }; - + info!("Creating VM {req:?}"); let result = client.update_vm(req).await; - - result.map(|msg| msg.into_inner()).map_err(|e| anyhow::Error::new(e)) + match result { + Ok(confirmation) => { + let confirmation = confirmation.into_inner(); + if confirmation.error.is_empty() { + info!("Got VM confirmation: {confirmation:?}"); + } else { + warn!("Got VM confirmation error: {}", confirmation.error); + }; + } + Err(e) => log::error!("Could not create vm: {e:?}"), + }; + Ok(()) } #[tokio::main] @@ -175,7 +187,7 @@ async fn main() -> Result<()> { let contracts = list_contracts(client.clone()).await?; for contract in contracts { - if let Err(e) = update_vm(client.clone(), &contract.uuid).await { + if let Err(e) = update_vm_request(client.clone(), &contract.uuid).await { log::error!("Received error when updating VM {}: {e:?}", &contract.uuid); } } diff --git a/daemon-mock/src/main.rs b/daemon-mock/src/main.rs index 379ee2e..3a2caad 100644 --- a/daemon-mock/src/main.rs +++ b/daemon-mock/src/main.rs @@ -125,7 +125,7 @@ async fn listen_for_update_vm_reqs( let mut grpc_stream = client .get_update_vm(NodePubkey { node_pubkey }) .await? - .into_inner(); + .into_inner(); while let Some(stream_update) = grpc_stream.next().await { match stream_update { Ok(req) => { @@ -143,20 +143,30 @@ async fn listen_for_update_vm_reqs( async fn handle_update_vm_requests( mut req: Receiver, - resp: Sender, -) { + resp: Sender + ) { info!("Started to handle update vm requests."); while let Some(update_vm) = req.recv().await { - info!("Updating vm: {update_vm:?}"); - let response = UpdateVmResp { - uuid: update_vm.uuid.clone(), + let confirmation = UpdateVmResp { + uuid: update_vm.uuid, timestamp: chrono::Utc::now().to_rfc3339(), - error: String::new(), + error: "No errors yet".to_string(), }; - info!("Sending UpdateVmResp: {response:?}"); - let _ = resp.send(response).await; - } - warn!("update vm request handler is ending"); + info!("Sending UpdateVmResp: {confirmation:?}"); + let _ = resp.send(confirmation).await; + }; + warn!("vm request handler is ending"); +} + +async fn send_confirmations_update( + mut client: BrainDaemonServiceClient, + rx: Receiver, +) -> Result<()> { + debug!("starting send_confirmations_update stream"); + let rx_stream = ReceiverStream::new(rx); + client.send_update_vm(rx_stream).await?; + debug!("send_confirmations_update is about to exit"); + Ok(()) } async fn handle_vm_requests(mut req: Receiver, resp: Sender) { @@ -223,14 +233,18 @@ async fn connect_and_run() -> Result<()> { }); let updatevm_client = client.clone(); - let (update_tx, update_rx) = tokio::sync::mpsc::channel(6); - streaming_tasks.spawn(listen_for_update_vm_reqs(updatevm_client, update_tx)); + let (tx, updatevm_rx) = tokio::sync::mpsc::channel(6); + streaming_tasks.spawn(listen_for_update_vm_reqs(updatevm_client, tx)); + + let resp_client = client.clone(); + let (resp_tx, rx) = tokio::sync::mpsc::channel(6); + streaming_tasks.spawn(send_confirmations_update(resp_client, rx)); - let (update_resp_tx, _) = tokio::sync::mpsc::channel(6); tokio::spawn(async move { - handle_update_vm_requests(update_rx, update_resp_tx).await; + handle_update_vm_requests(updatevm_rx, resp_tx).await; }); + let deletevms_client = client.clone(); let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6); streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx));