updater branch merge #1

Merged
ghe0 merged 10 commits from updater into main 2024-12-25 22:59:18 +00:00
3 changed files with 81 additions and 67 deletions
Showing only changes of commit a641c0453a - Show all commits

@ -8,7 +8,7 @@ message NodePubkey {
string node_pubkey = 1; string node_pubkey = 1;
} }
message RegisterNodeRequest { message RegisterNodeReq {
string node_pubkey = 1; string node_pubkey = 1;
string owner_pubkey = 2; string owner_pubkey = 2;
string ip = 3; string ip = 3;
@ -23,7 +23,7 @@ message RegisterNodeRequest {
uint32 max_ports_per_vm = 12; uint32 max_ports_per_vm = 12;
} }
message NewVMRequest { message NewVMReq {
string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID
string hostname = 2; string hostname = 2;
string admin_pubkey = 3; string admin_pubkey = 3;
@ -40,7 +40,7 @@ message NewVMRequest {
string dtrfs_sha = 14; string dtrfs_sha = 14;
} }
message UpdateVMRequest { message UpdateVMReq {
string uuid = 1; string uuid = 1;
string node_pubkey = 2; string node_pubkey = 2;
uint32 disk_size_gb = 3; uint32 disk_size_gb = 3;
@ -79,7 +79,7 @@ message ListVMContractsReq {
string node_pubkey = 2; string node_pubkey = 2;
} }
message NewVMConfirmation { message NewVMResp {
string uuid = 1; string uuid = 1;
repeated uint32 exposed_ports = 2; repeated uint32 exposed_ports = 2;
string public_ipv4 = 3; string public_ipv4 = 3;
@ -87,20 +87,21 @@ message NewVMConfirmation {
string error = 5; string error = 5;
} }
message DeletedVMUpdate { message DeleteVMReq {
string uuid = 1; string uuid = 1;
} }
service BrainDaemonService { service BrainDaemonService {
rpc RegisterNode (RegisterNodeRequest) returns (Empty); rpc RegisterNode (RegisterNodeReq) returns (Empty);
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest); rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); rpc SendNewVMResp (stream NewVMResp) returns (Empty);
rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate); rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc GetUpdateVM (NodePubkey) returns (stream UpdateVMRequest); rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq);
ghe0 marked this conversation as resolved Outdated
Outdated
Review

These streams are becoming confusing and hard to read, both in the proto and also in the code. We should improve readability a bit by renaming some of the RPCs and some of the messages.

Let's do this:

service BrainDaemonService {
  rpc RegisterNode (RegisterNodeReq) returns (Empty);
  rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
  rpc SendNewVMResp (stream NewVMResp) returns (Empty);
  rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
  rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
  rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq);
  rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
}
These streams are becoming confusing and hard to read, both in the proto and also in the code. We should improve readability a bit by renaming some of the RPCs and some of the messages. Let's do this: ``` service BrainDaemonService { rpc RegisterNode (RegisterNodeReq) returns (Empty); rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq); rpc SendNewVMResp (stream NewVMResp) returns (Empty); rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); } ```
rpc SendUpdateVM (stream UpdateVMResp) returns (Empty); rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
} }
message NodeFilters { message NodeFilters {
uint32 free_ports = 1; uint32 free_ports = 1;
bool offers_ipv4 = 2; bool offers_ipv4 = 2;
@ -121,9 +122,9 @@ message NodeListResp {
} }
service BrainCliService { service BrainCliService {
rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation); rpc CreateVMContract (NewVMReq) returns (NewVMResp);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc ListNodes (NodeFilters) returns (stream NodeListResp); rpc ListNodes (NodeFilters) returns (stream NodeListResp);
rpc DeleteVM (DeletedVMUpdate) returns (Empty); rpc DeleteVM (DeleteVMReq) returns (Empty);
rpc UpdateVM (UpdateVMRequest) returns (UpdateVMResp); rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp);
} }

