created update functionality fr this time, I swear

This commit is contained in:
Ramil_Algayev 2024-12-24 23:29:34 +04:00
parent c0e5acbf24
commit a3740a1e11
2 changed files with 50 additions and 24 deletions

@ -6,7 +6,7 @@ pub mod brain {
use anyhow::Result; use anyhow::Result;
use brain::{ use brain::{
brain_cli_service_client::BrainCliServiceClient, DeletedVmUpdate, ListVmContractsReq, brain_cli_service_client::BrainCliServiceClient, DeletedVmUpdate, ListVmContractsReq,
NewVmRequest, NodeFilters, NodeListResp, VmContract, UpdateVmRequest, UpdateVmResp, NewVmRequest, NodeFilters, NodeListResp, VmContract, UpdateVmRequest,
}; };
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::{debug, info, warn}; use log::{debug, info, warn};
@ -135,11 +135,13 @@ async fn delete_vm(mut client: BrainCliServiceClient<Channel>, uuid: &str) -> Re
Ok(()) Ok(())
} }
async fn update_vm(mut client: BrainCliServiceClient<Channel>, uuid: &str) -> Result<UpdateVmResp> { async fn update_vm_request(
info!("Updatind VM {uuid}"); mut client: BrainCliServiceClient<Channel>,
node_pubkey: &str,
) -> Result<()> {
let req = UpdateVmRequest { let req = UpdateVmRequest {
uuid: uuid.to_string(), uuid: String::new(),
node_pubkey: SECURE_PUBLIC_KEY.to_string(), node_pubkey: node_pubkey.to_string(),
vcpus: 4, vcpus: 4,
memory_mb: 4096, memory_mb: 4096,
disk_size_gb: 40, disk_size_gb: 40,
@ -148,10 +150,20 @@ async fn update_vm(mut client: BrainCliServiceClient<Channel>, uuid: &str) -> Re
dtrfs_url: "thisIsMyNewURL".to_string(), dtrfs_url: "thisIsMyNewURL".to_string(),
dtrfs_sha: "thisIsMyNewSha".to_string(), dtrfs_sha: "thisIsMyNewSha".to_string(),
}; };
info!("Creating VM {req:?}");
let result = client.update_vm(req).await; let result = client.update_vm(req).await;
match result {
result.map(|msg| msg.into_inner()).map_err(|e| anyhow::Error::new(e)) Ok(confirmation) => {
let confirmation = confirmation.into_inner();
if confirmation.error.is_empty() {
info!("Got VM confirmation: {confirmation:?}");
} else {
warn!("Got VM confirmation error: {}", confirmation.error);
};
}
Err(e) => log::error!("Could not create vm: {e:?}"),
};
Ok(())
} }
#[tokio::main] #[tokio::main]
@ -175,7 +187,7 @@ async fn main() -> Result<()> {
let contracts = list_contracts(client.clone()).await?; let contracts = list_contracts(client.clone()).await?;
for contract in contracts { for contract in contracts {
if let Err(e) = update_vm(client.clone(), &contract.uuid).await { if let Err(e) = update_vm_request(client.clone(), &contract.uuid).await {
log::error!("Received error when updating VM {}: {e:?}", &contract.uuid); log::error!("Received error when updating VM {}: {e:?}", &contract.uuid);
} }
} }

@ -125,7 +125,7 @@ async fn listen_for_update_vm_reqs(
let mut grpc_stream = client let mut grpc_stream = client
.get_update_vm(NodePubkey { node_pubkey }) .get_update_vm(NodePubkey { node_pubkey })
.await? .await?
.into_inner(); .into_inner();
while let Some(stream_update) = grpc_stream.next().await { while let Some(stream_update) = grpc_stream.next().await {
match stream_update { match stream_update {
Ok(req) => { Ok(req) => {
@ -143,20 +143,30 @@ async fn listen_for_update_vm_reqs(
async fn handle_update_vm_requests( async fn handle_update_vm_requests(
mut req: Receiver<UpdateVmRequest>, mut req: Receiver<UpdateVmRequest>,
resp: Sender<UpdateVmResp>, resp: Sender<UpdateVmResp>
) { ) {
info!("Started to handle update vm requests."); info!("Started to handle update vm requests.");
while let Some(update_vm) = req.recv().await { while let Some(update_vm) = req.recv().await {
info!("Updating vm: {update_vm:?}"); let confirmation = UpdateVmResp {
let response = UpdateVmResp { uuid: update_vm.uuid,
uuid: update_vm.uuid.clone(),
timestamp: chrono::Utc::now().to_rfc3339(), timestamp: chrono::Utc::now().to_rfc3339(),
error: String::new(), error: "No errors yet".to_string(),
}; };
info!("Sending UpdateVmResp: {response:?}"); info!("Sending UpdateVmResp: {confirmation:?}");
let _ = resp.send(response).await; let _ = resp.send(confirmation).await;
} };
warn!("update vm request handler is ending"); warn!("vm request handler is ending");
}
async fn send_confirmations_update(
mut client: BrainDaemonServiceClient<Channel>,
rx: Receiver<UpdateVmResp>,
) -> Result<()> {
debug!("starting send_confirmations_update stream");
let rx_stream = ReceiverStream::new(rx);
client.send_update_vm(rx_stream).await?;
debug!("send_confirmations_update is about to exit");
Ok(())
} }
async fn handle_vm_requests(mut req: Receiver<NewVmRequest>, resp: Sender<NewVmConfirmation>) { async fn handle_vm_requests(mut req: Receiver<NewVmRequest>, resp: Sender<NewVmConfirmation>) {
@ -223,14 +233,18 @@ async fn connect_and_run() -> Result<()> {
}); });
let updatevm_client = client.clone(); let updatevm_client = client.clone();
let (update_tx, update_rx) = tokio::sync::mpsc::channel(6); let (tx, updatevm_rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(listen_for_update_vm_reqs(updatevm_client, update_tx)); streaming_tasks.spawn(listen_for_update_vm_reqs(updatevm_client, tx));
let resp_client = client.clone();
let (resp_tx, rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(send_confirmations_update(resp_client, rx));
let (update_resp_tx, _) = tokio::sync::mpsc::channel(6);
tokio::spawn(async move { tokio::spawn(async move {
handle_update_vm_requests(update_rx, update_resp_tx).await; handle_update_vm_requests(updatevm_rx, resp_tx).await;
}); });
let deletevms_client = client.clone(); let deletevms_client = client.clone();
let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6); let (tx, _deletevm_rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx)); streaming_tasks.spawn(listen_for_deleted_vms(deletevms_client, tx));