update functionality for the brain (#2)

These changes have been done:
measurement args are a separate argument which get passed from daemon
revamped updatevm and newvm resp functions, specially how args.dtrfs_api_endpoint gets handled
updating now returns error from brain if it cant find the contract
proto has been updated accordingly

Co-authored-by: Ramil_Algayev <pro.remred@gmail.com>
Co-authored-by: ghe0 <gheorghe@gheo.tech>
Reviewed-on: #2
Co-authored-by: ramrem <ralgayev@detee.ltd>
Co-committed-by: ramrem <ralgayev@detee.ltd>
This commit is contained in:
ramrem 2025-01-09 22:29:56 +00:00 committed by ghe0
parent 3ab668c4b1
commit 0e85165240
3 changed files with 150 additions and 57 deletions

@ -24,6 +24,17 @@ message NodeResourceReq {
uint32 max_ports_per_vm = 8; uint32 max_ports_per_vm = 8;
} }
message MeasurementArgs {
// this will be IP:Port of the dtrfs API
// actually not a measurement arg, but needed for the injector
string dtrfs_api_endpoint = 1;
repeated uint32 exposed_ports = 2;
string ovmf_hash = 5;
// This is needed to allow the CLI to build the kernel params from known data.
// The CLI will use the kernel params to get the measurement.
repeated NewVmRespIP ips = 6;
}
message NewVMReq { message NewVMReq {
string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID
string hostname = 2; string hostname = 2;
@ -41,20 +52,27 @@ message NewVMReq {
string dtrfs_sha = 14; string dtrfs_sha = 14;
} }
message NewVMResp {
string uuid = 1;
string error = 2;
MeasurementArgs args = 3;
}
message UpdateVMReq { message UpdateVMReq {
string uuid = 1; string uuid = 1;
uint32 disk_size_gb = 3; uint32 disk_size_gb = 2;
uint32 vcpus = 4; uint32 vcpus = 3;
uint32 memory_mb = 5; uint32 memory_mb = 4;
string kernel_url = 6; string kernel_url = 5;
string kernel_sha = 7; string kernel_sha = 6;
string dtrfs_url = 8; string dtrfs_url = 7;
string dtrfs_sha = 9; string dtrfs_sha = 8;
} }
message UpdateVMResp { message UpdateVMResp {
string uuid = 1; string uuid = 1;
string error = 3; string error = 2;
MeasurementArgs args = 3;
} }
message VMContract { message VMContract {
@ -87,16 +105,6 @@ message NewVmRespIP {
string gateway = 4; string gateway = 4;
} }
message NewVMResp {
string uuid = 1;
repeated uint32 exposed_ports = 2;
string ovmf_hash = 5;
// This is needed to allow the CLI to build the kernel params from known data.
// The CLI will use the kernel params to get the measurement.
repeated NewVmRespIP ips = 6;
string error = 7;
}
message DeleteVMReq { message DeleteVMReq {
string uuid = 1; string uuid = 1;
} }
@ -110,6 +118,7 @@ service BrainDaemonService {
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq);
rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
//rpc GetMeasurementArgs (ListVMContractsReq) returns (stream MeasurementArgs);
} }
message NodeFilters { message NodeFilters {
@ -142,4 +151,5 @@ service BrainCliService {
rpc GetOneNode (NodeFilters) returns (NodeListResp); rpc GetOneNode (NodeFilters) returns (NodeListResp);
rpc DeleteVM (DeleteVMReq) returns (Empty); rpc DeleteVM (DeleteVMReq) returns (Empty);
rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp); rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp);
} //rpc GetMeasurementArgs (ListVMContractsReq) returns (MeasurementArgs);
}