@ -22,8 +22,8 @@ pub struct Node {
pub max_ports_per_vm: u32, pub max_ports_per_vm: u32,
} }
impl From<grpc::RegisterNodeRequest> for Node { impl From<grpc::RegisterNodeReq> for Node {
fn from(node: grpc::RegisterNodeRequest) -> Self { fn from(node: grpc::RegisterNodeReq) -> Self {
Node { Node {
public_key: node.node_pubkey, public_key: node.node_pubkey,
owner_key: node.owner_pubkey, owner_key: node.owner_pubkey,
@ -95,11 +95,11 @@ impl Into<grpc::VmContract> for Contract {
pub struct BrainData { pub struct BrainData {
nodes: RwLock<Vec<Node>>, nodes: RwLock<Vec<Node>>,
contracts: RwLock<Vec<Contract>>, contracts: RwLock<Vec<Contract>>,
tmp_vmrequests: DashMap<String, (grpc::NewVmRequest, OneshotSender<grpc::NewVmConfirmation>)>, tmp_vmrequests: DashMap<String, (grpc::NewVmReq, OneshotSender<grpc::NewVmResp>)>,
tmp_updatevmrequests: DashMap<String, (grpc::UpdateVmRequest, OneshotSender<grpc::UpdateVmResp>)>, tmp_updatevmrequests: DashMap<String, (grpc::UpdateVmReq, OneshotSender<grpc::UpdateVmResp>)>,
daemon_deletevm_tx: DashMap<String, Sender<grpc::DeletedVmUpdate>>, daemon_deletevm_tx: DashMap<String, Sender<grpc::DeleteVmReq>>,
daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmRequest>>, daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmReq>>,
daemon_updatevm_tx: DashMap<String, Sender<grpc::UpdateVmRequest>>, daemon_updatevm_tx: DashMap<String, Sender<grpc::UpdateVmReq>>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -122,7 +122,7 @@ impl BrainData {
} }
} }
pub fn insert_node(&self, node: grpc::RegisterNodeRequest) { pub fn insert_node(&self, node: grpc::RegisterNodeReq) {
info!("Registering node {node:?}"); info!("Registering node {node:?}");
let mut nodes = self.nodes.write().unwrap(); let mut nodes = self.nodes.write().unwrap();
for n in nodes.iter_mut() { for n in nodes.iter_mut() {
@ -135,18 +135,19 @@ impl BrainData {
nodes.push(node.into()); nodes.push(node.into());
} }
pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::DeletedVmUpdate>) { pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::DeleteVmReq>) {
self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx); self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx);
} }
pub async fn add_daemon_updatevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::UpdateVmRequest>) { pub async fn add_daemon_updatevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::UpdateVmReq>) {
log::debug!("Adding daemon updatevm tx for node_pubkey: {}", node_pubkey); log::debug!("Adding daemon updatevm tx for node_pubkey: {}", node_pubkey);
self.tmp_updatevmrequests self.tmp_updatevmrequests
.retain(|_, req| req.0.node_pubkey != node_pubkey); .retain(|_, req| req.0.node_pubkey != node_pubkey);
self.daemon_updatevm_tx.insert(node_pubkey.to_string(), tx); self.daemon_updatevm_tx.insert(node_pubkey.to_string(), tx);
info!("Added daemon TX for {}", node_pubkey);
} }
pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) { pub async fn delete_vm(&self, delete_vm: grpc::DeleteVmReq) {
if let Some(contract) = self.find_contract_by_uuid(&delete_vm.uuid) { if let Some(contract) = self.find_contract_by_uuid(&delete_vm.uuid) {
info!("Found vm {}. Deleting...", delete_vm.uuid); info!("Found vm {}. Deleting...", delete_vm.uuid);
if let Some(daemon_tx) = self.daemon_deletevm_tx.get(&contract.node_pubkey) { if let Some(daemon_tx) = self.daemon_deletevm_tx.get(&contract.node_pubkey) {
@ -166,13 +167,13 @@ impl BrainData {
} }
} }
pub async fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender<grpc::NewVmRequest>) { pub async fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender<grpc::NewVmReq>) {
self.tmp_vmrequests self.tmp_vmrequests
.retain(|_, req| req.0.node_pubkey != node_pubkey); .retain(|_, req| req.0.node_pubkey != node_pubkey);
self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx); self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx);
} }
pub async fn submit_vmconfirmation(&self, confirmation: grpc::NewVmConfirmation) { pub async fn submit_vmconfirmation(&self, confirmation: grpc::NewVmResp) {
let newvmreq = match self.tmp_vmrequests.remove(&confirmation.uuid) { let newvmreq = match self.tmp_vmrequests.remove(&confirmation.uuid) {
Some((_, r)) => r, Some((_, r)) => r,
None => { None => {
@ -247,8 +248,8 @@ impl BrainData {
pub async fn submit_newvmrequest( pub async fn submit_newvmrequest(
&self, &self,
mut req: grpc::NewVmRequest, mut req: grpc::NewVmReq,
tx: OneshotSender<grpc::NewVmConfirmation>, tx: OneshotSender<grpc::NewVmResp>,
) { ) {
let uuid = uuid::Uuid::new_v4().to_string(); let uuid = uuid::Uuid::new_v4().to_string();
@ -268,7 +269,7 @@ impl BrainData {
"Daemon {} RX dropped before sending update. Cleaning memory...", "Daemon {} RX dropped before sending update. Cleaning memory...",
req.node_pubkey req.node_pubkey
); );
self.submit_vmconfirmation(grpc::NewVmConfirmation { self.submit_vmconfirmation(grpc::NewVmResp {
error: "Daemon is offline.".to_string(), error: "Daemon is offline.".to_string(),
uuid, uuid,
exposed_ports: Vec::new(), exposed_ports: Vec::new(),
@ -282,7 +283,7 @@ impl BrainData {
pub async fn submit_updatevmrequest( pub async fn submit_updatevmrequest(
&self, &self,
req: grpc::UpdateVmRequest, req: grpc::UpdateVmReq,
tx: OneshotSender<grpc::UpdateVmResp>, tx: OneshotSender<grpc::UpdateVmResp>,
) { ) {
let uuid = req.uuid.clone(); let uuid = req.uuid.clone();
@ -294,20 +295,32 @@ impl BrainData {
"Found daemon TX for {}. Sending updateVMReq {}", "Found daemon TX for {}. Sending updateVMReq {}",
req.node_pubkey, req.uuid req.node_pubkey, req.uuid
); );
if server_tx.send(req.clone()).await.is_ok() { match server_tx.send(req.clone()).await {
return; Ok(_) => {
} else { debug!("Successfully sent updateVMReq to {}", req.node_pubkey);
warn!( return;
"Daemon {} RX dropped before sending update. Cleaning memory...", }
req.node_pubkey Err(e) => {
); warn!(
self.submit_update_vmconfirmation(grpc::UpdateVmResp { "Failed to send updateVMReq to {}: {}. Cleaning memory...",
uuid, req.node_pubkey, e
timestamp: chrono::Utc::now().to_rfc3339(), );
error: "Daemon is offline.".to_string(), self.submit_update_vmconfirmation(grpc::UpdateVmResp {
}) uuid,
.await; timestamp: chrono::Utc::now().to_rfc3339(),
error: "Daemon is offline.".to_string(),
})
.await;
}
} }
} else {
warn!("No daemon TX found for {}", req.node_pubkey);
self.submit_update_vmconfirmation(grpc::UpdateVmResp {
uuid,
timestamp: chrono::Utc::now().to_rfc3339(),
error: "Daemon is offline.".to_string(),
})
.await;
} }
} }

@ -41,13 +41,13 @@ impl BrainCliMock {
impl BrainDaemonService for BrainDaemonMock { impl BrainDaemonService for BrainDaemonMock {
async fn register_node( async fn register_node(
&self, &self,
req: Request<RegisterNodeRequest>, req: Request<RegisterNodeReq>,
) -> Result<Response<Empty>, Status> { ) -> Result<Response<Empty>, Status> {
self.data.insert_node(req.into_inner()); self.data.insert_node(req.into_inner());
Ok(Response::new(Empty {})) Ok(Response::new(Empty {}))
} }
type GetNewVMReqsStream = Pin<Box<dyn Stream<Item = Result<NewVmRequest, Status>> + Send>>; type GetNewVMReqsStream = Pin<Box<dyn Stream<Item = Result<NewVmReq, Status>> + Send>>;
async fn get_new_vm_reqs( async fn get_new_vm_reqs(
&self, &self,
@ -67,7 +67,7 @@ impl BrainDaemonService for BrainDaemonMock {
debug!("Sending NewVMRequest to {}: {newvmreq:?}", req.node_pubkey); debug!("Sending NewVMRequest to {}: {newvmreq:?}", req.node_pubkey);
if let Err(e) = grpc_tx.send(Ok(newvmreq)).await { if let Err(e) = grpc_tx.send(Ok(newvmreq)).await {
warn!("Could not send NewVMRequest to {}: {e:?}", req.node_pubkey); warn!("Could not send NewVMRequest to {}: {e:?}", req.node_pubkey);
data.submit_vmconfirmation(NewVmConfirmation { data.submit_vmconfirmation(NewVmResp {
error: "Daemon not connected.".to_string(), error: "Daemon not connected.".to_string(),
uuid, uuid,
..Default::default() ..Default::default()
@ -83,11 +83,11 @@ impl BrainDaemonService for BrainDaemonMock {
)) ))
} }
async fn send_vm_confirmations( async fn send_new_vm_resp(
&self, &self,
req: Request<Streaming<NewVmConfirmation>>, req: Request<Streaming<NewVmResp>>,
) -> Result<Response<Empty>, Status> { ) -> Result<Response<Empty>, Status> {
debug!("Some node connected to stream NewVmConfirmation"); debug!("Some node connected to stream NewVMResp");
let mut confirmations = req.into_inner(); let mut confirmations = req.into_inner();
while let Some(confirmation) = confirmations.next().await { while let Some(confirmation) = confirmations.next().await {
match confirmation { match confirmation {
@ -96,21 +96,21 @@ impl BrainDaemonService for BrainDaemonMock {
self.data.submit_vmconfirmation(c).await; self.data.submit_vmconfirmation(c).await;
} }
Err(e) => { Err(e) => {
log::warn!("Daemon disconnected from Streaming<NewVmConfirmation>: {e:?}") log::warn!("Daemon disconnected from Streaming<NewVMResp>: {e:?}")
} }
} }
} }
Ok(Response::new(Empty {})) Ok(Response::new(Empty {}))
} }
type DeletedVMUpdatesStream = Pin<Box<dyn Stream<Item = Result<DeletedVmUpdate, Status>> + Send>>; type GetDeleteVMReqStream = Pin<Box<dyn Stream<Item = Result<DeleteVmReq, Status>> + Send>>;
async fn deleted_vm_updates( async fn get_delete_vm_req(
&self, &self,
req: Request<NodePubkey>, req: Request<NodePubkey>,
) -> Result<Response<Self::DeletedVMUpdatesStream>, Status> { ) -> Result<Response<Self::GetDeleteVMReqStream>, Status> {
let node_pubkey = req.into_inner().node_pubkey; let node_pubkey = req.into_inner().node_pubkey;
info!("Daemon {node_pubkey} requested DeletedVMUpdatesStream"); info!("Daemon {node_pubkey} requested GetDeleteVMReqStream");
let (grpc_tx, grpc_rx) = mpsc::channel(6); let (grpc_tx, grpc_rx) = mpsc::channel(6);
let (data_tx, mut data_rx) = mpsc::channel(6); let (data_tx, mut data_rx) = mpsc::channel(6);
self.data self.data
@ -134,7 +134,7 @@ impl BrainDaemonService for BrainDaemonMock {
}); });
let output_stream = ReceiverStream::new(grpc_rx); let output_stream = ReceiverStream::new(grpc_rx);
Ok(Response::new( Ok(Response::new(
Box::pin(output_stream) as Self::DeletedVMUpdatesStream Box::pin(output_stream) as Self::GetDeleteVMReqStream
)) ))
} }
@ -159,12 +159,12 @@ impl BrainDaemonService for BrainDaemonMock {
)) ))
} }
type GetUpdateVMStream = Pin<Box<dyn Stream<Item = Result<UpdateVmRequest, Status>> + Send>>; type GetUpdateVMReqStream = Pin<Box<dyn Stream<Item = Result<UpdateVmReq, Status>> + Send>>;
async fn get_update_vm( async fn get_update_vm_req(
&self, &self,
req: Request<NodePubkey>, req: Request<NodePubkey>,
) -> Result<Response<Self::GetUpdateVMStream>, Status> { ) -> Result<Response<Self::GetUpdateVMReqStream>, Status> {
let req = req.into_inner(); let req = req.into_inner();
info!("Daemon {} requested GetUpdateVMStream", req.node_pubkey); info!("Daemon {} requested GetUpdateVMStream", req.node_pubkey);
let (grpc_tx, grpc_rx) = mpsc::channel(6); let (grpc_tx, grpc_rx) = mpsc::channel(6);
@ -190,15 +190,15 @@ impl BrainDaemonService for BrainDaemonMock {
}); });
let output_stream = ReceiverStream::new(grpc_rx); let output_stream = ReceiverStream::new(grpc_rx);
Ok(Response::new( Ok(Response::new(
Box::pin(output_stream) as Self::GetUpdateVMStream Box::pin(output_stream) as Self::GetUpdateVMReqStream
)) ))
} }
async fn send_update_vm( async fn send_update_vm_resp(
&self, &self,
req: Request<Streaming<UpdateVmResp>>, req: Request<Streaming<UpdateVmResp>>,
) -> Result<Response<Empty>, Status> { ) -> Result<Response<Empty>, Status> {
debug!("Some node connected to stream NewVmConfirmation"); debug!("Some node connected to stream NewVMResp");
let mut confirmations = req.into_inner(); let mut confirmations = req.into_inner();
while let Some(confirmation) = confirmations.next().await { while let Some(confirmation) = confirmations.next().await {
match confirmation { match confirmation {
@ -207,7 +207,7 @@ impl BrainDaemonService for BrainDaemonMock {
self.data.submit_update_vmconfirmation(c).await; self.data.submit_update_vmconfirmation(c).await;
} }
Err(e) => { Err(e) => {
log::warn!("Daemon disconnected from Streaming<NewVmConfirmation>: {e:?}") log::warn!("Daemon disconnected from Streaming<NewVMResp>: {e:?}")
} }
} }
} }
@ -219,8 +219,8 @@ impl BrainDaemonService for BrainDaemonMock {
impl BrainCliService for BrainCliMock { impl BrainCliService for BrainCliMock {
async fn create_vm_contract( async fn create_vm_contract(
&self, &self,
req: Request<NewVmRequest>, req: Request<NewVmReq>,
) -> Result<Response<NewVmConfirmation>, Status> { ) -> Result<Response<NewVmResp>, Status> {
let req = req.into_inner(); let req = req.into_inner();
info!("New VM requested via CLI: {req:?}"); info!("New VM requested via CLI: {req:?}");
let admin_pubkey = req.admin_pubkey.clone(); let admin_pubkey = req.admin_pubkey.clone();
@ -242,7 +242,7 @@ impl BrainCliService for BrainCliMock {
async fn update_vm( async fn update_vm(
&self, &self,
req: Request<UpdateVmRequest>, req: Request<UpdateVmReq>,
) -> Result<Response<UpdateVmResp>, Status> { ) -> Result<Response<UpdateVmResp>, Status> {
let req = req.into_inner(); let req = req.into_inner();
info!("Update VM requested via CLI: {req:?}"); info!("Update VM requested via CLI: {req:?}");
@ -303,7 +303,7 @@ impl BrainCliService for BrainCliMock {
)) ))
} }
async fn delete_vm(&self, req: Request<DeletedVmUpdate>) -> Result<Response<Empty>, Status> { async fn delete_vm(&self, req: Request<DeleteVmReq>) -> Result<Response<Empty>, Status> {
let req = req.into_inner(); let req = req.into_inner();
info!("Unknown CLI requested to delete vm {}", req.uuid); info!("Unknown CLI requested to delete vm {}", req.uuid);
self.data.delete_vm(req).await; self.data.delete_vm(req).await;