created update functionality

This commit is contained in:
Ramil_Algayev 2024-12-23 21:25:08 +04:00
parent 7905c1c165
commit c7cc1565e2
3 changed files with 67 additions and 4 deletions

@ -40,6 +40,23 @@ message NewVMRequest {
string dtrfs_sha = 14; string dtrfs_sha = 14;
} }
message UpdateVMRequest {
string uuid = 1;
uint32 disk_size_gb = 2;
uint32 vcpus = 3;
uint32 memory_mb = 4;
string kernel_url = 5;
string kernel_sha = 6;
string dtrfs_url = 7;
string dtrfs_sha = 8;
}
message UpdateVMResp {
string uuid = 1;
string timestamp = 2;
string error = 3;
}
message VMContract { message VMContract {
string uuid = 1; string uuid = 1;
string hostname = 2; string hostname = 2;
@ -79,6 +96,7 @@ service BrainDaemonService {
rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty);
rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate); rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc UpdateVMs (NodePubkey) returns (stream UpdateVMRequest);
} }
message NodeFilters { message NodeFilters {
@ -105,5 +123,5 @@ service BrainCliService {
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc ListNodes (NodeFilters) returns (stream NodeListResp); rpc ListNodes (NodeFilters) returns (stream NodeListResp);
rpc DeleteVM (DeletedVMUpdate) returns (Empty); rpc DeleteVM (DeletedVMUpdate) returns (Empty);
rpc UpdateVM (UpdateVMRequest) returns (UpdateVMResp);
} }

@ -1,10 +1,10 @@
#![allow(dead_code)] #![allow(dead_code)]
use crate::grpc::brain as grpc; use crate::grpc::brain::{self as grpc, UpdateVmRequest, UpdateVmResp};
use dashmap::DashMap; use dashmap::DashMap;
use log::{debug, info, warn}; use log::{debug, info, warn};
use std::sync::RwLock; use std::sync::RwLock;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot::Sender as OneshotSender; use tokio::sync::oneshot::{self, Sender as OneshotSender};
#[derive(Eq, Hash, PartialEq, Clone)] #[derive(Eq, Hash, PartialEq, Clone)]
pub struct Node { pub struct Node {
@ -98,6 +98,7 @@ pub struct BrainData {
tmp_vmrequests: DashMap<String, (grpc::NewVmRequest, OneshotSender<grpc::NewVmConfirmation>)>, tmp_vmrequests: DashMap<String, (grpc::NewVmRequest, OneshotSender<grpc::NewVmConfirmation>)>,
daemon_deletevm_tx: DashMap<String, Sender<grpc::DeletedVmUpdate>>, daemon_deletevm_tx: DashMap<String, Sender<grpc::DeletedVmUpdate>>,
daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmRequest>>, daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmRequest>>,
daemon_updatevm_tx: DashMap<String, Sender<grpc::UpdateVmRequest>>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -115,6 +116,7 @@ impl BrainData {
tmp_vmrequests: DashMap::new(), tmp_vmrequests: DashMap::new(),
daemon_deletevm_tx: DashMap::new(), daemon_deletevm_tx: DashMap::new(),
daemon_newvm_tx: DashMap::new(), daemon_newvm_tx: DashMap::new(),
daemon_updatevm_tx: DashMap::new(),
} }
} }

@ -159,6 +159,29 @@ impl BrainDaemonService for BrainDaemonMock {
Box::pin(output_stream) as Self::ListVMContractsStream Box::pin(output_stream) as Self::ListVMContractsStream
)) ))
} }
type UpdateVMsStream = Pin<Box<dyn Stream<Item = Result<UpdateVmRequest, Status>> + Send>>;
async fn update_v_ms(
&self,
req: Request<NodePubkey>,
) -> Result<Response<Self::UpdateVMsStream>, Status> {
let node_pubkey = req.into_inner().node_pubkey;
let (grpc_tx, grpc_rx) = mpsc::channel(6);
let (_, mut data_rx) = mpsc::channel(6);
tokio::spawn(async move {
while let Some(update_req) = data_rx.recv().await {
if let Err(e) = grpc_tx.send(Ok(update_req)).await {
warn!("Could not send UpdateVmRequest to {node_pubkey}: {e:?}");
break;
}
}
});
let output_stream = ReceiverStream::new(grpc_rx);
Ok(Response::new(Box::pin(output_stream) as Self::UpdateVMsStream))
}
} }
#[tonic::async_trait] #[tonic::async_trait]
@ -232,4 +255,24 @@ impl BrainCliService for BrainCliMock {
self.data.delete_vm(req).await; self.data.delete_vm(req).await;
Ok(Response::new(Empty {})) Ok(Response::new(Empty {}))
} }
async fn update_vm(
&self,
req: Request<UpdateVmRequest>,
) -> Result<Response<UpdateVmResp>, Status> {
let req = req.into_inner();
info!("CLI requested UpdateVM: {req:?}");
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
self.data.submit_updatevmrequest(req, oneshot_tx).await;
match oneshot_rx.await {
Ok(resp) => {
info!("Returning UpdateVmResp: {resp:?}");
Ok(Response::new(resp))
},
Err(e) => {
log::error!("Error waiting for UpdateVmResp: {e:?}");
Err(Status::unknown("Daemon not responding."))
}
}
}
} }