update functionality for the brain #2

Merged
ghe0 merged 11 commits from new-updater into main 2025-01-09 22:29:56 +00:00
3 changed files with 150 additions and 57 deletions

@ -24,6 +24,17 @@ message NodeResourceReq {
uint32 max_ports_per_vm = 8; uint32 max_ports_per_vm = 8;
} }
message MeasurementArgs {
// this will be IP:Port of the dtrfs API
// actually not a measurement arg, but needed for the injector
string dtrfs_api_endpoint = 1;
repeated uint32 exposed_ports = 2;
string ovmf_hash = 5;
// This is needed to allow the CLI to build the kernel params from known data.
// The CLI will use the kernel params to get the measurement.
repeated NewVmRespIP ips = 6;
}
message NewVMReq { message NewVMReq {
string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID
string hostname = 2; string hostname = 2;
@ -41,20 +52,27 @@ message NewVMReq {
string dtrfs_sha = 14; string dtrfs_sha = 14;
} }
message NewVMResp {
string uuid = 1;
string error = 2;
MeasurementArgs args = 3;
}
message UpdateVMReq { message UpdateVMReq {
string uuid = 1; string uuid = 1;
uint32 disk_size_gb = 3; uint32 disk_size_gb = 2;
uint32 vcpus = 4; uint32 vcpus = 3;
uint32 memory_mb = 5; uint32 memory_mb = 4;
string kernel_url = 6; string kernel_url = 5;
string kernel_sha = 7; string kernel_sha = 6;
string dtrfs_url = 8; string dtrfs_url = 7;
string dtrfs_sha = 9; string dtrfs_sha = 8;
} }
message UpdateVMResp { message UpdateVMResp {
string uuid = 1; string uuid = 1;
string error = 3; string error = 2;
MeasurementArgs args = 3;
} }
message VMContract { message VMContract {
@ -87,16 +105,6 @@ message NewVmRespIP {
string gateway = 4; string gateway = 4;
} }
message NewVMResp {
string uuid = 1;
repeated uint32 exposed_ports = 2;
string ovmf_hash = 5;
// This is needed to allow the CLI to build the kernel params from known data.
// The CLI will use the kernel params to get the measurement.
repeated NewVmRespIP ips = 6;
string error = 7;
}
message DeleteVMReq { message DeleteVMReq {
string uuid = 1; string uuid = 1;
} }
@ -110,6 +118,7 @@ service BrainDaemonService {
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq);
rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
//rpc GetMeasurementArgs (ListVMContractsReq) returns (stream MeasurementArgs);
} }
message NodeFilters { message NodeFilters {
@ -142,4 +151,5 @@ service BrainCliService {
rpc GetOneNode (NodeFilters) returns (NodeListResp); rpc GetOneNode (NodeFilters) returns (NodeListResp);
rpc DeleteVM (DeleteVMReq) returns (Empty); rpc DeleteVM (DeleteVMReq) returns (Empty);
rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp); rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp);
//rpc GetMeasurementArgs (ListVMContractsReq) returns (MeasurementArgs);
} }

