updater branch merge #1

Merged
ghe0 merged 10 commits from updater into main 2024-12-25 22:59:18 +00:00
3 changed files with 96 additions and 65 deletions
Showing only changes of commit 59d4e25bb1 - Show all commits

@ -97,7 +97,8 @@ service BrainDaemonService {
rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty);
rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate); rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc UpdateVMs (NodePubkey) returns (stream UpdateVMRequest); rpc GetUpdateVM (NodePubkey) returns (stream UpdateVMRequest);
ghe0 marked this conversation as resolved Outdated
Outdated
Review

These streams are becoming confusing and hard to read, both in the proto and also in the code. We should improve readability a bit by renaming some of the RPCs and some of the messages.

Let's do this:

service BrainDaemonService {
  rpc RegisterNode (RegisterNodeReq) returns (Empty);
  rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
  rpc SendNewVMResp (stream NewVMResp) returns (Empty);
  rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
  rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
  rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq);
  rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
}
These streams are becoming confusing and hard to read, both in the proto and also in the code. We should improve readability a bit by renaming some of the RPCs and some of the messages. Let's do this: ``` service BrainDaemonService { rpc RegisterNode (RegisterNodeReq) returns (Empty); rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq); rpc SendNewVMResp (stream NewVMResp) returns (Empty); rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); } ```
rpc SendUpdateVM (stream UpdateVMResp) returns (Empty);
} }
message NodeFilters { message NodeFilters {
@ -125,4 +126,4 @@ service BrainCliService {
rpc ListNodes (NodeFilters) returns (stream NodeListResp); rpc ListNodes (NodeFilters) returns (stream NodeListResp);
rpc DeleteVM (DeletedVMUpdate) returns (Empty); rpc DeleteVM (DeletedVMUpdate) returns (Empty);
rpc UpdateVM (UpdateVMRequest) returns (UpdateVMResp); rpc UpdateVM (UpdateVMRequest) returns (UpdateVMResp);
} }

@ -96,7 +96,6 @@ pub struct BrainData {
nodes: RwLock<Vec<Node>>, nodes: RwLock<Vec<Node>>,
contracts: RwLock<Vec<Contract>>, contracts: RwLock<Vec<Contract>>,
tmp_vmrequests: DashMap<String, (grpc::NewVmRequest, OneshotSender<grpc::NewVmConfirmation>)>, tmp_vmrequests: DashMap<String, (grpc::NewVmRequest, OneshotSender<grpc::NewVmConfirmation>)>,
tmp_updatevmrequests: DashMap<String, (grpc::UpdateVmRequest, OneshotSender<grpc::UpdateVmResp>)>,
daemon_deletevm_tx: DashMap<String, Sender<grpc::DeletedVmUpdate>>, daemon_deletevm_tx: DashMap<String, Sender<grpc::DeletedVmUpdate>>,
daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmRequest>>, daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmRequest>>,
daemon_updatevm_tx: DashMap<String, Sender<grpc::UpdateVmRequest>>, daemon_updatevm_tx: DashMap<String, Sender<grpc::UpdateVmRequest>>,
@ -115,7 +114,6 @@ impl BrainData {
nodes: RwLock::new(Vec::new()), nodes: RwLock::new(Vec::new()),
contracts: RwLock::new(Vec::new()), contracts: RwLock::new(Vec::new()),
tmp_vmrequests: DashMap::new(), tmp_vmrequests: DashMap::new(),
tmp_updatevmrequests: DashMap::new(),
daemon_deletevm_tx: DashMap::new(), daemon_deletevm_tx: DashMap::new(),
daemon_newvm_tx: DashMap::new(), daemon_newvm_tx: DashMap::new(),
daemon_updatevm_tx: DashMap::new(), daemon_updatevm_tx: DashMap::new(),
@ -139,6 +137,10 @@ impl BrainData {
self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx); self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx);
} }
pub fn add_daemon_updatevm_tx(&self, node_pubkey: &str, _tx: Sender<grpc::UpdateVmRequest>) {
self.daemon_updatevm_tx.insert(node_pubkey.to_string(), _tx);
}
pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) { pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) {
if let Some(contract) = self.find_contract_by_uuid(&delete_vm.uuid) { if let Some(contract) = self.find_contract_by_uuid(&delete_vm.uuid) {
info!("Found vm {}. Deleting...", delete_vm.uuid); info!("Found vm {}. Deleting...", delete_vm.uuid);
@ -241,35 +243,17 @@ impl BrainData {
} }
} }
pub async fn submit_updatevmrequest( pub fn update_vm(&self, req: grpc::UpdateVmRequest) -> Result<(), String> {
&self, let mut contracts = self.contracts.write().unwrap();
req: grpc::UpdateVmRequest, if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == req.uuid) {
tx: OneshotSender<grpc::UpdateVmResp>, contract.disk_size_gb = req.disk_size_gb;
) { contract.vcpus = req.vcpus;
let uuid = req.uuid.clone(); contract.memory_mb = req.memory_mb;
info!("Inserting update vm request in memory: {:?}", req); contract.kernel_sha = req.kernel_sha;
self.tmp_updatevmrequests.insert(uuid.clone(), (req.clone(), tx)); contract.dtrfs_sha = req.dtrfs_sha;
Ok(())
if let Some(server_tx) = self.daemon_updatevm_tx.get(&req.node_pubkey) {
if server_tx.send(req.clone()).await.is_err() {
warn!("Daemon {} RX dropped before sending update VM. Cleaning memory...", &req.node_pubkey);
if let Some((_, oneshot_tx)) = self.tmp_updatevmrequests.remove(&uuid) {
let _ = oneshot_tx.1.send(grpc::UpdateVmResp {
uuid,
timestamp: "".to_string(),
error: "Daemon is offline.".to_string(),
});
}
}
} else { } else {
warn!("No daemon TX found for {}", req.node_pubkey); Err(format!("Contract {} not found", req.uuid))
if let Some((_, oneshot_tx)) = self.tmp_updatevmrequests.remove(&uuid) {
let _ = oneshot_tx.1.send(grpc::UpdateVmResp {
uuid,
timestamp: "".to_string(),
error: "Daemon is offline.".to_string(),
});
}
} }
} }

