From 5755ece192b003bcf26cb978b8dc5a95512c10c0 Mon Sep 17 00:00:00 2001 From: Ramil_Algayev Date: Tue, 7 Jan 2025 03:00:13 +0400 Subject: [PATCH] Fixed the issue with updating the dtrfs_api_endpoint I was updating it after sending it, so I fixed that Removed the unnessary part from the if statement dealing with the send fucntion Also refactored the code for the submit_newvm_resp and submit_updatevm_resp functions --- src/data.rs | 156 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 101 insertions(+), 55 deletions(-) diff --git a/src/data.rs b/src/data.rs index d269a37..bad12ad 100644 --- a/src/data.rs +++ b/src/data.rs @@ -196,43 +196,61 @@ impl BrainData { return; } }; - let args = match new_vm_resp.args.clone() { - Some(args) => args, - None => { - log::error!("NewVmResp does not contain MeasurementArgs for {}", new_vm_resp.uuid); - new_vm_resp.error = "Daemon did not return measurement args.".to_string(); - return; + match new_vm_resp.args { + Some(ref mut args) => { + if args.dtrfs_api_endpoint.starts_with(':') { + match self.find_nodes_by_pubkey(&new_vm_req.0.node_pubkey) { + Some(node) => { + args.dtrfs_api_endpoint = + format!("{}{}", node.ip, args.dtrfs_api_endpoint); + } + None => { + log::error!("Node not found for pubkey {}", new_vm_req.0.node_pubkey); + new_vm_resp.error = "Node not found.".to_string(); + return; + } + } + } } - }; - if let Err(e) = new_vm_req.1.send(new_vm_resp.clone()) { + None => { + log::error!( + "NewVmResp does not contain MeasurementArgs for {}", + new_vm_resp.uuid + ); + new_vm_resp.error = "Daemon did not return measurement args.".to_string(); + return; + } + } + if let Err(_) = new_vm_req.1.send(new_vm_resp.clone()) { log::error!( - "CLI RX for {} dropped before receiving confirmation {:?}. Error is: {:?}", + "CLI RX for {} dropped before receiving confirmation {:?}.", &new_vm_req.0.admin_pubkey, new_vm_resp, - e ); } if new_vm_resp.error != "" { return; } + let mut public_ipv4 = String::new(); let mut public_ipv6 = String::new(); - for ip in args.ips { + let args = new_vm_resp.args.as_ref().unwrap(); + for ip in args.ips.iter() { if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) { - if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() { - public_ipv4 = ipv4_addr.to_string(); - } - continue; + if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() { + public_ipv4 = ipv4_addr.to_string(); + } + continue; } if let Ok(ipv6_addr) = std::net::Ipv6Addr::from_str(&ip.address) { - public_ipv6 = ipv6_addr.to_string(); + public_ipv6 = ipv6_addr.to_string(); } } let contract = Contract { uuid: new_vm_resp.uuid, - exposed_ports: args.exposed_ports, + exposed_ports: args.exposed_ports.clone(), public_ipv4, public_ipv6, created_at: Utc::now().to_rfc3339(), @@ -248,59 +266,87 @@ impl BrainData { }; info!("Created new contract: {contract:?}"); self.contracts.write().unwrap().push(contract); - - let args = new_vm_resp.args.as_mut().unwrap(); - if args.dtrfs_api_endpoint.starts_with(':') { - if let Some(node) = self.find_nodes_by_pubkey(&new_vm_req.0.node_pubkey) { - args.dtrfs_api_endpoint = format!("{}{}", node.ip, args.dtrfs_api_endpoint); - } - } } - pub async fn submit_updatevm_resp(&self, mut resp: grpc::UpdateVmResp) { - let updatevmreq = match self.tmp_updatevm_reqs.remove(&resp.uuid) { + pub async fn submit_updatevm_resp(&self, mut update_vm_resp: grpc::UpdateVmResp) { + let update_vm_req = match self.tmp_updatevm_reqs.remove(&update_vm_resp.uuid) { Some((_, r)) => r, None => { log::error!( "Received confirmation for ghost UpdateVMRequest {}", - resp.uuid + update_vm_resp.uuid ); - resp.error = "Received confirmation for ghost UpdateVMRequest.".to_string(); + update_vm_resp.error = + "Received confirmation for ghost UpdateVMRequest.".to_string(); return; } }; - if let None = resp.args { - log::error!("NewVmResp does not contain MeasurementArgs for {}", resp.uuid); - resp.error = "Daemon did not return measurement args.".to_string(); - return; - }; - if let Err(e) = updatevmreq.1.send(resp.clone()) { - log::error!("CLI RX dropped before receiving UpdateVMResp {resp:?}. Error is: {e:?}"); - } - if resp.error != "" { + if update_vm_resp.error != "" { return; } - let mut contracts = self.contracts.write().unwrap(); - if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == resp.uuid) { - let args = resp.args.as_mut().unwrap(); - if args.dtrfs_api_endpoint.starts_with(':') { - if let Some(node) = self.find_nodes_by_pubkey(&contract.node_pubkey) { - args.dtrfs_api_endpoint = format!("{}{}", node.ip, args.dtrfs_api_endpoint); - } - } - contract.disk_size_gb = updatevmreq.0.disk_size_gb; - contract.vcpus = updatevmreq.0.vcpus; - contract.memory_mb = updatevmreq.0.memory_mb; - if !updatevmreq.0.kernel_sha.is_empty() { - info!("Updating kernel sha for {} to {}", contract.uuid, updatevmreq.0.kernel_sha); - contract.kernel_sha = updatevmreq.0.kernel_sha; + let mut contracts = self.contracts.write().unwrap(); + match contracts.iter_mut().find(|c| c.uuid == update_vm_resp.uuid) { + Some(contract) => { + match update_vm_resp.args { + Some(ref mut args) => { + if args.dtrfs_api_endpoint.starts_with(':') { + match self.find_nodes_by_pubkey(&contract.node_pubkey) { + Some(node) => { + args.dtrfs_api_endpoint = + format!("{}{}", node.ip, args.dtrfs_api_endpoint); + } + None => { + log::error!( + "Node not found for pubkey {}", + contract.node_pubkey + ); + update_vm_resp.error = "Node not found.".to_string(); + return; + } + } + } + } + None => { + log::error!( + "NewVmResp does not contain MeasurementArgs for {}", + update_vm_resp.uuid + ); + update_vm_resp.error = + "Daemon did not return measurement args.".to_string(); + return; + } + } + + contract.disk_size_gb = update_vm_req.0.disk_size_gb; + contract.vcpus = update_vm_req.0.vcpus; + contract.memory_mb = update_vm_req.0.memory_mb; + if !update_vm_req.0.kernel_sha.is_empty() { + info!( + "Updating kernel sha for {} to {}", + contract.uuid, update_vm_req.0.kernel_sha + ); + contract.kernel_sha = update_vm_req.0.kernel_sha; + } + if !update_vm_req.0.dtrfs_sha.is_empty() { + info!( + "Updating dtrfs sha for {} to {}", + contract.uuid, update_vm_req.0.dtrfs_sha + ); + contract.dtrfs_sha = update_vm_req.0.dtrfs_sha; + } + contract.updated_at = Utc::now().to_rfc3339(); } - if !updatevmreq.0.dtrfs_sha.is_empty() { - info!("Updating dtrfs sha for {} to {}", contract.uuid, updatevmreq.0.dtrfs_sha); - contract.dtrfs_sha = updatevmreq.0.dtrfs_sha; + None => { + log::error!("Contract not found for {}.", update_vm_req.0.uuid); + update_vm_resp.error = "Contract not found.".to_string(); } - contract.updated_at = Utc::now().to_rfc3339(); + } + if let Err(_) = update_vm_req.1.send(update_vm_resp.clone()) { + log::error!( + "CLI RX dropped before receiving UpdateVMResp {:?}.", + update_vm_resp + ); } }