update functionality for the brain #2

Merged
ghe0 merged 11 commits from new-updater into main 2025-01-09 22:29:56 +00:00
Showing only changes of commit 5755ece192 - Show all commits

@ -196,43 +196,61 @@ impl BrainData {
return; return;
} }
}; };
let args = match new_vm_resp.args.clone() { match new_vm_resp.args {
Some(args) => args, Some(ref mut args) => {
None => { if args.dtrfs_api_endpoint.starts_with(':') {
log::error!("NewVmResp does not contain MeasurementArgs for {}", new_vm_resp.uuid); match self.find_nodes_by_pubkey(&new_vm_req.0.node_pubkey) {
new_vm_resp.error = "Daemon did not return measurement args.".to_string(); Some(node) => {
return; args.dtrfs_api_endpoint =
format!("{}{}", node.ip, args.dtrfs_api_endpoint);
}
None => {
log::error!("Node not found for pubkey {}", new_vm_req.0.node_pubkey);
new_vm_resp.error = "Node not found.".to_string();
return;
}
}
}
ghe0 marked this conversation as resolved Outdated
Outdated
Review

If the daemon does not provide Measurement Args, the CLI can't execute the attestation. I believe we should not accept grpc::NewVmResp that does not contain Args. This should be replaced with a NewVmResp that contains an error with the message "SNP Node did not provide measurement args"

Also, this means the contract is broken so we need to discuss the cases.

If the daemon does not provide Measurement Args, the CLI can't execute the attestation. I believe we should not accept `grpc::NewVmResp` that does not contain Args. This should be replaced with a NewVmResp that contains an error with the message "SNP Node did not provide measurement args" Also, this means the contract is broken so we need to discuss the cases.
} }
}; None => {
if let Err(e) = new_vm_req.1.send(new_vm_resp.clone()) { log::error!(
"NewVmResp does not contain MeasurementArgs for {}",
new_vm_resp.uuid
);
new_vm_resp.error = "Daemon did not return measurement args.".to_string();
return;
}
}
if let Err(_) = new_vm_req.1.send(new_vm_resp.clone()) {
log::error!( log::error!(
"CLI RX for {} dropped before receiving confirmation {:?}. Error is: {:?}", "CLI RX for {} dropped before receiving confirmation {:?}.",
&new_vm_req.0.admin_pubkey, &new_vm_req.0.admin_pubkey,
new_vm_resp, new_vm_resp,
e
); );
} }
if new_vm_resp.error != "" { if new_vm_resp.error != "" {
return; return;
} }
let mut public_ipv4 = String::new(); let mut public_ipv4 = String::new();
let mut public_ipv6 = String::new(); let mut public_ipv6 = String::new();
for ip in args.ips { let args = new_vm_resp.args.as_ref().unwrap();
for ip in args.ips.iter() {
if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) { if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) {
if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() { if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() {
public_ipv4 = ipv4_addr.to_string(); public_ipv4 = ipv4_addr.to_string();
} }
continue; continue;
} }
if let Ok(ipv6_addr) = std::net::Ipv6Addr::from_str(&ip.address) { if let Ok(ipv6_addr) = std::net::Ipv6Addr::from_str(&ip.address) {
public_ipv6 = ipv6_addr.to_string(); public_ipv6 = ipv6_addr.to_string();
} }
} }
let contract = Contract { let contract = Contract {
uuid: new_vm_resp.uuid, uuid: new_vm_resp.uuid,
exposed_ports: args.exposed_ports, exposed_ports: args.exposed_ports.clone(),
public_ipv4, public_ipv4,
public_ipv6, public_ipv6,
created_at: Utc::now().to_rfc3339(), created_at: Utc::now().to_rfc3339(),
@ -248,59 +266,87 @@ impl BrainData {
}; };
info!("Created new contract: {contract:?}"); info!("Created new contract: {contract:?}");
self.contracts.write().unwrap().push(contract); self.contracts.write().unwrap().push(contract);
let args = new_vm_resp.args.as_mut().unwrap();
if args.dtrfs_api_endpoint.starts_with(':') {
if let Some(node) = self.find_nodes_by_pubkey(&new_vm_req.0.node_pubkey) {
args.dtrfs_api_endpoint = format!("{}{}", node.ip, args.dtrfs_api_endpoint);
}
}
} }
pub async fn submit_updatevm_resp(&self, mut resp: grpc::UpdateVmResp) { pub async fn submit_updatevm_resp(&self, mut update_vm_resp: grpc::UpdateVmResp) {
let updatevmreq = match self.tmp_updatevm_reqs.remove(&resp.uuid) { let update_vm_req = match self.tmp_updatevm_reqs.remove(&update_vm_resp.uuid) {
Some((_, r)) => r, Some((_, r)) => r,
None => { None => {
log::error!( log::error!(
"Received confirmation for ghost UpdateVMRequest {}", "Received confirmation for ghost UpdateVMRequest {}",
resp.uuid update_vm_resp.uuid
); );
resp.error = "Received confirmation for ghost UpdateVMRequest.".to_string(); update_vm_resp.error =
"Received confirmation for ghost UpdateVMRequest.".to_string();
return; return;
} }
}; };
if let None = resp.args { if update_vm_resp.error != "" {
log::error!("NewVmResp does not contain MeasurementArgs for {}", resp.uuid);
resp.error = "Daemon did not return measurement args.".to_string();
return;
};
if let Err(e) = updatevmreq.1.send(resp.clone()) {
log::error!("CLI RX dropped before receiving UpdateVMResp {resp:?}. Error is: {e:?}");
}
if resp.error != "" {
return; return;
} }
let mut contracts = self.contracts.write().unwrap();
if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == resp.uuid) {
let args = resp.args.as_mut().unwrap();
if args.dtrfs_api_endpoint.starts_with(':') {
if let Some(node) = self.find_nodes_by_pubkey(&contract.node_pubkey) {
args.dtrfs_api_endpoint = format!("{}{}", node.ip, args.dtrfs_api_endpoint);
}
}
contract.disk_size_gb = updatevmreq.0.disk_size_gb; let mut contracts = self.contracts.write().unwrap();
contract.vcpus = updatevmreq.0.vcpus; match contracts.iter_mut().find(|c| c.uuid == update_vm_resp.uuid) {
contract.memory_mb = updatevmreq.0.memory_mb; Some(contract) => {
if !updatevmreq.0.kernel_sha.is_empty() { match update_vm_resp.args {
info!("Updating kernel sha for {} to {}", contract.uuid, updatevmreq.0.kernel_sha); Some(ref mut args) => {
contract.kernel_sha = updatevmreq.0.kernel_sha; if args.dtrfs_api_endpoint.starts_with(':') {
match self.find_nodes_by_pubkey(&contract.node_pubkey) {
Some(node) => {
args.dtrfs_api_endpoint =
format!("{}{}", node.ip, args.dtrfs_api_endpoint);
}
None => {
log::error!(
"Node not found for pubkey {}",
contract.node_pubkey
);
update_vm_resp.error = "Node not found.".to_string();
return;
}
}
}
}
None => {
log::error!(
"NewVmResp does not contain MeasurementArgs for {}",
update_vm_resp.uuid
);
update_vm_resp.error =
"Daemon did not return measurement args.".to_string();
return;
}
}
contract.disk_size_gb = update_vm_req.0.disk_size_gb;
contract.vcpus = update_vm_req.0.vcpus;
contract.memory_mb = update_vm_req.0.memory_mb;
if !update_vm_req.0.kernel_sha.is_empty() {
info!(
"Updating kernel sha for {} to {}",
contract.uuid, update_vm_req.0.kernel_sha
);
contract.kernel_sha = update_vm_req.0.kernel_sha;
}
if !update_vm_req.0.dtrfs_sha.is_empty() {
info!(
"Updating dtrfs sha for {} to {}",
contract.uuid, update_vm_req.0.dtrfs_sha
);
contract.dtrfs_sha = update_vm_req.0.dtrfs_sha;
}
contract.updated_at = Utc::now().to_rfc3339();
} }
if !updatevmreq.0.dtrfs_sha.is_empty() { None => {
info!("Updating dtrfs sha for {} to {}", contract.uuid, updatevmreq.0.dtrfs_sha); log::error!("Contract not found for {}.", update_vm_req.0.uuid);
contract.dtrfs_sha = updatevmreq.0.dtrfs_sha; update_vm_resp.error = "Contract not found.".to_string();
} }
contract.updated_at = Utc::now().to_rfc3339(); }
if let Err(_) = update_vm_req.1.send(update_vm_resp.clone()) {
log::error!(
"CLI RX dropped before receiving UpdateVMResp {:?}.",
update_vm_resp
);
} }
} }