@ -103,8 +103,7 @@ impl BrainDaemonService for BrainDaemonMock {
Ok(Response::new(Empty {})) Ok(Response::new(Empty {}))
} }
type DeletedVMUpdatesStream = type DeletedVMUpdatesStream = Pin<Box<dyn Stream<Item = Result<DeletedVmUpdate, Status>> + Send>>;
Pin<Box<dyn Stream<Item = Result<DeletedVmUpdate, Status>> + Send>>;
async fn deleted_vm_updates( async fn deleted_vm_updates(
&self, &self,
@ -160,27 +159,73 @@ impl BrainDaemonService for BrainDaemonMock {
)) ))
} }
type UpdateVMsStream = Pin<Box<dyn Stream<Item = Result<UpdateVmRequest, Status>> + Send>>; type GetUpdateVMStream = Pin<Box<dyn Stream<Item = Result<UpdateVmRequest, Status>> + Send>>;
async fn update_v_ms( async fn get_update_vm(
&self, &self,
req: Request<NodePubkey>, req: Request<NodePubkey>,
) -> Result<Response<Self::UpdateVMsStream>, Status> { ) -> Result<Response<Self::GetUpdateVMStream>, Status> {
let node_pubkey = req.into_inner().node_pubkey; let req = req.into_inner();
info!("Node {} requested GetUpdateVMsStream", req.node_pubkey);
let (grpc_tx, grpc_rx) = mpsc::channel(6); let (grpc_tx, grpc_rx) = mpsc::channel(6);
let (_, mut data_rx) = mpsc::channel(6); let (data_tx, mut data_rx) = mpsc::channel(6);
self.data
.clone()
.add_daemon_updatevm_tx(&req.node_pubkey, data_tx);
let data = self.data.clone();
tokio::spawn(async move { tokio::spawn(async move {
while let Some(update_req) = data_rx.recv().await { while let Some(updatevmreq) = data_rx.recv().await {
if let Err(e) = grpc_tx.send(Ok(update_req)).await { debug!("Sending UpdateVMRequest to {}: {:?}", req.node_pubkey.clone(), updatevmreq.clone());
warn!("Could not send UpdateVmRequest to {node_pubkey}: {e:?}"); if let Err(e) = grpc_tx.send(Ok(updatevmreq.clone())).await {
break; warn!("Could not send UpdateVMRequest to {}: {e:?}", req.node_pubkey.clone());
continue;
} }
data.update_vm(UpdateVmRequest {
uuid: updatevmreq.uuid,
node_pubkey: updatevmreq.node_pubkey,
disk_size_gb: updatevmreq.disk_size_gb,
memory_mb: updatevmreq.memory_mb,
vcpus: updatevmreq.vcpus,
kernel_sha: updatevmreq.kernel_sha,
dtrfs_sha: updatevmreq.dtrfs_sha,
kernel_url: updatevmreq.kernel_url,
dtrfs_url: updatevmreq.dtrfs_url,
});
break;
} }
}); });
let output_stream = ReceiverStream::new(grpc_rx); let output_stream = ReceiverStream::new(grpc_rx);
Ok(Response::new(Box::pin(output_stream) as Self::UpdateVMsStream)) Ok(Response::new(
Box::pin(output_stream) as Self::GetUpdateVMStream
))
}
async fn send_update_vm(
&self,
req: Request<Streaming<UpdateVmResp>>,
) -> Result<Response<Empty>, Status> {
let req = req.into_inner();
info!("Received UpdateVMs request");
// Perform the update operation
let result = self.data.update_all_vms().await;
match result {
Ok(_) => {
let response = UpdateVmResp {
success: true,
message: "All VMs updated successfully".to_string(),
};
Ok(Response::new(response))
}
Err(e) => {
let response = UpdateVmResp {
success: false,
message: format!("Failed to update VMs: {:?}", e),
};
Err(Status::internal(response.message))
}
}
} }
} }
@ -209,6 +254,25 @@ impl BrainCliService for BrainCliMock {
} }
} }
async fn update_vm(
&self,
req: Request<UpdateVmRequest>,
) -> Result<Response<UpdateVmResp>, Status> {
let req = req.into_inner();
match self.data.update_vm(req.clone()) {
Ok(_) => Ok(Response::new(UpdateVmResp {
uuid: req.uuid,
timestamp: format!("{:?}", std::time::SystemTime::now()),
error: "".to_string(),
})),
Err(e) => Ok(Response::new(UpdateVmResp {
uuid: req.uuid,
timestamp: "".to_string(),
error: e,
})),
}
}
type ListVMContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>; type ListVMContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
async fn list_vm_contracts( async fn list_vm_contracts(
&self, &self,
@ -256,23 +320,5 @@ impl BrainCliService for BrainCliMock {
Ok(Response::new(Empty {})) Ok(Response::new(Empty {}))
} }
async fn update_vm(
&self,
req: Request<UpdateVmRequest>,
) -> Result<Response<UpdateVmResp>, Status> {
let req = req.into_inner();
info!("CLI requested UpdateVM: {req:?}");
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
self.data.submit_updatevmrequest(req, oneshot_tx).await;
match oneshot_rx.await {
Ok(resp) => {
info!("Returning UpdateVmResp: {resp:?}");
Ok(Response::new(resp))
},
Err(e) => {
log::error!("Error waiting for UpdateVmResp: {e:?}");
Err(Status::unknown("Daemon not responding."))
}
}
}
} }