From 0e8516524017449e0fe3d5fcac608d52ee28021a Mon Sep 17 00:00:00 2001 From: ramrem Date: Thu, 9 Jan 2025 22:29:56 +0000 Subject: [PATCH] update functionality for the brain (#2) These changes have been done: measurement args are a separate argument which get passed from daemon revamped updatevm and newvm resp functions, specially how args.dtrfs_api_endpoint gets handled updating now returns error from brain if it cant find the contract proto has been updated accordingly Co-authored-by: Ramil_Algayev Co-authored-by: ghe0 Reviewed-on: https://gitea.detee.cloud/ghe0/brain-mock/pulls/2 Co-authored-by: ramrem Co-committed-by: ramrem --- brain.proto | 48 +++++++++------- src/data.rs | 158 +++++++++++++++++++++++++++++++++++++++------------- src/grpc.rs | 1 + 3 files changed, 150 insertions(+), 57 deletions(-) diff --git a/brain.proto b/brain.proto index 5fd8227..7253420 100644 --- a/brain.proto +++ b/brain.proto @@ -24,6 +24,17 @@ message NodeResourceReq { uint32 max_ports_per_vm = 8; } +message MeasurementArgs { + // this will be IP:Port of the dtrfs API + // actually not a measurement arg, but needed for the injector + string dtrfs_api_endpoint = 1; + repeated uint32 exposed_ports = 2; + string ovmf_hash = 5; + // This is needed to allow the CLI to build the kernel params from known data. + // The CLI will use the kernel params to get the measurement. + repeated NewVmRespIP ips = 6; +} + message NewVMReq { string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID string hostname = 2; @@ -41,20 +52,27 @@ message NewVMReq { string dtrfs_sha = 14; } +message NewVMResp { + string uuid = 1; + string error = 2; + MeasurementArgs args = 3; +} + message UpdateVMReq { string uuid = 1; - uint32 disk_size_gb = 3; - uint32 vcpus = 4; - uint32 memory_mb = 5; - string kernel_url = 6; - string kernel_sha = 7; - string dtrfs_url = 8; - string dtrfs_sha = 9; + uint32 disk_size_gb = 2; + uint32 vcpus = 3; + uint32 memory_mb = 4; + string kernel_url = 5; + string kernel_sha = 6; + string dtrfs_url = 7; + string dtrfs_sha = 8; } message UpdateVMResp { string uuid = 1; - string error = 3; + string error = 2; + MeasurementArgs args = 3; } message VMContract { @@ -87,16 +105,6 @@ message NewVmRespIP { string gateway = 4; } -message NewVMResp { - string uuid = 1; - repeated uint32 exposed_ports = 2; - string ovmf_hash = 5; - // This is needed to allow the CLI to build the kernel params from known data. - // The CLI will use the kernel params to get the measurement. - repeated NewVmRespIP ips = 6; - string error = 7; -} - message DeleteVMReq { string uuid = 1; } @@ -110,6 +118,7 @@ service BrainDaemonService { rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); + //rpc GetMeasurementArgs (ListVMContractsReq) returns (stream MeasurementArgs); } message NodeFilters { @@ -142,4 +151,5 @@ service BrainCliService { rpc GetOneNode (NodeFilters) returns (NodeListResp); rpc DeleteVM (DeleteVMReq) returns (Empty); rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp); -} + //rpc GetMeasurementArgs (ListVMContractsReq) returns (MeasurementArgs); +} \ No newline at end of file diff --git a/src/data.rs b/src/data.rs index bac98dd..bad12ad 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,5 +1,5 @@ #![allow(dead_code)] -use crate::grpc::brain as grpc; +use crate::grpc::brain::{self as grpc}; use chrono::Utc; use dashmap::DashMap; use log::{debug, info, warn}; @@ -184,31 +184,59 @@ impl BrainData { self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx); } - pub async fn submit_newvm_resp(&self, new_vm_resp: grpc::NewVmResp) { - let newvmreq = match self.tmp_newvm_reqs.remove(&new_vm_resp.uuid) { + pub async fn submit_newvm_resp(&self, mut new_vm_resp: grpc::NewVmResp) { + let new_vm_req = match self.tmp_newvm_reqs.remove(&new_vm_resp.uuid) { Some((_, r)) => r, None => { log::error!( "Received confirmation for ghost NewVMReq {}", new_vm_resp.uuid ); + new_vm_resp.error = "Received confirmation for ghost NewVMReq.".to_string(); return; } }; - if let Err(e) = newvmreq.1.send(new_vm_resp.clone()) { + match new_vm_resp.args { + Some(ref mut args) => { + if args.dtrfs_api_endpoint.starts_with(':') { + match self.find_nodes_by_pubkey(&new_vm_req.0.node_pubkey) { + Some(node) => { + args.dtrfs_api_endpoint = + format!("{}{}", node.ip, args.dtrfs_api_endpoint); + } + None => { + log::error!("Node not found for pubkey {}", new_vm_req.0.node_pubkey); + new_vm_resp.error = "Node not found.".to_string(); + return; + } + } + } + } + None => { + log::error!( + "NewVmResp does not contain MeasurementArgs for {}", + new_vm_resp.uuid + ); + new_vm_resp.error = "Daemon did not return measurement args.".to_string(); + return; + } + } + if let Err(_) = new_vm_req.1.send(new_vm_resp.clone()) { log::error!( - "CLI RX for {} dropped before receiving confirmation {:?}. Error is: {:?}", - &newvmreq.0.admin_pubkey, + "CLI RX for {} dropped before receiving confirmation {:?}.", + &new_vm_req.0.admin_pubkey, new_vm_resp, - e ); } if new_vm_resp.error != "" { return; } + let mut public_ipv4 = String::new(); let mut public_ipv6 = String::new(); - for ip in new_vm_resp.ips { + + let args = new_vm_resp.args.as_ref().unwrap(); + for ip in args.ips.iter() { if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) { if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() { public_ipv4 = ipv4_addr.to_string(); @@ -219,51 +247,106 @@ impl BrainData { public_ipv6 = ipv6_addr.to_string(); } } + let contract = Contract { uuid: new_vm_resp.uuid, - exposed_ports: new_vm_resp.exposed_ports, + exposed_ports: args.exposed_ports.clone(), public_ipv4, public_ipv6, created_at: Utc::now().to_rfc3339(), updated_at: String::new(), - hostname: newvmreq.0.hostname, - admin_pubkey: newvmreq.0.admin_pubkey, - node_pubkey: newvmreq.0.node_pubkey, - disk_size_gb: newvmreq.0.disk_size_gb, - vcpus: newvmreq.0.vcpus, - memory_mb: newvmreq.0.memory_mb, - kernel_sha: newvmreq.0.kernel_sha, - dtrfs_sha: newvmreq.0.dtrfs_sha, + hostname: new_vm_req.0.hostname, + admin_pubkey: new_vm_req.0.admin_pubkey, + node_pubkey: new_vm_req.0.node_pubkey.clone(), + disk_size_gb: new_vm_req.0.disk_size_gb, + vcpus: new_vm_req.0.vcpus, + memory_mb: new_vm_req.0.memory_mb, + kernel_sha: new_vm_req.0.kernel_sha, + dtrfs_sha: new_vm_req.0.dtrfs_sha, }; info!("Created new contract: {contract:?}"); self.contracts.write().unwrap().push(contract); } - pub async fn submit_updatevm_resp(&self, resp: grpc::UpdateVmResp) { - let updatevmreq = match self.tmp_updatevm_reqs.remove(&resp.uuid) { + pub async fn submit_updatevm_resp(&self, mut update_vm_resp: grpc::UpdateVmResp) { + let update_vm_req = match self.tmp_updatevm_reqs.remove(&update_vm_resp.uuid) { Some((_, r)) => r, None => { log::error!( "Received confirmation for ghost UpdateVMRequest {}", - resp.uuid + update_vm_resp.uuid ); + update_vm_resp.error = + "Received confirmation for ghost UpdateVMRequest.".to_string(); return; } }; - if let Err(e) = updatevmreq.1.send(resp.clone()) { - log::error!("CLI RX dropped before receiving UpdateVMResp {resp:?}. Error is: {e:?}"); - } - if resp.error != "" { + if update_vm_resp.error != "" { return; } + let mut contracts = self.contracts.write().unwrap(); - if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == resp.uuid) { - contract.disk_size_gb = updatevmreq.0.disk_size_gb; - contract.vcpus = updatevmreq.0.vcpus; - contract.memory_mb = updatevmreq.0.memory_mb; - contract.kernel_sha = updatevmreq.0.kernel_sha; - contract.dtrfs_sha = updatevmreq.0.dtrfs_sha; - contract.updated_at = Utc::now().to_rfc3339(); + match contracts.iter_mut().find(|c| c.uuid == update_vm_resp.uuid) { + Some(contract) => { + match update_vm_resp.args { + Some(ref mut args) => { + if args.dtrfs_api_endpoint.starts_with(':') { + match self.find_nodes_by_pubkey(&contract.node_pubkey) { + Some(node) => { + args.dtrfs_api_endpoint = + format!("{}{}", node.ip, args.dtrfs_api_endpoint); + } + None => { + log::error!( + "Node not found for pubkey {}", + contract.node_pubkey + ); + update_vm_resp.error = "Node not found.".to_string(); + return; + } + } + } + } + None => { + log::error!( + "NewVmResp does not contain MeasurementArgs for {}", + update_vm_resp.uuid + ); + update_vm_resp.error = + "Daemon did not return measurement args.".to_string(); + return; + } + } + + contract.disk_size_gb = update_vm_req.0.disk_size_gb; + contract.vcpus = update_vm_req.0.vcpus; + contract.memory_mb = update_vm_req.0.memory_mb; + if !update_vm_req.0.kernel_sha.is_empty() { + info!( + "Updating kernel sha for {} to {}", + contract.uuid, update_vm_req.0.kernel_sha + ); + contract.kernel_sha = update_vm_req.0.kernel_sha; + } + if !update_vm_req.0.dtrfs_sha.is_empty() { + info!( + "Updating dtrfs sha for {} to {}", + contract.uuid, update_vm_req.0.dtrfs_sha + ); + contract.dtrfs_sha = update_vm_req.0.dtrfs_sha; + } + contract.updated_at = Utc::now().to_rfc3339(); + } + None => { + log::error!("Contract not found for {}.", update_vm_req.0.uuid); + update_vm_resp.error = "Contract not found.".to_string(); + } + } + if let Err(_) = update_vm_req.1.send(update_vm_resp.clone()) { + log::error!( + "CLI RX dropped before receiving UpdateVMResp {:?}.", + update_vm_resp + ); } } @@ -272,9 +355,7 @@ impl BrainData { mut req: grpc::NewVmReq, tx: OneshotSender, ) { - let uuid = uuid::Uuid::new_v4().to_string(); - - req.uuid = uuid.clone(); + req.uuid = uuid::Uuid::new_v4().to_string(); info!("Inserting new vm request in memory: {req:?}"); self.tmp_newvm_reqs .insert(req.uuid.clone(), (req.clone(), tx)); @@ -292,10 +373,8 @@ impl BrainData { ); self.submit_newvm_resp(grpc::NewVmResp { error: "Daemon is offline.".to_string(), - uuid, - exposed_ports: Vec::new(), - ovmf_hash: "".to_string(), - ips: Vec::new(), + uuid: req.uuid, + args: None, }) .await; } @@ -319,6 +398,7 @@ impl BrainData { let _ = tx.send(grpc::UpdateVmResp { uuid, error: "Contract does not exist.".to_string(), + args: None, }); return; } @@ -343,6 +423,7 @@ impl BrainData { self.submit_updatevm_resp(grpc::UpdateVmResp { uuid, error: "Daemon is offline.".to_string(), + args: None, }) .await; } @@ -352,6 +433,7 @@ impl BrainData { self.submit_updatevm_resp(grpc::UpdateVmResp { uuid, error: "Daemon is offline.".to_string(), + args: None, }) .await; } diff --git a/src/grpc.rs b/src/grpc.rs index 7db03e3..adb280e 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -256,6 +256,7 @@ impl BrainDaemonService for BrainDaemonMock { data.submit_updatevm_resp(UpdateVmResp { error: "Daemon not connected.".to_string(), uuid: updatevmreq.uuid, + args: None, }) .await; break;