@ -1,5 +1,5 @@
#![allow(dead_code)] #![allow(dead_code)]
use crate::grpc::brain as grpc; use crate::grpc::brain::{self as grpc};
use chrono::Utc; use chrono::Utc;
use dashmap::DashMap; use dashmap::DashMap;
use log::{debug, info, warn}; use log::{debug, info, warn};
@ -184,31 +184,59 @@ impl BrainData {
self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx); self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx);
} }
pub async fn submit_newvm_resp(&self, new_vm_resp: grpc::NewVmResp) { pub async fn submit_newvm_resp(&self, mut new_vm_resp: grpc::NewVmResp) {
let newvmreq = match self.tmp_newvm_reqs.remove(&new_vm_resp.uuid) { let new_vm_req = match self.tmp_newvm_reqs.remove(&new_vm_resp.uuid) {
Some((_, r)) => r, Some((_, r)) => r,
None => { None => {
log::error!( log::error!(
"Received confirmation for ghost NewVMReq {}", "Received confirmation for ghost NewVMReq {}",
new_vm_resp.uuid new_vm_resp.uuid
); );
new_vm_resp.error = "Received confirmation for ghost NewVMReq.".to_string();
return; return;
} }
}; };
if let Err(e) = newvmreq.1.send(new_vm_resp.clone()) { match new_vm_resp.args {
Some(ref mut args) => {
if args.dtrfs_api_endpoint.starts_with(':') {
match self.find_nodes_by_pubkey(&new_vm_req.0.node_pubkey) {
Some(node) => {
args.dtrfs_api_endpoint =
format!("{}{}", node.ip, args.dtrfs_api_endpoint);
}
None => {
log::error!("Node not found for pubkey {}", new_vm_req.0.node_pubkey);
new_vm_resp.error = "Node not found.".to_string();
return;
}
}
}
ghe0 marked this conversation as resolved Outdated
Outdated
Review

If the daemon does not provide Measurement Args, the CLI can't execute the attestation. I believe we should not accept grpc::NewVmResp that does not contain Args. This should be replaced with a NewVmResp that contains an error with the message "SNP Node did not provide measurement args"

Also, this means the contract is broken so we need to discuss the cases.

If the daemon does not provide Measurement Args, the CLI can't execute the attestation. I believe we should not accept `grpc::NewVmResp` that does not contain Args. This should be replaced with a NewVmResp that contains an error with the message "SNP Node did not provide measurement args" Also, this means the contract is broken so we need to discuss the cases.
}
None => {
log::error!(
"NewVmResp does not contain MeasurementArgs for {}",
new_vm_resp.uuid
);
new_vm_resp.error = "Daemon did not return measurement args.".to_string();
return;
}
}
if let Err(_) = new_vm_req.1.send(new_vm_resp.clone()) {
log::error!( log::error!(
"CLI RX for {} dropped before receiving confirmation {:?}. Error is: {:?}", "CLI RX for {} dropped before receiving confirmation {:?}.",
&newvmreq.0.admin_pubkey, &new_vm_req.0.admin_pubkey,
new_vm_resp, new_vm_resp,
e
); );
} }
if new_vm_resp.error != "" { if new_vm_resp.error != "" {
return; return;
} }
let mut public_ipv4 = String::new(); let mut public_ipv4 = String::new();
let mut public_ipv6 = String::new(); let mut public_ipv6 = String::new();
for ip in new_vm_resp.ips {
let args = new_vm_resp.args.as_ref().unwrap();
for ip in args.ips.iter() {
if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) { if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) {
if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() { if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() {
public_ipv4 = ipv4_addr.to_string(); public_ipv4 = ipv4_addr.to_string();
@ -219,51 +247,106 @@ impl BrainData {
public_ipv6 = ipv6_addr.to_string(); public_ipv6 = ipv6_addr.to_string();
} }
} }
let contract = Contract { let contract = Contract {
uuid: new_vm_resp.uuid, uuid: new_vm_resp.uuid,
exposed_ports: new_vm_resp.exposed_ports, exposed_ports: args.exposed_ports.clone(),
public_ipv4, public_ipv4,
public_ipv6, public_ipv6,
created_at: Utc::now().to_rfc3339(), created_at: Utc::now().to_rfc3339(),
updated_at: String::new(), updated_at: String::new(),
hostname: newvmreq.0.hostname, hostname: new_vm_req.0.hostname,
admin_pubkey: newvmreq.0.admin_pubkey, admin_pubkey: new_vm_req.0.admin_pubkey,
node_pubkey: newvmreq.0.node_pubkey, node_pubkey: new_vm_req.0.node_pubkey.clone(),
disk_size_gb: newvmreq.0.disk_size_gb, disk_size_gb: new_vm_req.0.disk_size_gb,
vcpus: newvmreq.0.vcpus, vcpus: new_vm_req.0.vcpus,
memory_mb: newvmreq.0.memory_mb, memory_mb: new_vm_req.0.memory_mb,
kernel_sha: newvmreq.0.kernel_sha, kernel_sha: new_vm_req.0.kernel_sha,
dtrfs_sha: newvmreq.0.dtrfs_sha, dtrfs_sha: new_vm_req.0.dtrfs_sha,
}; };
info!("Created new contract: {contract:?}"); info!("Created new contract: {contract:?}");
self.contracts.write().unwrap().push(contract); self.contracts.write().unwrap().push(contract);
} }
pub async fn submit_updatevm_resp(&self, resp: grpc::UpdateVmResp) { pub async fn submit_updatevm_resp(&self, mut update_vm_resp: grpc::UpdateVmResp) {
let updatevmreq = match self.tmp_updatevm_reqs.remove(&resp.uuid) { let update_vm_req = match self.tmp_updatevm_reqs.remove(&update_vm_resp.uuid) {
Some((_, r)) => r, Some((_, r)) => r,
None => { None => {
log::error!( log::error!(
"Received confirmation for ghost UpdateVMRequest {}", "Received confirmation for ghost UpdateVMRequest {}",
resp.uuid update_vm_resp.uuid
); );
update_vm_resp.error =
"Received confirmation for ghost UpdateVMRequest.".to_string();
return; return;
} }
}; };
if let Err(e) = updatevmreq.1.send(resp.clone()) { if update_vm_resp.error != "" {
log::error!("CLI RX dropped before receiving UpdateVMResp {resp:?}. Error is: {e:?}");
}
if resp.error != "" {
return; return;
} }
let mut contracts = self.contracts.write().unwrap(); let mut contracts = self.contracts.write().unwrap();
if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == resp.uuid) { match contracts.iter_mut().find(|c| c.uuid == update_vm_resp.uuid) {
contract.disk_size_gb = updatevmreq.0.disk_size_gb; Some(contract) => {
contract.vcpus = updatevmreq.0.vcpus; match update_vm_resp.args {
contract.memory_mb = updatevmreq.0.memory_mb; Some(ref mut args) => {
contract.kernel_sha = updatevmreq.0.kernel_sha; if args.dtrfs_api_endpoint.starts_with(':') {
contract.dtrfs_sha = updatevmreq.0.dtrfs_sha; match self.find_nodes_by_pubkey(&contract.node_pubkey) {
contract.updated_at = Utc::now().to_rfc3339(); Some(node) => {
args.dtrfs_api_endpoint =
format!("{}{}", node.ip, args.dtrfs_api_endpoint);
}
None => {
log::error!(
"Node not found for pubkey {}",
contract.node_pubkey
);
update_vm_resp.error = "Node not found.".to_string();
return;
}
}
}
}
None => {
log::error!(
"NewVmResp does not contain MeasurementArgs for {}",
update_vm_resp.uuid
);
update_vm_resp.error =
"Daemon did not return measurement args.".to_string();
return;
}
}
contract.disk_size_gb = update_vm_req.0.disk_size_gb;
contract.vcpus = update_vm_req.0.vcpus;
contract.memory_mb = update_vm_req.0.memory_mb;
if !update_vm_req.0.kernel_sha.is_empty() {
info!(
"Updating kernel sha for {} to {}",
contract.uuid, update_vm_req.0.kernel_sha
);
contract.kernel_sha = update_vm_req.0.kernel_sha;
}
if !update_vm_req.0.dtrfs_sha.is_empty() {
info!(
"Updating dtrfs sha for {} to {}",
contract.uuid, update_vm_req.0.dtrfs_sha
);
contract.dtrfs_sha = update_vm_req.0.dtrfs_sha;
}
contract.updated_at = Utc::now().to_rfc3339();
}
None => {
log::error!("Contract not found for {}.", update_vm_req.0.uuid);
update_vm_resp.error = "Contract not found.".to_string();
}
}
if let Err(_) = update_vm_req.1.send(update_vm_resp.clone()) {
log::error!(
"CLI RX dropped before receiving UpdateVMResp {:?}.",
update_vm_resp
);
} }
} }
@ -272,9 +355,7 @@ impl BrainData {
mut req: grpc::NewVmReq, mut req: grpc::NewVmReq,
tx: OneshotSender<grpc::NewVmResp>, tx: OneshotSender<grpc::NewVmResp>,
) { ) {
let uuid = uuid::Uuid::new_v4().to_string(); req.uuid = uuid::Uuid::new_v4().to_string();
req.uuid = uuid.clone();
info!("Inserting new vm request in memory: {req:?}"); info!("Inserting new vm request in memory: {req:?}");
self.tmp_newvm_reqs self.tmp_newvm_reqs
.insert(req.uuid.clone(), (req.clone(), tx)); .insert(req.uuid.clone(), (req.clone(), tx));
@ -292,10 +373,8 @@ impl BrainData {
); );
self.submit_newvm_resp(grpc::NewVmResp { self.submit_newvm_resp(grpc::NewVmResp {
error: "Daemon is offline.".to_string(), error: "Daemon is offline.".to_string(),
uuid, uuid: req.uuid,
exposed_ports: Vec::new(), args: None,
ovmf_hash: "".to_string(),
ips: Vec::new(),
}) })
.await; .await;
} }
@ -319,6 +398,7 @@ impl BrainData {
let _ = tx.send(grpc::UpdateVmResp { let _ = tx.send(grpc::UpdateVmResp {
uuid, uuid,
error: "Contract does not exist.".to_string(), error: "Contract does not exist.".to_string(),
args: None,
}); });
return; return;
} }
@ -343,6 +423,7 @@ impl BrainData {
self.submit_updatevm_resp(grpc::UpdateVmResp { self.submit_updatevm_resp(grpc::UpdateVmResp {
uuid, uuid,
error: "Daemon is offline.".to_string(), error: "Daemon is offline.".to_string(),
args: None,
}) })
.await; .await;
} }
@ -352,6 +433,7 @@ impl BrainData {
self.submit_updatevm_resp(grpc::UpdateVmResp { self.submit_updatevm_resp(grpc::UpdateVmResp {
uuid, uuid,
error: "Daemon is offline.".to_string(), error: "Daemon is offline.".to_string(),
args: None,
}) })
.await; .await;
} }

@ -256,6 +256,7 @@ impl BrainDaemonService for BrainDaemonMock {
data.submit_updatevm_resp(UpdateVmResp { data.submit_updatevm_resp(UpdateVmResp {
error: "Daemon not connected.".to_string(), error: "Daemon not connected.".to_string(),
uuid: updatevmreq.uuid, uuid: updatevmreq.uuid,
args: None,
}) })
.await; .await;
break; break;