Initial update_vm functionality to be merged (#3)
Now measurement args gets passed as an argument for both newvm and updatevm responses VM type now gets converted with the help of an intermediary function, reducing code duplication Updated proto file accordingly After merging, the whole code base still needs a major refactoring, error handling and a lot more checks to make sure daemon behaves as intended. Co-authored-by: Ramil_Algayev <pro.remred@gmail.com> Reviewed-on: SNP/daemon#3 Co-authored-by: ramrem <ralgayev@detee.ltd> Co-committed-by: ramrem <ralgayev@detee.ltd>
This commit is contained in:
parent
3c6074f735
commit
2a6c5d5e25
46
brain.proto
46
brain.proto
@ -24,6 +24,17 @@ message NodeResourceReq {
|
||||
uint32 max_ports_per_vm = 8;
|
||||
}
|
||||
|
||||
message MeasurementArgs {
|
||||
// this will be IP:Port of the dtrfs API
|
||||
// actually not a measurement arg, but needed for the injector
|
||||
string dtrfs_api_endpoint = 1;
|
||||
repeated uint32 exposed_ports = 2;
|
||||
string ovmf_hash = 5;
|
||||
// This is needed to allow the CLI to build the kernel params from known data.
|
||||
// The CLI will use the kernel params to get the measurement.
|
||||
repeated NewVmRespIP ips = 6;
|
||||
}
|
||||
|
||||
message NewVMReq {
|
||||
string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID
|
||||
string hostname = 2;
|
||||
@ -41,20 +52,27 @@ message NewVMReq {
|
||||
string dtrfs_sha = 14;
|
||||
}
|
||||
|
||||
message NewVMResp {
|
||||
string uuid = 1;
|
||||
string error = 2;
|
||||
MeasurementArgs args = 3;
|
||||
}
|
||||
|
||||
message UpdateVMReq {
|
||||
string uuid = 1;
|
||||
uint32 disk_size_gb = 3;
|
||||
uint32 vcpus = 4;
|
||||
uint32 memory_mb = 5;
|
||||
string kernel_url = 6;
|
||||
string kernel_sha = 7;
|
||||
string dtrfs_url = 8;
|
||||
string dtrfs_sha = 9;
|
||||
uint32 disk_size_gb = 2;
|
||||
uint32 vcpus = 3;
|
||||
uint32 memory_mb = 4;
|
||||
string kernel_url = 5;
|
||||
string kernel_sha = 6;
|
||||
string dtrfs_url = 7;
|
||||
string dtrfs_sha = 8;
|
||||
}
|
||||
|
||||
message UpdateVMResp {
|
||||
string uuid = 1;
|
||||
string error = 3;
|
||||
string error = 2;
|
||||
MeasurementArgs args = 3;
|
||||
}
|
||||
|
||||
message VMContract {
|
||||
@ -87,16 +105,6 @@ message NewVmRespIP {
|
||||
string gateway = 4;
|
||||
}
|
||||
|
||||
message NewVMResp {
|
||||
string uuid = 1;
|
||||
repeated uint32 exposed_ports = 2;
|
||||
string ovmf_hash = 5;
|
||||
// This is needed to allow the CLI to build the kernel params from known data.
|
||||
// The CLI will use the kernel params to get the measurement.
|
||||
repeated NewVmRespIP ips = 6;
|
||||
string error = 7;
|
||||
}
|
||||
|
||||
message DeleteVMReq {
|
||||
string uuid = 1;
|
||||
}
|
||||
@ -110,6 +118,7 @@ service BrainDaemonService {
|
||||
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
|
||||
rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq);
|
||||
rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
|
||||
//rpc GetMeasurementArgs (ListVMContractsReq) returns (stream MeasurementArgs);
|
||||
}
|
||||
|
||||
message NodeFilters {
|
||||
@ -142,4 +151,5 @@ service BrainCliService {
|
||||
rpc GetOneNode (NodeFilters) returns (NodeListResp);
|
||||
rpc DeleteVM (DeleteVMReq) returns (Empty);
|
||||
rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp);
|
||||
//rpc GetMeasurementArgs (ListVMContractsReq) returns (MeasurementArgs);
|
||||
}
|
||||
|
@ -10,5 +10,7 @@ pub(crate) const START_VM_SCRIPT: &str = "/usr/local/bin/detee/start_qemu_vm.sh"
|
||||
// TODO: research if other CPU types provide better performance
|
||||
pub(crate) const QEMU_VM_CPU_TYPE: &str = "EPYC-v4";
|
||||
// If you modify this, also modify scripts/start_qemu_vm.sh
|
||||
pub(crate) const OVMF_HASH: &str = "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76";
|
||||
pub(crate) const OVMF_URL: &str = "https://drive.google.com/uc?export=download&id=1V-vLkaiLaGmFSjrN84Z6nELQOxKNAoSJ";
|
||||
pub(crate) const OVMF_HASH: &str =
|
||||
"0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76";
|
||||
pub(crate) const OVMF_URL: &str =
|
||||
"https://drive.google.com/uc?export=download&id=1V-vLkaiLaGmFSjrN84Z6nELQOxKNAoSJ";
|
||||
|
@ -206,6 +206,8 @@ pub struct ConnectionData {
|
||||
pub brain_url: String,
|
||||
pub newvm_tx: Sender<NewVmReq>,
|
||||
pub confirm_vm_rx: Receiver<NewVmResp>,
|
||||
pub updatevm_tx: Sender<UpdateVmReq>,
|
||||
pub confirm_updatevm_rx: Receiver<UpdateVmResp>,
|
||||
pub delete_vm_tx: Sender<DeleteVmReq>,
|
||||
pub resources_rx: Receiver<NodeResourceReq>,
|
||||
}
|
||||
@ -217,6 +219,8 @@ pub async fn connect_and_run(cd: ConnectionData) -> Result<()> {
|
||||
register_node(client.clone()).await;
|
||||
streaming_tasks.spawn(listen_for_new_vm_reqs(client.clone(), cd.newvm_tx));
|
||||
streaming_tasks.spawn(send_newvm_resp(client.clone(), cd.confirm_vm_rx));
|
||||
streaming_tasks.spawn(listen_for_update_vm_reqs(client.clone(), cd.updatevm_tx));
|
||||
streaming_tasks.spawn(send_updatevm_resp(client.clone(), cd.confirm_updatevm_rx));
|
||||
streaming_tasks.spawn(listen_for_deleted_vms(client.clone(), cd.delete_vm_tx));
|
||||
streaming_tasks.spawn(send_node_resources(client.clone(), cd.resources_rx));
|
||||
|
||||
|
60
src/main.rs
60
src/main.rs
@ -16,6 +16,8 @@ use tokio::{
|
||||
struct VMHandler {
|
||||
new_vm_req_chan: Receiver<brain::NewVmReq>,
|
||||
new_vm_resp_chan: Sender<brain::NewVmResp>,
|
||||
update_vm_req_chan: Receiver<brain::UpdateVmReq>,
|
||||
update_vm_resp_chan: Sender<brain::UpdateVmResp>,
|
||||
delete_vm_chan: Receiver<brain::DeleteVmReq>,
|
||||
resources_chan: Sender<brain::NodeResourceReq>,
|
||||
config: Config,
|
||||
@ -27,6 +29,8 @@ impl VMHandler {
|
||||
fn new(
|
||||
new_vm_req_chan: Receiver<brain::NewVmReq>,
|
||||
new_vm_resp_chan: Sender<brain::NewVmResp>,
|
||||
update_vm_req_chan: Receiver<brain::UpdateVmReq>,
|
||||
update_vm_resp_chan: Sender<brain::UpdateVmResp>,
|
||||
delete_vm_chan: Receiver<brain::DeleteVmReq>,
|
||||
resources_chan: Sender<brain::NodeResourceReq>,
|
||||
) -> Self {
|
||||
@ -42,7 +46,16 @@ impl VMHandler {
|
||||
state::Resources::new(&config.volumes)
|
||||
}
|
||||
};
|
||||
Self { new_vm_req_chan, new_vm_resp_chan, delete_vm_chan, resources_chan, config, res }
|
||||
Self {
|
||||
new_vm_req_chan,
|
||||
new_vm_resp_chan,
|
||||
update_vm_req_chan,
|
||||
update_vm_resp_chan,
|
||||
delete_vm_chan,
|
||||
resources_chan,
|
||||
config,
|
||||
res,
|
||||
}
|
||||
}
|
||||
|
||||
fn get_available_ips(&self) -> (u32, u32) {
|
||||
@ -133,6 +146,33 @@ impl VMHandler {
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_update_vm_req(&mut self, update_vm_req: brain::UpdateVmReq) -> Result<()> {
|
||||
debug!("Processing update vm request: {update_vm_req:?}");
|
||||
let vm_id = update_vm_req.uuid.clone();
|
||||
let content =
|
||||
std::fs::read_to_string(constants::VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?;
|
||||
let mut vm: state::VM = serde_yaml::from_str(&content)?;
|
||||
match vm.update(update_vm_req.into(), &self.config, &mut self.res) {
|
||||
Ok(_) => {
|
||||
info!("Succesfully updated VM {vm_id}");
|
||||
let _ = self.update_vm_resp_chan.send(vm.into()).await;
|
||||
self.send_node_resources().await;
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Unable to update vm {vm_id} due to error: {e:?}");
|
||||
let _ = self
|
||||
.update_vm_resp_chan
|
||||
.send(brain::UpdateVmResp {
|
||||
uuid: vm_id,
|
||||
error: format!("{e:?}"),
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_delete_vm(&mut self, delete_vm_req: brain::DeleteVmReq) -> Result<()> {
|
||||
let vm_id = delete_vm_req.uuid;
|
||||
let content =
|
||||
@ -149,6 +189,11 @@ impl VMHandler {
|
||||
Some(new_vm_req) = self.new_vm_req_chan.recv() => {
|
||||
self.handle_new_vm_req(new_vm_req).await;
|
||||
}
|
||||
Some(update_vm_req) = self.update_vm_req_chan.recv() => {
|
||||
if let Err(e) = self.handle_update_vm_req(update_vm_req).await {
|
||||
log::error!("Could not update vm: {e:?}");
|
||||
}
|
||||
}
|
||||
Some(delete_vm_req) = self.delete_vm_chan.recv() => {
|
||||
let uuid = delete_vm_req.uuid.clone();
|
||||
if let Err(e) = self.handle_delete_vm(delete_vm_req) {
|
||||
@ -201,10 +246,19 @@ async fn main() {
|
||||
loop {
|
||||
let (newvm_tx, newvm_rx) = tokio::sync::mpsc::channel(6);
|
||||
let (confirm_vm_tx, confirm_vm_rx) = tokio::sync::mpsc::channel(6);
|
||||
let (updatevm_tx, updatevm_rx) = tokio::sync::mpsc::channel(6);
|
||||
let (confirm_updatevm_tx, confirm_updatevm_rx) = tokio::sync::mpsc::channel(6);
|
||||
let (delete_vm_tx, delete_vm_rx) = tokio::sync::mpsc::channel(6);
|
||||
let (resources_tx, resources_rx) = tokio::sync::mpsc::channel(6);
|
||||
|
||||
let mut vm_handler = VMHandler::new(newvm_rx, confirm_vm_tx, delete_vm_rx, resources_tx);
|
||||
let mut vm_handler = VMHandler::new(
|
||||
newvm_rx,
|
||||
confirm_vm_tx,
|
||||
updatevm_rx,
|
||||
confirm_updatevm_tx,
|
||||
delete_vm_rx,
|
||||
resources_tx,
|
||||
);
|
||||
let brain_url = vm_handler.config.brain_url.clone();
|
||||
|
||||
info!("Trying to get VM Contracts from Brain to see if some Contracts got removed...");
|
||||
@ -222,6 +276,8 @@ async fn main() {
|
||||
brain_url,
|
||||
newvm_tx,
|
||||
confirm_vm_rx,
|
||||
updatevm_tx,
|
||||
confirm_updatevm_rx,
|
||||
delete_vm_tx,
|
||||
resources_rx,
|
||||
})
|
||||
|
81
src/state.rs
81
src/state.rs
@ -1,6 +1,11 @@
|
||||
#![allow(dead_code)]
|
||||
use crate::{config::Config, constants::*, grpc::brain};
|
||||
use crate::{
|
||||
config::Config,
|
||||
constants::*,
|
||||
grpc::brain,
|
||||
};
|
||||
use anyhow::{anyhow, Result};
|
||||
use log::info;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::{
|
||||
@ -360,16 +365,18 @@ pub struct VM {
|
||||
storage_dir: String,
|
||||
}
|
||||
|
||||
impl Into<brain::NewVmResp> for VM {
|
||||
fn into(self) -> brain::NewVmResp {
|
||||
let mut nic_index: u32 = 0;
|
||||
let mut ips: Vec<brain::NewVmRespIp> = Vec::new();
|
||||
if self.fw_ports.len() > 0 {
|
||||
nic_index += 1;
|
||||
}
|
||||
impl VM {
|
||||
fn to_brain_vm_resp<T>(self, build_resp: impl FnOnce(brain::MeasurementArgs) -> T) -> T {
|
||||
let mut nic_index: u32 = if self.fw_ports.is_empty() { 0 } else { 1 };
|
||||
let mut ips = Vec::new();
|
||||
let mut dtrfs_api_endpoint = String::new();
|
||||
|
||||
// TODO: when brain supports multiple IPs per VM, fix this
|
||||
for nic in self.nics {
|
||||
for ip in nic.ips {
|
||||
if ip.address.parse::<std::net::Ipv4Addr>().is_ok() {
|
||||
dtrfs_api_endpoint = ip.address.clone();
|
||||
}
|
||||
ips.push(brain::NewVmRespIp {
|
||||
nic_index,
|
||||
address: ip.address,
|
||||
@ -379,14 +386,44 @@ impl Into<brain::NewVmResp> for VM {
|
||||
}
|
||||
nic_index += 1;
|
||||
}
|
||||
brain::NewVmResp {
|
||||
uuid: self.uuid,
|
||||
exposed_ports: self.fw_ports.iter().map(|(p, _)| *p as u32).collect(),
|
||||
|
||||
if !dtrfs_api_endpoint.is_empty() {
|
||||
dtrfs_api_endpoint += ":22";
|
||||
} else {
|
||||
dtrfs_api_endpoint += &format!(":{}", self.fw_ports[0].0);
|
||||
}
|
||||
|
||||
let args = brain::MeasurementArgs {
|
||||
dtrfs_api_endpoint,
|
||||
exposed_ports: self.fw_ports.iter().map(|(host_port, _)| *host_port as u32).collect(),
|
||||
ips,
|
||||
ovmf_hash: crate::constants::OVMF_HASH.to_string(),
|
||||
error: "".to_string(),
|
||||
};
|
||||
|
||||
build_resp(args)
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<brain::NewVmResp> for VM {
|
||||
fn into(self) -> brain::NewVmResp {
|
||||
let uuid = self.uuid.clone();
|
||||
self.to_brain_vm_resp(|args| brain::NewVmResp {
|
||||
uuid,
|
||||
args: Some(args),
|
||||
error: "".to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<brain::UpdateVmResp> for VM {
|
||||
fn into(self) -> brain::UpdateVmResp {
|
||||
let uuid = self.uuid.clone();
|
||||
self.to_brain_vm_resp(|args| brain::UpdateVmResp {
|
||||
uuid,
|
||||
args: Some(args),
|
||||
error: "".to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
@ -439,6 +476,21 @@ pub struct UpdateVMReq {
|
||||
dtrfs_sha: String,
|
||||
}
|
||||
|
||||
impl From<brain::UpdateVmReq> for UpdateVMReq {
|
||||
fn from(req: brain::UpdateVmReq) -> Self {
|
||||
Self {
|
||||
uuid: req.uuid,
|
||||
vcpus: req.vcpus as usize,
|
||||
memory_mb: req.memory_mb as usize,
|
||||
disk_size_gb: req.disk_size_gb as usize,
|
||||
kernel_url: req.kernel_url,
|
||||
kernel_sha: req.kernel_sha,
|
||||
dtrfs_url: req.dtrfs_url,
|
||||
dtrfs_sha: req.dtrfs_sha,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum VMCreationErrors {
|
||||
VMAlreadyExists(VM),
|
||||
@ -613,7 +665,10 @@ impl VM {
|
||||
"Could not get dtrfs: {dtrfs_err:?}"
|
||||
)));
|
||||
};
|
||||
|
||||
info!(
|
||||
"Kernel and DTRFS updated for VM: {}, kernel {}, dtrfs: {}",
|
||||
self.uuid, req.kernel_sha, req.dtrfs_sha
|
||||
);
|
||||
self.kernel_sha = req.kernel_sha;
|
||||
self.dtrfs_sha = req.dtrfs_sha;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user