update functionality for the brain #2
46
brain.proto
46
brain.proto
@ -24,6 +24,17 @@ message NodeResourceReq {
|
|||||||
uint32 max_ports_per_vm = 8;
|
uint32 max_ports_per_vm = 8;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message MeasurementArgs {
|
||||||
|
// this will be IP:Port of the dtrfs API
|
||||||
|
// actually not a measurement arg, but needed for the injector
|
||||||
|
string dtrfs_api_endpoint = 1;
|
||||||
|
repeated uint32 exposed_ports = 2;
|
||||||
|
string ovmf_hash = 5;
|
||||||
|
// This is needed to allow the CLI to build the kernel params from known data.
|
||||||
|
// The CLI will use the kernel params to get the measurement.
|
||||||
|
repeated NewVmRespIP ips = 6;
|
||||||
|
}
|
||||||
|
|
||||||
message NewVMReq {
|
message NewVMReq {
|
||||||
string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID
|
string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID
|
||||||
string hostname = 2;
|
string hostname = 2;
|
||||||
@ -41,20 +52,27 @@ message NewVMReq {
|
|||||||
string dtrfs_sha = 14;
|
string dtrfs_sha = 14;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message NewVMResp {
|
||||||
|
string uuid = 1;
|
||||||
|
string error = 2;
|
||||||
|
MeasurementArgs args = 3;
|
||||||
|
}
|
||||||
|
|
||||||
message UpdateVMReq {
|
message UpdateVMReq {
|
||||||
string uuid = 1;
|
string uuid = 1;
|
||||||
uint32 disk_size_gb = 3;
|
uint32 disk_size_gb = 2;
|
||||||
uint32 vcpus = 4;
|
uint32 vcpus = 3;
|
||||||
uint32 memory_mb = 5;
|
uint32 memory_mb = 4;
|
||||||
string kernel_url = 6;
|
string kernel_url = 5;
|
||||||
string kernel_sha = 7;
|
string kernel_sha = 6;
|
||||||
string dtrfs_url = 8;
|
string dtrfs_url = 7;
|
||||||
string dtrfs_sha = 9;
|
string dtrfs_sha = 8;
|
||||||
}
|
}
|
||||||
|
|
||||||
message UpdateVMResp {
|
message UpdateVMResp {
|
||||||
string uuid = 1;
|
string uuid = 1;
|
||||||
string error = 3;
|
string error = 2;
|
||||||
|
MeasurementArgs args = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
message VMContract {
|
message VMContract {
|
||||||
@ -87,16 +105,6 @@ message NewVmRespIP {
|
|||||||
string gateway = 4;
|
string gateway = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message NewVMResp {
|
|
||||||
string uuid = 1;
|
|
||||||
repeated uint32 exposed_ports = 2;
|
|
||||||
string ovmf_hash = 5;
|
|
||||||
// This is needed to allow the CLI to build the kernel params from known data.
|
|
||||||
// The CLI will use the kernel params to get the measurement.
|
|
||||||
repeated NewVmRespIP ips = 6;
|
|
||||||
string error = 7;
|
|
||||||
}
|
|
||||||
|
|
||||||
message DeleteVMReq {
|
message DeleteVMReq {
|
||||||
string uuid = 1;
|
string uuid = 1;
|
||||||
}
|
}
|
||||||
@ -110,6 +118,7 @@ service BrainDaemonService {
|
|||||||
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
|
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
|
||||||
rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq);
|
rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq);
|
||||||
rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
|
rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
|
||||||
|
//rpc GetMeasurementArgs (ListVMContractsReq) returns (stream MeasurementArgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
message NodeFilters {
|
message NodeFilters {
|
||||||
@ -142,4 +151,5 @@ service BrainCliService {
|
|||||||
rpc GetOneNode (NodeFilters) returns (NodeListResp);
|
rpc GetOneNode (NodeFilters) returns (NodeListResp);
|
||||||
rpc DeleteVM (DeleteVMReq) returns (Empty);
|
rpc DeleteVM (DeleteVMReq) returns (Empty);
|
||||||
rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp);
|
rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp);
|
||||||
|
//rpc GetMeasurementArgs (ListVMContractsReq) returns (MeasurementArgs);
|
||||||
}
|
}
|
158
src/data.rs
158
src/data.rs
@ -1,5 +1,5 @@
|
|||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
use crate::grpc::brain as grpc;
|
use crate::grpc::brain::{self as grpc};
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use log::{debug, info, warn};
|
use log::{debug, info, warn};
|
||||||
@ -184,31 +184,59 @@ impl BrainData {
|
|||||||
self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx);
|
self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn submit_newvm_resp(&self, new_vm_resp: grpc::NewVmResp) {
|
pub async fn submit_newvm_resp(&self, mut new_vm_resp: grpc::NewVmResp) {
|
||||||
let newvmreq = match self.tmp_newvm_reqs.remove(&new_vm_resp.uuid) {
|
let new_vm_req = match self.tmp_newvm_reqs.remove(&new_vm_resp.uuid) {
|
||||||
Some((_, r)) => r,
|
Some((_, r)) => r,
|
||||||
None => {
|
None => {
|
||||||
log::error!(
|
log::error!(
|
||||||
"Received confirmation for ghost NewVMReq {}",
|
"Received confirmation for ghost NewVMReq {}",
|
||||||
new_vm_resp.uuid
|
new_vm_resp.uuid
|
||||||
);
|
);
|
||||||
|
new_vm_resp.error = "Received confirmation for ghost NewVMReq.".to_string();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if let Err(e) = newvmreq.1.send(new_vm_resp.clone()) {
|
match new_vm_resp.args {
|
||||||
|
Some(ref mut args) => {
|
||||||
|
if args.dtrfs_api_endpoint.starts_with(':') {
|
||||||
|
match self.find_nodes_by_pubkey(&new_vm_req.0.node_pubkey) {
|
||||||
|
Some(node) => {
|
||||||
|
args.dtrfs_api_endpoint =
|
||||||
|
format!("{}{}", node.ip, args.dtrfs_api_endpoint);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
log::error!("Node not found for pubkey {}", new_vm_req.0.node_pubkey);
|
||||||
|
new_vm_resp.error = "Node not found.".to_string();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
ghe0 marked this conversation as resolved
Outdated
|
|||||||
|
}
|
||||||
|
None => {
|
||||||
|
log::error!(
|
||||||
|
"NewVmResp does not contain MeasurementArgs for {}",
|
||||||
|
new_vm_resp.uuid
|
||||||
|
);
|
||||||
|
new_vm_resp.error = "Daemon did not return measurement args.".to_string();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Err(_) = new_vm_req.1.send(new_vm_resp.clone()) {
|
||||||
log::error!(
|
log::error!(
|
||||||
"CLI RX for {} dropped before receiving confirmation {:?}. Error is: {:?}",
|
"CLI RX for {} dropped before receiving confirmation {:?}.",
|
||||||
&newvmreq.0.admin_pubkey,
|
&new_vm_req.0.admin_pubkey,
|
||||||
new_vm_resp,
|
new_vm_resp,
|
||||||
e
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
if new_vm_resp.error != "" {
|
if new_vm_resp.error != "" {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut public_ipv4 = String::new();
|
let mut public_ipv4 = String::new();
|
||||||
let mut public_ipv6 = String::new();
|
let mut public_ipv6 = String::new();
|
||||||
for ip in new_vm_resp.ips {
|
|
||||||
|
let args = new_vm_resp.args.as_ref().unwrap();
|
||||||
|
for ip in args.ips.iter() {
|
||||||
if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) {
|
if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) {
|
||||||
if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() {
|
if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() {
|
||||||
public_ipv4 = ipv4_addr.to_string();
|
public_ipv4 = ipv4_addr.to_string();
|
||||||
@ -219,51 +247,106 @@ impl BrainData {
|
|||||||
public_ipv6 = ipv6_addr.to_string();
|
public_ipv6 = ipv6_addr.to_string();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let contract = Contract {
|
let contract = Contract {
|
||||||
uuid: new_vm_resp.uuid,
|
uuid: new_vm_resp.uuid,
|
||||||
exposed_ports: new_vm_resp.exposed_ports,
|
exposed_ports: args.exposed_ports.clone(),
|
||||||
public_ipv4,
|
public_ipv4,
|
||||||
public_ipv6,
|
public_ipv6,
|
||||||
created_at: Utc::now().to_rfc3339(),
|
created_at: Utc::now().to_rfc3339(),
|
||||||
updated_at: String::new(),
|
updated_at: String::new(),
|
||||||
hostname: newvmreq.0.hostname,
|
hostname: new_vm_req.0.hostname,
|
||||||
admin_pubkey: newvmreq.0.admin_pubkey,
|
admin_pubkey: new_vm_req.0.admin_pubkey,
|
||||||
node_pubkey: newvmreq.0.node_pubkey,
|
node_pubkey: new_vm_req.0.node_pubkey.clone(),
|
||||||
disk_size_gb: newvmreq.0.disk_size_gb,
|
disk_size_gb: new_vm_req.0.disk_size_gb,
|
||||||
vcpus: newvmreq.0.vcpus,
|
vcpus: new_vm_req.0.vcpus,
|
||||||
memory_mb: newvmreq.0.memory_mb,
|
memory_mb: new_vm_req.0.memory_mb,
|
||||||
kernel_sha: newvmreq.0.kernel_sha,
|
kernel_sha: new_vm_req.0.kernel_sha,
|
||||||
dtrfs_sha: newvmreq.0.dtrfs_sha,
|
dtrfs_sha: new_vm_req.0.dtrfs_sha,
|
||||||
};
|
};
|
||||||
info!("Created new contract: {contract:?}");
|
info!("Created new contract: {contract:?}");
|
||||||
self.contracts.write().unwrap().push(contract);
|
self.contracts.write().unwrap().push(contract);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn submit_updatevm_resp(&self, resp: grpc::UpdateVmResp) {
|
pub async fn submit_updatevm_resp(&self, mut update_vm_resp: grpc::UpdateVmResp) {
|
||||||
let updatevmreq = match self.tmp_updatevm_reqs.remove(&resp.uuid) {
|
let update_vm_req = match self.tmp_updatevm_reqs.remove(&update_vm_resp.uuid) {
|
||||||
Some((_, r)) => r,
|
Some((_, r)) => r,
|
||||||
None => {
|
None => {
|
||||||
log::error!(
|
log::error!(
|
||||||
"Received confirmation for ghost UpdateVMRequest {}",
|
"Received confirmation for ghost UpdateVMRequest {}",
|
||||||
resp.uuid
|
update_vm_resp.uuid
|
||||||
);
|
);
|
||||||
|
update_vm_resp.error =
|
||||||
|
"Received confirmation for ghost UpdateVMRequest.".to_string();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if let Err(e) = updatevmreq.1.send(resp.clone()) {
|
if update_vm_resp.error != "" {
|
||||||
log::error!("CLI RX dropped before receiving UpdateVMResp {resp:?}. Error is: {e:?}");
|
|
||||||
}
|
|
||||||
if resp.error != "" {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut contracts = self.contracts.write().unwrap();
|
let mut contracts = self.contracts.write().unwrap();
|
||||||
if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == resp.uuid) {
|
match contracts.iter_mut().find(|c| c.uuid == update_vm_resp.uuid) {
|
||||||
contract.disk_size_gb = updatevmreq.0.disk_size_gb;
|
Some(contract) => {
|
||||||
contract.vcpus = updatevmreq.0.vcpus;
|
match update_vm_resp.args {
|
||||||
contract.memory_mb = updatevmreq.0.memory_mb;
|
Some(ref mut args) => {
|
||||||
contract.kernel_sha = updatevmreq.0.kernel_sha;
|
if args.dtrfs_api_endpoint.starts_with(':') {
|
||||||
contract.dtrfs_sha = updatevmreq.0.dtrfs_sha;
|
match self.find_nodes_by_pubkey(&contract.node_pubkey) {
|
||||||
contract.updated_at = Utc::now().to_rfc3339();
|
Some(node) => {
|
||||||
|
args.dtrfs_api_endpoint =
|
||||||
|
format!("{}{}", node.ip, args.dtrfs_api_endpoint);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
log::error!(
|
||||||
|
"Node not found for pubkey {}",
|
||||||
|
contract.node_pubkey
|
||||||
|
);
|
||||||
|
update_vm_resp.error = "Node not found.".to_string();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
log::error!(
|
||||||
|
"NewVmResp does not contain MeasurementArgs for {}",
|
||||||
|
update_vm_resp.uuid
|
||||||
|
);
|
||||||
|
update_vm_resp.error =
|
||||||
|
"Daemon did not return measurement args.".to_string();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
contract.disk_size_gb = update_vm_req.0.disk_size_gb;
|
||||||
|
contract.vcpus = update_vm_req.0.vcpus;
|
||||||
|
contract.memory_mb = update_vm_req.0.memory_mb;
|
||||||
|
if !update_vm_req.0.kernel_sha.is_empty() {
|
||||||
|
info!(
|
||||||
|
"Updating kernel sha for {} to {}",
|
||||||
|
contract.uuid, update_vm_req.0.kernel_sha
|
||||||
|
);
|
||||||
|
contract.kernel_sha = update_vm_req.0.kernel_sha;
|
||||||
|
}
|
||||||
|
if !update_vm_req.0.dtrfs_sha.is_empty() {
|
||||||
|
info!(
|
||||||
|
"Updating dtrfs sha for {} to {}",
|
||||||
|
contract.uuid, update_vm_req.0.dtrfs_sha
|
||||||
|
);
|
||||||
|
contract.dtrfs_sha = update_vm_req.0.dtrfs_sha;
|
||||||
|
}
|
||||||
|
contract.updated_at = Utc::now().to_rfc3339();
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
log::error!("Contract not found for {}.", update_vm_req.0.uuid);
|
||||||
|
update_vm_resp.error = "Contract not found.".to_string();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Err(_) = update_vm_req.1.send(update_vm_resp.clone()) {
|
||||||
|
log::error!(
|
||||||
|
"CLI RX dropped before receiving UpdateVMResp {:?}.",
|
||||||
|
update_vm_resp
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -272,9 +355,7 @@ impl BrainData {
|
|||||||
mut req: grpc::NewVmReq,
|
mut req: grpc::NewVmReq,
|
||||||
tx: OneshotSender<grpc::NewVmResp>,
|
tx: OneshotSender<grpc::NewVmResp>,
|
||||||
) {
|
) {
|
||||||
let uuid = uuid::Uuid::new_v4().to_string();
|
req.uuid = uuid::Uuid::new_v4().to_string();
|
||||||
|
|
||||||
req.uuid = uuid.clone();
|
|
||||||
info!("Inserting new vm request in memory: {req:?}");
|
info!("Inserting new vm request in memory: {req:?}");
|
||||||
self.tmp_newvm_reqs
|
self.tmp_newvm_reqs
|
||||||
.insert(req.uuid.clone(), (req.clone(), tx));
|
.insert(req.uuid.clone(), (req.clone(), tx));
|
||||||
@ -292,10 +373,8 @@ impl BrainData {
|
|||||||
);
|
);
|
||||||
self.submit_newvm_resp(grpc::NewVmResp {
|
self.submit_newvm_resp(grpc::NewVmResp {
|
||||||
error: "Daemon is offline.".to_string(),
|
error: "Daemon is offline.".to_string(),
|
||||||
uuid,
|
uuid: req.uuid,
|
||||||
exposed_ports: Vec::new(),
|
args: None,
|
||||||
ovmf_hash: "".to_string(),
|
|
||||||
ips: Vec::new(),
|
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
@ -319,6 +398,7 @@ impl BrainData {
|
|||||||
let _ = tx.send(grpc::UpdateVmResp {
|
let _ = tx.send(grpc::UpdateVmResp {
|
||||||
uuid,
|
uuid,
|
||||||
error: "Contract does not exist.".to_string(),
|
error: "Contract does not exist.".to_string(),
|
||||||
|
args: None,
|
||||||
});
|
});
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -343,6 +423,7 @@ impl BrainData {
|
|||||||
self.submit_updatevm_resp(grpc::UpdateVmResp {
|
self.submit_updatevm_resp(grpc::UpdateVmResp {
|
||||||
uuid,
|
uuid,
|
||||||
error: "Daemon is offline.".to_string(),
|
error: "Daemon is offline.".to_string(),
|
||||||
|
args: None,
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
@ -352,6 +433,7 @@ impl BrainData {
|
|||||||
self.submit_updatevm_resp(grpc::UpdateVmResp {
|
self.submit_updatevm_resp(grpc::UpdateVmResp {
|
||||||
uuid,
|
uuid,
|
||||||
error: "Daemon is offline.".to_string(),
|
error: "Daemon is offline.".to_string(),
|
||||||
|
args: None,
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
@ -256,6 +256,7 @@ impl BrainDaemonService for BrainDaemonMock {
|
|||||||
data.submit_updatevm_resp(UpdateVmResp {
|
data.submit_updatevm_resp(UpdateVmResp {
|
||||||
error: "Daemon not connected.".to_string(),
|
error: "Daemon not connected.".to_string(),
|
||||||
uuid: updatevmreq.uuid,
|
uuid: updatevmreq.uuid,
|
||||||
|
args: None,
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
break;
|
break;
|
||||||
|
Loading…
Reference in New Issue
Block a user
If the daemon does not provide Measurement Args, the CLI can't execute the attestation. I believe we should not accept
grpc::NewVmResp
that does not contain Args. This should be replaced with a NewVmResp that contains an error with the message "SNP Node did not provide measurement args"Also, this means the contract is broken so we need to discuss the cases.