From b5943ba4651f5a4286a38476dd8da1c38c7cf87e Mon Sep 17 00:00:00 2001 From: Ramil_Algayev Date: Wed, 1 Jan 2025 18:09:36 +0400 Subject: [PATCH 01/11] added ovmf_hash to UpdateVMResp --- brain.proto | 19 ++++++++++--------- src/data.rs | 2 ++ 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/brain.proto b/brain.proto index 5fd8227..1c5bc4e 100644 --- a/brain.proto +++ b/brain.proto @@ -43,18 +43,19 @@ message NewVMReq { message UpdateVMReq { string uuid = 1; - uint32 disk_size_gb = 3; - uint32 vcpus = 4; - uint32 memory_mb = 5; - string kernel_url = 6; - string kernel_sha = 7; - string dtrfs_url = 8; - string dtrfs_sha = 9; + uint32 disk_size_gb = 2; + uint32 vcpus = 3; + uint32 memory_mb = 4; + string kernel_url = 5; + string kernel_sha = 6; + string dtrfs_url = 7; + string dtrfs_sha = 8; } message UpdateVMResp { string uuid = 1; - string error = 3; + string error = 2; + string ovmf_hash = 3; } message VMContract { @@ -142,4 +143,4 @@ service BrainCliService { rpc GetOneNode (NodeFilters) returns (NodeListResp); rpc DeleteVM (DeleteVMReq) returns (Empty); rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp); -} +} \ No newline at end of file diff --git a/src/data.rs b/src/data.rs index bac98dd..f490431 100644 --- a/src/data.rs +++ b/src/data.rs @@ -319,6 +319,7 @@ impl BrainData { let _ = tx.send(grpc::UpdateVmResp { uuid, error: "Contract does not exist.".to_string(), + ovmf_hash: "".to_string(), }); return; } @@ -343,6 +344,7 @@ impl BrainData { self.submit_updatevm_resp(grpc::UpdateVmResp { uuid, error: "Daemon is offline.".to_string(), + ovmf_hash: "".to_string(), }) .await; } -- 2.43.0 From d5a771d28aa2ab7495597f2bf15a2a7f3d3dacc3 Mon Sep 17 00:00:00 2001 From: Ramil_Algayev Date: Wed, 1 Jan 2025 18:12:23 +0400 Subject: [PATCH 02/11] fixed missing ovmf_hash field in UpdateVMResp --- src/data.rs | 1 + src/grpc.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/data.rs b/src/data.rs index f490431..f0c4de3 100644 --- a/src/data.rs +++ b/src/data.rs @@ -354,6 +354,7 @@ impl BrainData { self.submit_updatevm_resp(grpc::UpdateVmResp { uuid, error: "Daemon is offline.".to_string(), + ovmf_hash: "".to_string(), }) .await; } diff --git a/src/grpc.rs b/src/grpc.rs index 7db03e3..aa82a00 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -256,6 +256,7 @@ impl BrainDaemonService for BrainDaemonMock { data.submit_updatevm_resp(UpdateVmResp { error: "Daemon not connected.".to_string(), uuid: updatevmreq.uuid, + ovmf_hash: "".to_string(), }) .await; break; -- 2.43.0 From 716b7edd3370272c595b987c271b6314e39c4101 Mon Sep 17 00:00:00 2001 From: Ramil_Algayev Date: Fri, 3 Jan 2025 18:23:29 +0400 Subject: [PATCH 03/11] Had to change these so that the updater could work. I'm not sure if this is the best way to do it, but it works for now. --- src/data.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/data.rs b/src/data.rs index f0c4de3..4b0c522 100644 --- a/src/data.rs +++ b/src/data.rs @@ -272,9 +272,9 @@ impl BrainData { mut req: grpc::NewVmReq, tx: OneshotSender, ) { - let uuid = uuid::Uuid::new_v4().to_string(); - - req.uuid = uuid.clone(); + if req.uuid.is_empty() { + req.uuid = uuid::Uuid::new_v4().to_string(); + } info!("Inserting new vm request in memory: {req:?}"); self.tmp_newvm_reqs .insert(req.uuid.clone(), (req.clone(), tx)); @@ -292,7 +292,7 @@ impl BrainData { ); self.submit_newvm_resp(grpc::NewVmResp { error: "Daemon is offline.".to_string(), - uuid, + uuid: req.uuid, exposed_ports: Vec::new(), ovmf_hash: "".to_string(), ips: Vec::new(), -- 2.43.0 From b1d272f8fb8fd257bc990cf4d804393660cd9200 Mon Sep 17 00:00:00 2001 From: ghe0 Date: Fri, 3 Jan 2025 18:08:23 +0200 Subject: [PATCH 04/11] new proto that has MeasurementArgs --- brain.proto | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/brain.proto b/brain.proto index 1c5bc4e..b8ef0ee 100644 --- a/brain.proto +++ b/brain.proto @@ -24,6 +24,17 @@ message NodeResourceReq { uint32 max_ports_per_vm = 8; } +message MeasurementArgs { + // this will be IP:Port of the dtrfs API + // actually not a measurement arg, but needed for the injector + string dtrfs_api_endpoint = 1; + repeated uint32 exposed_ports = 2; + string ovmf_hash = 5; + // This is needed to allow the CLI to build the kernel params from known data. + // The CLI will use the kernel params to get the measurement. + repeated NewVmRespIP ips = 6; +} + message NewVMReq { string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID string hostname = 2; @@ -41,6 +52,12 @@ message NewVMReq { string dtrfs_sha = 14; } +message NewVMResp { + string uuid = 1; + string error = 2; + MeasurementArgs args = 3; +} + message UpdateVMReq { string uuid = 1; uint32 disk_size_gb = 2; @@ -55,7 +72,7 @@ message UpdateVMReq { message UpdateVMResp { string uuid = 1; string error = 2; - string ovmf_hash = 3; + MeasurementArgs args = 3; } message VMContract { @@ -88,16 +105,6 @@ message NewVmRespIP { string gateway = 4; } -message NewVMResp { - string uuid = 1; - repeated uint32 exposed_ports = 2; - string ovmf_hash = 5; - // This is needed to allow the CLI to build the kernel params from known data. - // The CLI will use the kernel params to get the measurement. - repeated NewVmRespIP ips = 6; - string error = 7; -} - message DeleteVMReq { string uuid = 1; } @@ -111,6 +118,7 @@ service BrainDaemonService { rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); + // rpc SendMeasurementArgs (NodePubkey) returns (MeasurementArgs); } message NodeFilters { @@ -143,4 +151,5 @@ service BrainCliService { rpc GetOneNode (NodeFilters) returns (NodeListResp); rpc DeleteVM (DeleteVMReq) returns (Empty); rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp); -} \ No newline at end of file + // rpc GetMeasurementArgs (ListVMContractsReq) returns (MeasurementArgs); +} -- 2.43.0 From bff04c7b7ea104099f72545f1093d2ba960e8b3f Mon Sep 17 00:00:00 2001 From: Ramil_Algayev Date: Sat, 4 Jan 2025 17:20:16 +0400 Subject: [PATCH 05/11] Temporary code to test updates, this is not final at all. --- brain.proto | 6 +++--- src/data.rs | 18 +++++++++--------- src/grpc.rs | 2 +- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/brain.proto b/brain.proto index b8ef0ee..7253420 100644 --- a/brain.proto +++ b/brain.proto @@ -118,7 +118,7 @@ service BrainDaemonService { rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); - // rpc SendMeasurementArgs (NodePubkey) returns (MeasurementArgs); + //rpc GetMeasurementArgs (ListVMContractsReq) returns (stream MeasurementArgs); } message NodeFilters { @@ -151,5 +151,5 @@ service BrainCliService { rpc GetOneNode (NodeFilters) returns (NodeListResp); rpc DeleteVM (DeleteVMReq) returns (Empty); rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp); - // rpc GetMeasurementArgs (ListVMContractsReq) returns (MeasurementArgs); -} + //rpc GetMeasurementArgs (ListVMContractsReq) returns (MeasurementArgs); +} \ No newline at end of file diff --git a/src/data.rs b/src/data.rs index 4b0c522..0f09021 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,5 +1,5 @@ #![allow(dead_code)] -use crate::grpc::brain as grpc; +use crate::grpc::brain::{self as grpc, MeasurementArgs}; use chrono::Utc; use dashmap::DashMap; use log::{debug, info, warn}; @@ -87,6 +87,7 @@ pub struct BrainData { daemon_deletevm_tx: DashMap>, daemon_newvm_tx: DashMap>, daemon_updatevm_tx: DashMap>, + daemon_get_measurement_tx: DashMap>, } #[derive(Debug)] @@ -106,6 +107,7 @@ impl BrainData { daemon_deletevm_tx: DashMap::new(), daemon_newvm_tx: DashMap::new(), daemon_updatevm_tx: DashMap::new(), + daemon_get_measurement_tx: DashMap::new(), } } @@ -208,7 +210,7 @@ impl BrainData { } let mut public_ipv4 = String::new(); let mut public_ipv6 = String::new(); - for ip in new_vm_resp.ips { + for ip in new_vm_resp.args.clone().unwrap_or(grpc::MeasurementArgs::default()).ips { if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) { if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() { public_ipv4 = ipv4_addr.to_string(); @@ -221,7 +223,7 @@ impl BrainData { } let contract = Contract { uuid: new_vm_resp.uuid, - exposed_ports: new_vm_resp.exposed_ports, + exposed_ports: new_vm_resp.args.unwrap_or(grpc::MeasurementArgs::default()).exposed_ports, public_ipv4, public_ipv6, created_at: Utc::now().to_rfc3339(), @@ -293,9 +295,7 @@ impl BrainData { self.submit_newvm_resp(grpc::NewVmResp { error: "Daemon is offline.".to_string(), uuid: req.uuid, - exposed_ports: Vec::new(), - ovmf_hash: "".to_string(), - ips: Vec::new(), + args: None, }) .await; } @@ -319,7 +319,7 @@ impl BrainData { let _ = tx.send(grpc::UpdateVmResp { uuid, error: "Contract does not exist.".to_string(), - ovmf_hash: "".to_string(), + args: None, }); return; } @@ -344,7 +344,7 @@ impl BrainData { self.submit_updatevm_resp(grpc::UpdateVmResp { uuid, error: "Daemon is offline.".to_string(), - ovmf_hash: "".to_string(), + args: None, }) .await; } @@ -354,7 +354,7 @@ impl BrainData { self.submit_updatevm_resp(grpc::UpdateVmResp { uuid, error: "Daemon is offline.".to_string(), - ovmf_hash: "".to_string(), + args: None, }) .await; } diff --git a/src/grpc.rs b/src/grpc.rs index aa82a00..adb280e 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -256,7 +256,7 @@ impl BrainDaemonService for BrainDaemonMock { data.submit_updatevm_resp(UpdateVmResp { error: "Daemon not connected.".to_string(), uuid: updatevmreq.uuid, - ovmf_hash: "".to_string(), + args: None, }) .await; break; -- 2.43.0 From d8c183df256c94b5697dc8efc426cff0c167ea87 Mon Sep 17 00:00:00 2001 From: Ramil_Algayev Date: Sat, 4 Jan 2025 20:20:42 +0400 Subject: [PATCH 06/11] fixed small bug with updating --- src/data.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/data.rs b/src/data.rs index 0f09021..95cd4e5 100644 --- a/src/data.rs +++ b/src/data.rs @@ -263,8 +263,12 @@ impl BrainData { contract.disk_size_gb = updatevmreq.0.disk_size_gb; contract.vcpus = updatevmreq.0.vcpus; contract.memory_mb = updatevmreq.0.memory_mb; - contract.kernel_sha = updatevmreq.0.kernel_sha; - contract.dtrfs_sha = updatevmreq.0.dtrfs_sha; + if !updatevmreq.0.kernel_sha.is_empty() { + contract.kernel_sha = updatevmreq.0.kernel_sha; + } + if !updatevmreq.0.dtrfs_sha.is_empty() { + contract.dtrfs_sha = updatevmreq.0.dtrfs_sha; + } contract.updated_at = Utc::now().to_rfc3339(); } } @@ -274,9 +278,7 @@ impl BrainData { mut req: grpc::NewVmReq, tx: OneshotSender, ) { - if req.uuid.is_empty() { - req.uuid = uuid::Uuid::new_v4().to_string(); - } + req.uuid = uuid::Uuid::new_v4().to_string(); info!("Inserting new vm request in memory: {req:?}"); self.tmp_newvm_reqs .insert(req.uuid.clone(), (req.clone(), tx)); -- 2.43.0 From 5faa8d210de9d889ba0d3f441274b7f4ff1cbc6b Mon Sep 17 00:00:00 2001 From: Ramil_Algayev Date: Sat, 4 Jan 2025 20:28:36 +0400 Subject: [PATCH 07/11] added more logging --- src/data.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/data.rs b/src/data.rs index 95cd4e5..7aa9647 100644 --- a/src/data.rs +++ b/src/data.rs @@ -264,9 +264,11 @@ impl BrainData { contract.vcpus = updatevmreq.0.vcpus; contract.memory_mb = updatevmreq.0.memory_mb; if !updatevmreq.0.kernel_sha.is_empty() { + info!("Updating kernel sha for {} to {}", contract.uuid, updatevmreq.0.kernel_sha); contract.kernel_sha = updatevmreq.0.kernel_sha; } if !updatevmreq.0.dtrfs_sha.is_empty() { + info!("Updating dtrfs sha for {} to {}", contract.uuid, updatevmreq.0.dtrfs_sha); contract.dtrfs_sha = updatevmreq.0.dtrfs_sha; } contract.updated_at = Utc::now().to_rfc3339(); -- 2.43.0 From 554468e8b6d1dbc4dd51401787cfa25ac5ae8d63 Mon Sep 17 00:00:00 2001 From: Ramil_Algayev Date: Mon, 6 Jan 2025 15:57:43 +0400 Subject: [PATCH 08/11] fixed the concern with pr --- src/data.rs | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/data.rs b/src/data.rs index 7aa9647..3aa8fc2 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,5 +1,5 @@ #![allow(dead_code)] -use crate::grpc::brain::{self as grpc, MeasurementArgs}; +use crate::grpc::brain::{self as grpc}; use chrono::Utc; use dashmap::DashMap; use log::{debug, info, warn}; @@ -87,7 +87,6 @@ pub struct BrainData { daemon_deletevm_tx: DashMap>, daemon_newvm_tx: DashMap>, daemon_updatevm_tx: DashMap>, - daemon_get_measurement_tx: DashMap>, } #[derive(Debug)] @@ -107,7 +106,6 @@ impl BrainData { daemon_deletevm_tx: DashMap::new(), daemon_newvm_tx: DashMap::new(), daemon_updatevm_tx: DashMap::new(), - daemon_get_measurement_tx: DashMap::new(), } } @@ -210,20 +208,28 @@ impl BrainData { } let mut public_ipv4 = String::new(); let mut public_ipv6 = String::new(); - for ip in new_vm_resp.args.clone().unwrap_or(grpc::MeasurementArgs::default()).ips { + let args = match new_vm_resp.args { + Some(args) => args, + None => { + log::error!("NewVmResp does not contain MeasurementArgs for {}", new_vm_resp.uuid); + return; + } + }; + + for ip in args.ips { if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) { - if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() { - public_ipv4 = ipv4_addr.to_string(); - } - continue; + if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() { + public_ipv4 = ipv4_addr.to_string(); + } + continue; } if let Ok(ipv6_addr) = std::net::Ipv6Addr::from_str(&ip.address) { - public_ipv6 = ipv6_addr.to_string(); + public_ipv6 = ipv6_addr.to_string(); } } let contract = Contract { uuid: new_vm_resp.uuid, - exposed_ports: new_vm_resp.args.unwrap_or(grpc::MeasurementArgs::default()).exposed_ports, + exposed_ports: args.exposed_ports, public_ipv4, public_ipv6, created_at: Utc::now().to_rfc3339(), -- 2.43.0 From ab4361ad5e470589f7e0e1f0db33e3dd1d77134e Mon Sep 17 00:00:00 2001 From: Ramil_Algayev Date: Mon, 6 Jan 2025 23:41:39 +0400 Subject: [PATCH 09/11] now update_vm_resp.args.dtrfs_api_endpoint does get the node ip from the brain if it didn't have a public ip --- src/data.rs | 41 ++++++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/src/data.rs b/src/data.rs index 3aa8fc2..25c26bd 100644 --- a/src/data.rs +++ b/src/data.rs @@ -184,7 +184,7 @@ impl BrainData { self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx); } - pub async fn submit_newvm_resp(&self, new_vm_resp: grpc::NewVmResp) { + pub async fn submit_newvm_resp(&self, mut new_vm_resp: grpc::NewVmResp) { let newvmreq = match self.tmp_newvm_reqs.remove(&new_vm_resp.uuid) { Some((_, r)) => r, None => { @@ -192,9 +192,18 @@ impl BrainData { "Received confirmation for ghost NewVMReq {}", new_vm_resp.uuid ); + new_vm_resp.error = "Received confirmation for ghost NewVMReq.".to_string(); return; } }; + let args = match new_vm_resp.args.clone() { + Some(args) => args, + None => { + log::error!("NewVmResp does not contain MeasurementArgs for {}", new_vm_resp.uuid); + new_vm_resp.error = "Daemon did not return measurement args.".to_string(); + return; + } + }; if let Err(e) = newvmreq.1.send(new_vm_resp.clone()) { log::error!( "CLI RX for {} dropped before receiving confirmation {:?}. Error is: {:?}", @@ -208,13 +217,6 @@ impl BrainData { } let mut public_ipv4 = String::new(); let mut public_ipv6 = String::new(); - let args = match new_vm_resp.args { - Some(args) => args, - None => { - log::error!("NewVmResp does not contain MeasurementArgs for {}", new_vm_resp.uuid); - return; - } - }; for ip in args.ips { if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) { @@ -247,7 +249,7 @@ impl BrainData { self.contracts.write().unwrap().push(contract); } - pub async fn submit_updatevm_resp(&self, resp: grpc::UpdateVmResp) { + pub async fn submit_updatevm_resp(&self, mut resp: grpc::UpdateVmResp) { let updatevmreq = match self.tmp_updatevm_reqs.remove(&resp.uuid) { Some((_, r)) => r, None => { @@ -255,9 +257,15 @@ impl BrainData { "Received confirmation for ghost UpdateVMRequest {}", resp.uuid ); + resp.error = "Received confirmation for ghost UpdateVMRequest.".to_string(); return; } }; + if let None = resp.args { + log::error!("NewVmResp does not contain MeasurementArgs for {}", resp.uuid); + resp.error = "Daemon did not return measurement args.".to_string(); + return; + }; if let Err(e) = updatevmreq.1.send(resp.clone()) { log::error!("CLI RX dropped before receiving UpdateVMResp {resp:?}. Error is: {e:?}"); } @@ -266,6 +274,21 @@ impl BrainData { } let mut contracts = self.contracts.write().unwrap(); if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == resp.uuid) { + let args = resp.args.as_mut().unwrap(); + if args.dtrfs_api_endpoint.starts_with(':') { + if let Some(node) = self.find_nodes_by_pubkey(&contract.node_pubkey) { + args.dtrfs_api_endpoint = format!("{}{}", node.ip, args.dtrfs_api_endpoint); + } else { + // This should never happen. + log::error!( + "Node {} not found for contract {}. Cannot update contract.", + contract.node_pubkey, + contract.uuid + ); + return; + } + } + contract.disk_size_gb = updatevmreq.0.disk_size_gb; contract.vcpus = updatevmreq.0.vcpus; contract.memory_mb = updatevmreq.0.memory_mb; -- 2.43.0 From 02ff2f69913b6f6e9ac8bf91e729116727f12e93 Mon Sep 17 00:00:00 2001 From: Ramil_Algayev Date: Tue, 7 Jan 2025 00:08:57 +0400 Subject: [PATCH 10/11] Now new_vm_resp.args.dtrfs_api_endpoint also get modified if there is no public ip of the vm Also removed the unnecessary else clause --- src/data.rs | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/src/data.rs b/src/data.rs index 25c26bd..d269a37 100644 --- a/src/data.rs +++ b/src/data.rs @@ -185,7 +185,7 @@ impl BrainData { } pub async fn submit_newvm_resp(&self, mut new_vm_resp: grpc::NewVmResp) { - let newvmreq = match self.tmp_newvm_reqs.remove(&new_vm_resp.uuid) { + let new_vm_req = match self.tmp_newvm_reqs.remove(&new_vm_resp.uuid) { Some((_, r)) => r, None => { log::error!( @@ -204,10 +204,10 @@ impl BrainData { return; } }; - if let Err(e) = newvmreq.1.send(new_vm_resp.clone()) { + if let Err(e) = new_vm_req.1.send(new_vm_resp.clone()) { log::error!( "CLI RX for {} dropped before receiving confirmation {:?}. Error is: {:?}", - &newvmreq.0.admin_pubkey, + &new_vm_req.0.admin_pubkey, new_vm_resp, e ); @@ -229,6 +229,7 @@ impl BrainData { public_ipv6 = ipv6_addr.to_string(); } } + let contract = Contract { uuid: new_vm_resp.uuid, exposed_ports: args.exposed_ports, @@ -236,17 +237,24 @@ impl BrainData { public_ipv6, created_at: Utc::now().to_rfc3339(), updated_at: String::new(), - hostname: newvmreq.0.hostname, - admin_pubkey: newvmreq.0.admin_pubkey, - node_pubkey: newvmreq.0.node_pubkey, - disk_size_gb: newvmreq.0.disk_size_gb, - vcpus: newvmreq.0.vcpus, - memory_mb: newvmreq.0.memory_mb, - kernel_sha: newvmreq.0.kernel_sha, - dtrfs_sha: newvmreq.0.dtrfs_sha, + hostname: new_vm_req.0.hostname, + admin_pubkey: new_vm_req.0.admin_pubkey, + node_pubkey: new_vm_req.0.node_pubkey.clone(), + disk_size_gb: new_vm_req.0.disk_size_gb, + vcpus: new_vm_req.0.vcpus, + memory_mb: new_vm_req.0.memory_mb, + kernel_sha: new_vm_req.0.kernel_sha, + dtrfs_sha: new_vm_req.0.dtrfs_sha, }; info!("Created new contract: {contract:?}"); self.contracts.write().unwrap().push(contract); + + let args = new_vm_resp.args.as_mut().unwrap(); + if args.dtrfs_api_endpoint.starts_with(':') { + if let Some(node) = self.find_nodes_by_pubkey(&new_vm_req.0.node_pubkey) { + args.dtrfs_api_endpoint = format!("{}{}", node.ip, args.dtrfs_api_endpoint); + } + } } pub async fn submit_updatevm_resp(&self, mut resp: grpc::UpdateVmResp) { @@ -278,15 +286,7 @@ impl BrainData { if args.dtrfs_api_endpoint.starts_with(':') { if let Some(node) = self.find_nodes_by_pubkey(&contract.node_pubkey) { args.dtrfs_api_endpoint = format!("{}{}", node.ip, args.dtrfs_api_endpoint); - } else { - // This should never happen. - log::error!( - "Node {} not found for contract {}. Cannot update contract.", - contract.node_pubkey, - contract.uuid - ); - return; - } + } } contract.disk_size_gb = updatevmreq.0.disk_size_gb; -- 2.43.0 From 5755ece192b003bcf26cb978b8dc5a95512c10c0 Mon Sep 17 00:00:00 2001 From: Ramil_Algayev Date: Tue, 7 Jan 2025 03:00:13 +0400 Subject: [PATCH 11/11] Fixed the issue with updating the dtrfs_api_endpoint I was updating it after sending it, so I fixed that Removed the unnessary part from the if statement dealing with the send fucntion Also refactored the code for the submit_newvm_resp and submit_updatevm_resp functions --- src/data.rs | 156 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 101 insertions(+), 55 deletions(-) diff --git a/src/data.rs b/src/data.rs index d269a37..bad12ad 100644 --- a/src/data.rs +++ b/src/data.rs @@ -196,43 +196,61 @@ impl BrainData { return; } }; - let args = match new_vm_resp.args.clone() { - Some(args) => args, - None => { - log::error!("NewVmResp does not contain MeasurementArgs for {}", new_vm_resp.uuid); - new_vm_resp.error = "Daemon did not return measurement args.".to_string(); - return; + match new_vm_resp.args { + Some(ref mut args) => { + if args.dtrfs_api_endpoint.starts_with(':') { + match self.find_nodes_by_pubkey(&new_vm_req.0.node_pubkey) { + Some(node) => { + args.dtrfs_api_endpoint = + format!("{}{}", node.ip, args.dtrfs_api_endpoint); + } + None => { + log::error!("Node not found for pubkey {}", new_vm_req.0.node_pubkey); + new_vm_resp.error = "Node not found.".to_string(); + return; + } + } + } } - }; - if let Err(e) = new_vm_req.1.send(new_vm_resp.clone()) { + None => { + log::error!( + "NewVmResp does not contain MeasurementArgs for {}", + new_vm_resp.uuid + ); + new_vm_resp.error = "Daemon did not return measurement args.".to_string(); + return; + } + } + if let Err(_) = new_vm_req.1.send(new_vm_resp.clone()) { log::error!( - "CLI RX for {} dropped before receiving confirmation {:?}. Error is: {:?}", + "CLI RX for {} dropped before receiving confirmation {:?}.", &new_vm_req.0.admin_pubkey, new_vm_resp, - e ); } if new_vm_resp.error != "" { return; } + let mut public_ipv4 = String::new(); let mut public_ipv6 = String::new(); - for ip in args.ips { + let args = new_vm_resp.args.as_ref().unwrap(); + for ip in args.ips.iter() { if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) { - if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() { - public_ipv4 = ipv4_addr.to_string(); - } - continue; + if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() { + public_ipv4 = ipv4_addr.to_string(); + } + continue; } if let Ok(ipv6_addr) = std::net::Ipv6Addr::from_str(&ip.address) { - public_ipv6 = ipv6_addr.to_string(); + public_ipv6 = ipv6_addr.to_string(); } } let contract = Contract { uuid: new_vm_resp.uuid, - exposed_ports: args.exposed_ports, + exposed_ports: args.exposed_ports.clone(), public_ipv4, public_ipv6, created_at: Utc::now().to_rfc3339(), @@ -248,59 +266,87 @@ impl BrainData { }; info!("Created new contract: {contract:?}"); self.contracts.write().unwrap().push(contract); - - let args = new_vm_resp.args.as_mut().unwrap(); - if args.dtrfs_api_endpoint.starts_with(':') { - if let Some(node) = self.find_nodes_by_pubkey(&new_vm_req.0.node_pubkey) { - args.dtrfs_api_endpoint = format!("{}{}", node.ip, args.dtrfs_api_endpoint); - } - } } - pub async fn submit_updatevm_resp(&self, mut resp: grpc::UpdateVmResp) { - let updatevmreq = match self.tmp_updatevm_reqs.remove(&resp.uuid) { + pub async fn submit_updatevm_resp(&self, mut update_vm_resp: grpc::UpdateVmResp) { + let update_vm_req = match self.tmp_updatevm_reqs.remove(&update_vm_resp.uuid) { Some((_, r)) => r, None => { log::error!( "Received confirmation for ghost UpdateVMRequest {}", - resp.uuid + update_vm_resp.uuid ); - resp.error = "Received confirmation for ghost UpdateVMRequest.".to_string(); + update_vm_resp.error = + "Received confirmation for ghost UpdateVMRequest.".to_string(); return; } }; - if let None = resp.args { - log::error!("NewVmResp does not contain MeasurementArgs for {}", resp.uuid); - resp.error = "Daemon did not return measurement args.".to_string(); - return; - }; - if let Err(e) = updatevmreq.1.send(resp.clone()) { - log::error!("CLI RX dropped before receiving UpdateVMResp {resp:?}. Error is: {e:?}"); - } - if resp.error != "" { + if update_vm_resp.error != "" { return; } - let mut contracts = self.contracts.write().unwrap(); - if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == resp.uuid) { - let args = resp.args.as_mut().unwrap(); - if args.dtrfs_api_endpoint.starts_with(':') { - if let Some(node) = self.find_nodes_by_pubkey(&contract.node_pubkey) { - args.dtrfs_api_endpoint = format!("{}{}", node.ip, args.dtrfs_api_endpoint); - } - } - contract.disk_size_gb = updatevmreq.0.disk_size_gb; - contract.vcpus = updatevmreq.0.vcpus; - contract.memory_mb = updatevmreq.0.memory_mb; - if !updatevmreq.0.kernel_sha.is_empty() { - info!("Updating kernel sha for {} to {}", contract.uuid, updatevmreq.0.kernel_sha); - contract.kernel_sha = updatevmreq.0.kernel_sha; + let mut contracts = self.contracts.write().unwrap(); + match contracts.iter_mut().find(|c| c.uuid == update_vm_resp.uuid) { + Some(contract) => { + match update_vm_resp.args { + Some(ref mut args) => { + if args.dtrfs_api_endpoint.starts_with(':') { + match self.find_nodes_by_pubkey(&contract.node_pubkey) { + Some(node) => { + args.dtrfs_api_endpoint = + format!("{}{}", node.ip, args.dtrfs_api_endpoint); + } + None => { + log::error!( + "Node not found for pubkey {}", + contract.node_pubkey + ); + update_vm_resp.error = "Node not found.".to_string(); + return; + } + } + } + } + None => { + log::error!( + "NewVmResp does not contain MeasurementArgs for {}", + update_vm_resp.uuid + ); + update_vm_resp.error = + "Daemon did not return measurement args.".to_string(); + return; + } + } + + contract.disk_size_gb = update_vm_req.0.disk_size_gb; + contract.vcpus = update_vm_req.0.vcpus; + contract.memory_mb = update_vm_req.0.memory_mb; + if !update_vm_req.0.kernel_sha.is_empty() { + info!( + "Updating kernel sha for {} to {}", + contract.uuid, update_vm_req.0.kernel_sha + ); + contract.kernel_sha = update_vm_req.0.kernel_sha; + } + if !update_vm_req.0.dtrfs_sha.is_empty() { + info!( + "Updating dtrfs sha for {} to {}", + contract.uuid, update_vm_req.0.dtrfs_sha + ); + contract.dtrfs_sha = update_vm_req.0.dtrfs_sha; + } + contract.updated_at = Utc::now().to_rfc3339(); } - if !updatevmreq.0.dtrfs_sha.is_empty() { - info!("Updating dtrfs sha for {} to {}", contract.uuid, updatevmreq.0.dtrfs_sha); - contract.dtrfs_sha = updatevmreq.0.dtrfs_sha; + None => { + log::error!("Contract not found for {}.", update_vm_req.0.uuid); + update_vm_resp.error = "Contract not found.".to_string(); } - contract.updated_at = Utc::now().to_rfc3339(); + } + if let Err(_) = update_vm_req.1.send(update_vm_resp.clone()) { + log::error!( + "CLI RX dropped before receiving UpdateVMResp {:?}.", + update_vm_resp + ); } } -- 2.43.0