@ -1,5 +1,5 @@
#![allow(dead_code)] #![allow(dead_code)]
use crate::grpc::brain as grpc; use crate::grpc::brain::{self as grpc};
use chrono::Utc; use chrono::Utc;
use dashmap::DashMap; use dashmap::DashMap;
use log::{debug, info, warn}; use log::{debug, info, warn};
@ -184,31 +184,59 @@ impl BrainData {
self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx); self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx);
} }
pub async fn submit_newvm_resp(&self, new_vm_resp: grpc::NewVmResp) { pub async fn submit_newvm_resp(&self, mut new_vm_resp: grpc::NewVmResp) {
let newvmreq = match self.tmp_newvm_reqs.remove(&new_vm_resp.uuid) { let new_vm_req = match self.tmp_newvm_reqs.remove(&new_vm_resp.uuid) {
Some((_, r)) => r, Some((_, r)) => r,
None => { None => {
log::error!( log::error!(
"Received confirmation for ghost NewVMReq {}", "Received confirmation for ghost NewVMReq {}",
new_vm_resp.uuid new_vm_resp.uuid
); );
new_vm_resp.error = "Received confirmation for ghost NewVMReq.".to_string();
return; return;
} }
}; };
if let Err(e) = newvmreq.1.send(new_vm_resp.clone()) { match new_vm_resp.args {
Some(ref mut args) => {
if args.dtrfs_api_endpoint.starts_with(':') {
match self.find_nodes_by_pubkey(&new_vm_req.0.node_pubkey) {
Some(node) => {
args.dtrfs_api_endpoint =
format!("{}{}", node.ip, args.dtrfs_api_endpoint);
}
None => {
log::error!("Node not found for pubkey {}", new_vm_req.0.node_pubkey);
new_vm_resp.error = "Node not found.".to_string();
return;
}
}
}
}
None => {
log::error!(
"NewVmResp does not contain MeasurementArgs for {}",
new_vm_resp.uuid
);
new_vm_resp.error = "Daemon did not return measurement args.".to_string();
return;
}
}
if let Err(_) = new_vm_req.1.send(new_vm_resp.clone()) {
log::error!( log::error!(
"CLI RX for {} dropped before receiving confirmation {:?}. Error is: {:?}", "CLI RX for {} dropped before receiving confirmation {:?}.",
&newvmreq.0.admin_pubkey, &new_vm_req.0.admin_pubkey,
new_vm_resp, new_vm_resp,
e
); );
} }
if new_vm_resp.error != "" { if new_vm_resp.error != "" {
return; return;
} }
let mut public_ipv4 = String::new(); let mut public_ipv4 = String::new();
let mut public_ipv6 = String::new(); let mut public_ipv6 = String::new();
for ip in new_vm_resp.ips {
let args = new_vm_resp.args.as_ref().unwrap();
for ip in args.ips.iter() {
if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) { if let Ok(ipv4_addr) = std::net::Ipv4Addr::from_str(&ip.address) {
if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() { if !ipv4_addr.is_private() && !ipv4_addr.is_link_local() {
public_ipv4 = ipv4_addr.to_string(); public_ipv4 = ipv4_addr.to_string();
@ -219,51 +247,106 @@ impl BrainData {
public_ipv6 = ipv6_addr.to_string(); public_ipv6 = ipv6_addr.to_string();
} }
} }
let contract = Contract { let contract = Contract {
uuid: new_vm_resp.uuid, uuid: new_vm_resp.uuid,
exposed_ports: new_vm_resp.exposed_ports, exposed_ports: args.exposed_ports.clone(),
public_ipv4, public_ipv4,
public_ipv6, public_ipv6,
created_at: Utc::now().to_rfc3339(), created_at: Utc::now().to_rfc3339(),
updated_at: String::new(), updated_at: String::new(),
hostname: newvmreq.0.hostname, hostname: new_vm_req.0.hostname,
admin_pubkey: newvmreq.0.admin_pubkey, admin_pubkey: new_vm_req.0.admin_pubkey,
node_pubkey: newvmreq.0.node_pubkey, node_pubkey: new_vm_req.0.node_pubkey.clone(),
disk_size_gb: newvmreq.0.disk_size_gb, disk_size_gb: new_vm_req.0.disk_size_gb,
vcpus: newvmreq.0.vcpus, vcpus: new_vm_req.0.vcpus,
memory_mb: newvmreq.0.memory_mb, memory_mb: new_vm_req.0.memory_mb,
kernel_sha: newvmreq.0.kernel_sha, kernel_sha: new_vm_req.0.kernel_sha,
dtrfs_sha: newvmreq.0.dtrfs_sha, dtrfs_sha: new_vm_req.0.dtrfs_sha,
}; };
info!("Created new contract: {contract:?}"); info!("Created new contract: {contract:?}");
self.contracts.write().unwrap().push(contract); self.contracts.write().unwrap().push(contract);
} }
pub async fn submit_updatevm_resp(&self, resp: grpc::UpdateVmResp) { pub async fn submit_updatevm_resp(&self, mut update_vm_resp: grpc::UpdateVmResp) {
let updatevmreq = match self.tmp_updatevm_reqs.remove(&resp.uuid) { let update_vm_req = match self.tmp_updatevm_reqs.remove(&update_vm_resp.uuid) {
Some((_, r)) => r, Some((_, r)) => r,
None => { None => {
log::error!( log::error!(
"Received confirmation for ghost UpdateVMRequest {}", "Received confirmation for ghost UpdateVMRequest {}",
resp.uuid update_vm_resp.uuid
); );
update_vm_resp.error =
"Received confirmation for ghost UpdateVMRequest.".to_string();
return; return;
} }
}; };
if let Err(e) = updatevmreq.1.send(resp.clone()) { if update_vm_resp.error != "" {
log::error!("CLI RX dropped before receiving UpdateVMResp {resp:?}. Error is: {e:?}");
}
if resp.error != "" {
return; return;
} }
let mut contracts = self.contracts.write().unwrap(); let mut contracts = self.contracts.write().unwrap();
if let Some(contract) = contracts.iter_mut().find(|c| c.uuid == resp.uuid) { match contracts.iter_mut().find(|c| c.uuid == update_vm_resp.uuid) {
contract.disk_size_gb = updatevmreq.0.disk_size_gb; Some(contract) => {
contract.vcpus = updatevmreq.0.vcpus; match update_vm_resp.args {
contract.memory_mb = updatevmreq.0.memory_mb; Some(ref mut args) => {
contract.kernel_sha = updatevmreq.0.kernel_sha; if args.dtrfs_api_endpoint.starts_with(':') {
contract.dtrfs_sha = updatevmreq.0.dtrfs_sha; match self.find_nodes_by_pubkey(&contract.node_pubkey) {
contract.updated_at = Utc::now().to_rfc3339(); Some(node) => {
args.dtrfs_api_endpoint =
format!("{}{}", node.ip, args.dtrfs_api_endpoint);
}
None => {
log::error!(
"Node not found for pubkey {}",
contract.node_pubkey
);
update_vm_resp.error = "Node not found.".to_string();
return;
}
}
}
}
None => {
log::error!(
"NewVmResp does not contain MeasurementArgs for {}",
update_vm_resp.uuid
);
update_vm_resp.error =
"Daemon did not return measurement args.".to_string();
return;
}
}
contract.disk_size_gb = update_vm_req.0.disk_size_gb;
contract.vcpus = update_vm_req.0.vcpus;
contract.memory_mb = update_vm_req.0.memory_mb;
if !update_vm_req.0.kernel_sha.is_empty() {
info!(
"Updating kernel sha for {} to {}",
contract.uuid, update_vm_req.0.kernel_sha
);
contract.kernel_sha = update_vm_req.0.kernel_sha;
}
if !update_vm_req.0.dtrfs_sha.is_empty() {
info!(
"Updating dtrfs sha for {} to {}",
contract.uuid, update_vm_req.0.dtrfs_sha
);
contract.dtrfs_sha = update_vm_req.0.dtrfs_sha;
}
contract.updated_at = Utc::now().to_rfc3339();
}
None => {
log::error!("Contract not found for {}.", update_vm_req.0.uuid);
update_vm_resp.error = "Contract not found.".to_string();
}
}
if let Err(_) = update_vm_req.1.send(update_vm_resp.clone()) {
log::error!(
"CLI RX dropped before receiving UpdateVMResp {:?}.",
update_vm_resp
);
} }
} }
@ -272,9 +355,7 @@ impl BrainData {
mut req: grpc::NewVmReq, mut req: grpc::NewVmReq,
tx: OneshotSender<grpc::NewVmResp>, tx: OneshotSender<grpc::NewVmResp>,
) { ) {
let uuid = uuid::Uuid::new_v4().to_string(); req.uuid = uuid::Uuid::new_v4().to_string();
req.uuid = uuid.clone();
info!("Inserting new vm request in memory: {req:?}"); info!("Inserting new vm request in memory: {req:?}");
self.tmp_newvm_reqs self.tmp_newvm_reqs
.insert(req.uuid.clone(), (req.clone(), tx)); .insert(req.uuid.clone(), (req.clone(), tx));
@ -292,10 +373,8 @@ impl BrainData {
); );
self.submit_newvm_resp(grpc::NewVmResp { self.submit_newvm_resp(grpc::NewVmResp {
error: "Daemon is offline.".to_string(), error: "Daemon is offline.".to_string(),
uuid, uuid: req.uuid,
exposed_ports: Vec::new(), args: None,
ovmf_hash: "".to_string(),
ips: Vec::new(),
}) })
.await; .await;
} }
@ -319,6 +398,7 @@ impl BrainData {
let _ = tx.send(grpc::UpdateVmResp { let _ = tx.send(grpc::UpdateVmResp {
uuid, uuid,
error: "Contract does not exist.".to_string(), error: "Contract does not exist.".to_string(),
args: None,
}); });
return; return;
} }
@ -343,6 +423,7 @@ impl BrainData {
self.submit_updatevm_resp(grpc::UpdateVmResp { self.submit_updatevm_resp(grpc::UpdateVmResp {
uuid, uuid,
error: "Daemon is offline.".to_string(), error: "Daemon is offline.".to_string(),
args: None,
}) })
.await; .await;
} }
@ -352,6 +433,7 @@ impl BrainData {
self.submit_updatevm_resp(grpc::UpdateVmResp { self.submit_updatevm_resp(grpc::UpdateVmResp {
uuid, uuid,
error: "Daemon is offline.".to_string(), error: "Daemon is offline.".to_string(),
args: None,
}) })
.await; .await;
} }

@ -256,6 +256,7 @@ impl BrainDaemonService for BrainDaemonMock {
data.submit_updatevm_resp(UpdateVmResp { data.submit_updatevm_resp(UpdateVmResp {
error: "Daemon not connected.".to_string(), error: "Daemon not connected.".to_string(),
uuid: updatevmreq.uuid, uuid: updatevmreq.uuid,
args: None,
}) })
.await; .await;
break; break;