snp-daemon/src/state.rs

1163 lines
42 KiB
Rust

// SPDX-License-Identifier: Apache-2.0
use crate::{config::Config, global::*, grpc::snp_proto};
use anyhow::{anyhow, Result};
use log::{debug, info};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
fs,
fs::{remove_file, File},
io::Write,
process::Command,
};
/// Reasons a VM creation or update request can be rejected or can fail.
#[derive(Debug)]
pub enum VMCreationErrors {
/// The requested price is lower than the price of the matching offer.
PriceIsTooLow,
/// A VM with the requested id is already registered; carries the VM loaded from its config.
VMAlreadyExists(VM),
/// Extra forwarded ports were requested together with a public IPv4 address.
NATandIPv4Conflict,
/// Not produced by the code visible in this file.
ServerFull,
/// The required host ports could not be allocated.
NotEnoughPorts,
/// The offer does not have enough free vCPUs.
NotEnoughCPU,
/// The offer does not have enough free memory.
NotEnoughMemory,
/// The offer does not have enough free storage.
NotEnoughStorage,
/// No free IPv4 address was found in the configured ranges.
IPv4NotAvailable,
/// No free IPv6 address was found in the configured ranges.
IPv6NotAvailable,
/// The requested memory is below the minimum accepted by `State::new_vm`.
MemoryTooLow,
/// The requested disk is below the accepted minimum, or smaller than the current disk on update.
DiskTooSmall,
/// An update asked for fewer vCPUs than the VM currently has.
DowngradeNotSupported,
/// No offer matched the memory/disk per vCPU ratio of the request.
UnbalancedVM,
/// Reading or writing VM configuration on the host failed; carries the error text.
ServerDiskError(String),
/// OVMF, kernel or dtrfs could not be obtained; carries the error text.
BootFileError(String),
/// A systemd or disk-resize operation failed; carries the error text.
HypervizorError(String),
}
/// Hardware amounts reserved on (or still available from) a single offer.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct HwReservation {
// number of vCPUs
vcpus: usize,
// memory in MiB (accumulated from the `memory_mib` of VMs)
memory: usize,
// disk in MiB (accumulated from the `disk_size_mib` of VMs)
disk: usize,
}
/// Daemon state: the static node configuration plus the resources currently reserved by VMs.
pub struct State {
// node configuration (offers, port range, network interfaces, ...)
pub config: Config,
// bookkeeping of everything reserved by existing VMs
pub res: Resources,
}
impl State {
/// Picks random free host ports for a VM: one port for SSH plus `extra_ports` more.
///
/// Returns the ports sorted in ascending order. An empty vector is returned when the
/// request cannot be satisfied: the configured range does not have enough unreserved
/// ports, the per-VM limit would be exceeded, or no free port was found within the
/// retry budget. The result is therefore either empty or exactly `extra_ports + 1` long.
fn available_ports(&mut self, extra_ports: usize) -> Vec<u16> {
    use rand::Rng;
    // Number of random draws attempted for each port before giving up.
    const ATTEMPTS_PER_PORT: usize = 5;

    let total_ports = extra_ports + 1;
    if self.config.public_port_range.len() < self.res.reserved_ports.len() + total_ports {
        return Vec::new();
    }
    if total_ports > self.config.max_ports_per_vm as usize {
        return Vec::new();
    }

    let mut rng = rand::thread_rng();
    let mut published_ports: Vec<u16> = Vec::with_capacity(total_ports);
    for _ in 0..total_ports {
        // Retry until a port is found that is neither reserved by another VM
        // nor already picked for this one.
        let candidate = (0..ATTEMPTS_PER_PORT)
            .map(|_| rng.gen_range(self.config.public_port_range.clone()))
            .find(|port| {
                !self.res.reserved_ports.contains(port) && !published_ports.contains(port)
            });
        match candidate {
            Some(port) => published_ports.push(port),
            // A partial list would make callers index out of bounds; report failure instead.
            None => return Vec::new(),
        }
    }
    published_ports.sort_unstable();
    published_ports
}
fn nr_of_available_ips(&self) -> (u32, u32) {
let mut avail_ipv4 = 0;
let mut avail_ipv6 = 0;
for nic in self.config.network_interfaces.iter() {
for range in nic.ipv4_ranges.iter() {
avail_ipv4 += (range.last_ip.to_bits() + 1) - range.first_ip.to_bits();
}
for range in nic.ipv6_ranges.iter() {
avail_ipv6 += (range.last_ip.to_bits() + 1) - range.first_ip.to_bits();
}
}
(
avail_ipv4.saturating_sub(self.res.reserved_ipv4.len() as u32),
avail_ipv6.saturating_sub(self.res.reserved_ipv6.len() as u128) as u32,
)
}
/// Builds the resource report for this node: free ports, free IPs and, for every
/// configured offer, the hardware that is still unreserved.
pub fn available_resources(&self) -> snp_proto::VmNodeResources {
    let (avail_ipv4, avail_ipv6) = self.nr_of_available_ips();
    let offers: Vec<snp_proto::VmNodeOffer> = self
        .config
        .offers
        .iter()
        .map(|config_offer| {
            // An offer without any reservation entry is completely free.
            let (used_vcpus, used_memory, used_disk) = self
                .res
                .reserved_hw
                .get(&config_offer.storage_path)
                .map_or((0, 0, 0), |used| (used.vcpus, used.memory, used.disk));
            snp_proto::VmNodeOffer {
                price: config_offer.price,
                vcpus: config_offer.max_vcpu_reservation.saturating_sub(used_vcpus) as u64,
                memory_mib: config_offer.max_mem_reservation_mib.saturating_sub(used_memory)
                    as u64,
                disk_mib: config_offer.max_storage_mib.saturating_sub(used_disk) as u64,
            }
        })
        .collect();
    // Saturate: more ports may be reserved than the configured range currently holds.
    let avail_ports =
        self.config.public_port_range.len().saturating_sub(self.res.reserved_ports.len());
    let res = snp_proto::VmNodeResources {
        node_pubkey: PUBLIC_KEY.clone(),
        avail_ports: avail_ports as u32,
        avail_ipv4,
        avail_ipv6,
        offers,
        max_ports_per_vm: self.config.max_ports_per_vm as u32,
    };
    debug!("Node resources are currently: {res:?}");
    res
}
/// Selects the storage path of an offer that can host the requested VM.
///
/// An offer qualifies when the request's memory-per-vCPU and disk-per-vCPU ratios are
/// within 10% of the ratios of the offer's remaining resources, the remaining resources
/// cover the request (with a tolerance of 50 MiB of memory and 100 MiB of disk) and the
/// requested price is not below the offer price. When several offers qualify, the last
/// one in configuration order is returned.
///
/// # Errors
/// Returns the most recent capacity or price rejection, or `UnbalancedVM` when no offer
/// matched the ratios (also returned for a request with zero vCPUs).
pub fn get_offer_path(&self, req: &NewVMRequest) -> Result<String, VMCreationErrors> {
    // A request without vCPUs has no per-vCPU ratio; reject it instead of dividing by zero.
    if req.vcpus == 0 {
        return Err(VMCreationErrors::UnbalancedVM);
    }
    let vm_mem_per_cpu = req.memory_mib / req.vcpus;
    let vm_disk_per_cpu = req.disk_size_mib / req.vcpus;
    let mut offer_path = String::new();
    let mut error = VMCreationErrors::UnbalancedVM;
    for offer in self.config.offers.iter() {
        let available_resources = match self.res.reserved_hw.get(&offer.storage_path) {
            Some(used_resources) => HwReservation {
                vcpus: offer.max_vcpu_reservation.saturating_sub(used_resources.vcpus),
                memory: offer.max_mem_reservation_mib.saturating_sub(used_resources.memory),
                disk: offer.max_storage_mib.saturating_sub(used_resources.disk),
            },
            None => HwReservation {
                vcpus: offer.max_vcpu_reservation,
                memory: offer.max_mem_reservation_mib,
                disk: offer.max_storage_mib,
            },
        };
        // An offer with no vCPUs left cannot host anything and has no ratio to compare.
        if available_resources.vcpus == 0 {
            error = VMCreationErrors::NotEnoughCPU;
            continue;
        }
        let mem_per_cpu = available_resources.memory / available_resources.vcpus;
        let disk_per_cpu = available_resources.disk / available_resources.vcpus;
        if !within_10_percent(vm_mem_per_cpu, mem_per_cpu)
            || !within_10_percent(vm_disk_per_cpu, disk_per_cpu)
        {
            continue;
        }
        if req.vcpus > available_resources.vcpus {
            error = VMCreationErrors::NotEnoughCPU;
            continue;
        }
        if req.memory_mib > available_resources.memory.saturating_add(50) {
            error = VMCreationErrors::NotEnoughMemory;
            continue;
        }
        if req.disk_size_mib > available_resources.disk.saturating_add(100) {
            error = VMCreationErrors::NotEnoughStorage;
            continue;
        }
        if req.price < offer.price {
            error = VMCreationErrors::PriceIsTooLow;
            continue;
        }
        offer_path = offer.storage_path.clone();
    }
    if offer_path.is_empty() {
        return Err(error);
    }
    Ok(offer_path)
}
/// Validates a VM request, allocates IPs and ports for it, writes its config file and
/// records the reservation.
///
/// Steps, in order: pick an offer, reject duplicates and invalid combinations, make sure
/// OVMF, kernel and dtrfs are present locally, allocate public IPs (if requested) or
/// forwarded host ports (when no public IPv4 is requested), persist the VM config and
/// reserve the resources.
///
/// # Errors
/// Returns the `VMCreationErrors` variant describing the first check or step that failed.
pub fn new_vm(&mut self, req: NewVMRequest) -> Result<VM, VMCreationErrors> {
    let offer_path = self.get_offer_path(&req)?;
    if self.res.existing_vms.contains(&req.vm_id) {
        let content = std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &req.vm_id + ".yaml")
            .map_err(|e| VMCreationErrors::ServerDiskError(e.to_string()))?;
        let vm: crate::state::VM = serde_yaml::from_str(&content)
            .map_err(|e| VMCreationErrors::ServerDiskError(e.to_string()))?;
        return Err(VMCreationErrors::VMAlreadyExists(vm));
    }
    if !req.extra_ports.is_empty() && req.public_ipv4 {
        return Err(VMCreationErrors::NATandIPv4Conflict);
    }
    if req.memory_mib < 800 {
        return Err(VMCreationErrors::MemoryTooLow);
    }
    if req.disk_size_mib < 4000 {
        return Err(VMCreationErrors::DiskTooSmall);
    }

    self.res
        .find_or_download_file(OVMF_URL.to_string(), OVMF_HASH.to_string())
        .map_err(|ovmf_err| {
            VMCreationErrors::BootFileError(format!("Could not get OVMF: {ovmf_err:?}"))
        })?;
    self.res.find_or_download_file(req.kernel_url, req.kernel_sha.clone()).map_err(
        |kernel_err| {
            VMCreationErrors::BootFileError(format!("Could not get kernel: {kernel_err:?}"))
        },
    )?;
    self.res.find_or_download_file(req.dtrfs_url, req.dtrfs_sha.clone()).map_err(
        |dtrfs_err| {
            VMCreationErrors::BootFileError(format!("Could not get dtrfs: {dtrfs_err:?}"))
        },
    )?;

    let mut vm_nics = Vec::new();
    if req.public_ipv4 {
        match self.res.available_ipv4(&self.config.network_interfaces) {
            Some(vmnic) => vm_nics.push(vmnic),
            None => return Err(VMCreationErrors::IPv4NotAvailable),
        }
    }
    if req.public_ipv6 {
        match self.res.available_ipv6(&self.config.network_interfaces) {
            Some(mut vmnic) => {
                if let Some(mut existing_vmnic) = vm_nics.pop() {
                    // Both addresses live on the same host device: use a single NIC.
                    if vmnic.if_config.device_name() == existing_vmnic.if_config.device_name() {
                        vmnic.ips.append(&mut existing_vmnic.ips);
                        vm_nics.push(vmnic);
                    } else {
                        vm_nics.push(existing_vmnic);
                        vm_nics.push(vmnic);
                    }
                } else {
                    vm_nics.push(vmnic);
                }
            }
            None => {
                return Err(VMCreationErrors::IPv6NotAvailable);
            }
        }
    }

    let mut port_pairs: Vec<(u16, u16)> = Vec::new();
    if !req.public_ipv4 {
        let host_ports = self.available_ports(req.extra_ports.len());
        // One host port for SSH plus one per extra port is needed. A shorter list means
        // allocation failed; checking the length avoids indexing out of bounds below.
        if host_ports.len() < req.extra_ports.len() + 1 {
            return Err(VMCreationErrors::NotEnoughPorts);
        }
        port_pairs.push((host_ports[0], 22));
        port_pairs
            .extend(host_ports.iter().skip(1).copied().zip(req.extra_ports.iter().copied()));
    }

    let vm = VM {
        vm_id: req.vm_id,
        admin_key: req.admin_key,
        nics: vm_nics,
        vcpus: req.vcpus,
        memory_mib: req.memory_mib,
        disk_size_mib: req.disk_size_mib,
        kernel_sha: req.kernel_sha,
        dtrfs_sha: req.dtrfs_sha,
        fw_ports: port_pairs,
        storage_dir: offer_path,
    };
    if let Err(e) = vm.write_config() {
        return Err(VMCreationErrors::ServerDiskError(e.to_string()));
    }
    self.res.reserve_vm_resources(&vm);
    Ok(vm)
}
}
/// Bookkeeping of everything that existing VMs have reserved on this node.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct Resources {
// ids of the VMs known to the daemon
pub existing_vms: HashSet<String>,
// QEMU does not support MHz limitation
// host ports forwarded to VMs
pub reserved_ports: HashSet<u16>,
// maps storage path to hardware reservation
pub reserved_hw: HashMap<String, HwReservation>,
// IPv4 addresses assigned to VMs, in textual form
pub reserved_ipv4: HashSet<String>,
// IPv6 addresses assigned to VMs, in textual form
pub reserved_ipv6: HashSet<String>,
// names of the vtap interfaces assigned to VMs
pub reserved_if_names: HashSet<String>,
// sha256 sums of the boot files considered present in VM_BOOT_DIR
// (boot files are stored under a file name equal to their hash)
boot_files: HashSet<String>,
}
impl Resources {
/// Serializes the reservations as YAML and writes them to `USED_RESOURCES`,
/// replacing any previous content.
fn save_to_disk(&self) -> Result<()> {
    let mut out = File::create(USED_RESOURCES)?;
    let yaml = serde_yaml::to_string(self)?;
    out.write_all(yaml.as_bytes())?;
    Ok(())
}
pub fn load_from_disk() -> Result<Self> {
let mut res = Self { ..Default::default() };
log::debug!("Reading VMs saved to disk to calculate used resources...");
for entry in fs::read_dir(VM_CONFIG_DIR)? {
let entry = entry?;
let path = entry.path();
if path.is_file() && path.to_string_lossy().ends_with(".yaml") {
log::info!("Found VM config: {:?}", path.to_str());
let content = std::fs::read_to_string(path)?;
let vm: VM = serde_yaml::from_str(&content)?;
res.existing_vms.insert(vm.vm_id);
res.reserved_hw
.entry(vm.storage_dir)
.and_modify(|utilization| {
utilization.vcpus = utilization.vcpus.saturating_add(vm.vcpus);
utilization.memory = utilization.memory.saturating_add(vm.memory_mib);
utilization.disk = utilization.disk.saturating_add(vm.disk_size_mib);
})
.or_insert(HwReservation {
vcpus: vm.vcpus,
memory: vm.memory_mib,
disk: vm.disk_size_mib,
});
for (port, _) in vm.fw_ports.iter() {
res.reserved_ports.insert(*port);
}
for nic in vm.nics {
for ip in nic.ips {
if let Ok(ip_address) = ip.address.parse::<std::net::IpAddr>() {
if ip_address.is_ipv4() {
res.reserved_ipv4.insert(ip.address.clone());
}
if ip_address.is_ipv6() {
res.reserved_ipv6.insert(ip.address);
}
}
}
if let Some(vtap_name) = nic.if_config.vtap_name() {
res.reserved_if_names.insert(vtap_name);
}
}
}
}
res.scan_boot_files().unwrap();
res.save_to_disk()?;
Ok(res)
}
/// Generates an interface name of the form `detee` + 9 random alphanumeric characters
/// that is not present in `reserved_if_names`. The name is not reserved by this call.
fn available_if_name(&mut self) -> String {
    use rand::{distributions::Alphanumeric, Rng};
    loop {
        let suffix: String =
            rand::thread_rng().sample_iter(&Alphanumeric).take(9).map(char::from).collect();
        let candidate = format!("detee{suffix}");
        if !self.reserved_if_names.contains(&candidate) {
            return candidate;
        }
    }
}
/// Finds the first unreserved IPv4 address in the configured ranges and builds a NIC
/// description for it.
///
/// Interfaces and ranges are scanned in order; the range gateway and addresses present
/// in `reserved_ipv4` are skipped. The address is not reserved by this call. Returns
/// `None` when every address is taken.
fn available_ipv4(&mut self, nics: &[crate::config::Interface]) -> Option<VMNIC> {
    for nic in nics {
        for range in nic.ipv4_ranges.iter() {
            // Inclusive range: `last + 1` would overflow when the range ends at the top
            // of the address space.
            for ip in range.first_ip.to_bits()..=range.last_ip.to_bits() {
                let ip_addr = std::net::Ipv4Addr::from_bits(ip);
                if ip_addr == range.gateway
                    || self.reserved_ipv4.contains(&ip_addr.to_string())
                {
                    continue;
                }
                let if_config = match nic.driver {
                    crate::config::InterfaceType::MACVTAP => InterfaceConfig::MACVTAP {
                        name: self.available_if_name(),
                        device: nic.device.clone(),
                    },
                    crate::config::InterfaceType::IPVTAP => InterfaceConfig::IPVTAP {
                        name: self.available_if_name(),
                        device: nic.device.clone(),
                    },
                    crate::config::InterfaceType::Bridge => {
                        InterfaceConfig::Bridge { device: nic.device.clone() }
                    }
                };
                let ips = vec![IPConfig {
                    address: ip_addr.to_string(),
                    mask: range.netmask.clone(),
                    gateway: range.gateway.to_string(),
                }];
                return Some(VMNIC { if_config, ips });
            }
        }
    }
    None
}
/// Finds the first unreserved IPv6 address in the configured ranges and builds a NIC
/// description for it.
///
/// Interfaces and ranges are scanned in order; the range gateway and addresses present
/// in `reserved_ipv6` are skipped. The address is not reserved by this call. Returns
/// `None` when every address is taken.
fn available_ipv6(&mut self, nics: &[crate::config::Interface]) -> Option<VMNIC> {
    for nic in nics {
        for range in nic.ipv6_ranges.iter() {
            // Inclusive range: `last + 1` would overflow when the range ends at the top
            // of the address space.
            for ip in range.first_ip.to_bits()..=range.last_ip.to_bits() {
                let ip_addr = std::net::Ipv6Addr::from_bits(ip);
                if ip_addr == range.gateway
                    || self.reserved_ipv6.contains(&ip_addr.to_string())
                {
                    continue;
                }
                let if_config = match nic.driver {
                    crate::config::InterfaceType::MACVTAP => InterfaceConfig::MACVTAP {
                        name: self.available_if_name(),
                        device: nic.device.clone(),
                    },
                    crate::config::InterfaceType::IPVTAP => InterfaceConfig::IPVTAP {
                        name: self.available_if_name(),
                        device: nic.device.clone(),
                    },
                    crate::config::InterfaceType::Bridge => {
                        InterfaceConfig::Bridge { device: nic.device.clone() }
                    }
                };
                let ips = vec![IPConfig {
                    address: ip_addr.to_string(),
                    mask: range.netmask.clone(),
                    gateway: range.gateway.to_string(),
                }];
                return Some(VMNIC { if_config, ips });
            }
        }
    }
    None
}
fn scan_boot_files(&mut self) -> Result<()> {
for entry in fs::read_dir(VM_BOOT_DIR)? {
let entry = entry?;
let path = entry.path();
if path.is_file() {
match compute_sha256(&path) {
Ok(hash) => {
if *hash == *entry.file_name() {
self.boot_files.insert(hash);
} else {
// TODO: rename file and insert
}
}
Err(e) => return Err(anyhow!("Error computing hash for {:?}: {}", path, e)),
}
}
}
Ok(())
}
/// Makes sure the boot file identified by `sha` is available: if the hash is not yet
/// known it is downloaded from `url` and verified, then the hash is recorded.
fn find_or_download_file(&mut self, url: String, sha: String) -> Result<()> {
    if self.boot_files.contains(&sha) {
        return Ok(());
    }
    download_and_check_sha(&url, &sha)?;
    self.boot_files.insert(sha);
    Ok(())
}
fn reserve_vm_resources(&mut self, vm: &VM) {
self.existing_vms.insert(vm.vm_id.clone());
for nic in vm.nics.iter() {
if let Some(vtap) = nic.if_config.vtap_name() {
self.reserved_if_names.insert(vtap);
}
for ip in nic.ips.iter() {
if let Ok(ip_address) = ip.address.parse::<std::net::IpAddr>() {
if ip_address.is_ipv4() {
self.reserved_ipv4.insert(ip.address.clone());
}
if ip_address.is_ipv6() {
self.reserved_ipv6.insert(ip.address.clone());
}
}
}
}
for (host_port, _) in vm.fw_ports.iter() {
self.reserved_ports.insert(*host_port);
}
self.reserved_hw
.entry(vm.storage_dir.clone())
.and_modify(|hw_res| {
hw_res.vcpus = hw_res.vcpus.saturating_add(vm.vcpus);
hw_res.memory = hw_res.memory.saturating_add(vm.memory_mib);
hw_res.disk = hw_res.disk.saturating_add(vm.disk_size_mib);
})
.or_insert(HwReservation {
vcpus: vm.vcpus,
memory: vm.memory_mib,
disk: vm.disk_size_mib,
});
let _ = self.save_to_disk();
}
/// Releases everything `vm` occupies and saves the bookkeeping to disk.
///
/// Does nothing when the VM id is not registered. A failure to save is logged.
fn free_vm_resources(&mut self, vm: &VM) {
    let was_registered = self.existing_vms.remove(&vm.vm_id);
    if !was_registered {
        return;
    }
    for nic in &vm.nics {
        if let Some(vtap) = nic.if_config.vtap_name() {
            self.reserved_if_names.remove(&vtap);
        }
        for ip in &nic.ips {
            match ip.address.parse::<std::net::IpAddr>() {
                Ok(std::net::IpAddr::V4(_)) => {
                    self.reserved_ipv4.remove(&ip.address);
                }
                Ok(std::net::IpAddr::V6(_)) => {
                    self.reserved_ipv6.remove(&ip.address);
                }
                Err(_) => {}
            }
        }
    }
    for (host_port, _) in &vm.fw_ports {
        self.reserved_ports.remove(host_port);
    }
    // Only an existing reservation entry is adjusted; none is created here.
    if let Some(hw_res) = self.reserved_hw.get_mut(&vm.storage_dir) {
        hw_res.vcpus = hw_res.vcpus.saturating_sub(vm.vcpus);
        hw_res.memory = hw_res.memory.saturating_sub(vm.memory_mib);
        hw_res.disk = hw_res.disk.saturating_sub(vm.disk_size_mib);
    }
    if let Err(e) = self.save_to_disk() {
        log::error!("Could not save resources to disk: {e}");
    }
}
}
/// How a VM network interface is attached to the host.
///
/// `device` is the host device of the interface; `name` is the name of the vtap
/// interface created for the VM.
#[derive(Serialize, Deserialize, Debug)]
pub enum InterfaceConfig {
// TODO: instead of QEMU userspace NAT, use iptables kernelspace NAT
// in case of QEMU-base NAT, device name is not needed
NAT { device: String },
MACVTAP { name: String, device: String },
IPVTAP { name: String, device: String },
Bridge { device: String },
}
impl InterfaceConfig {
/// Lowercase name of the interface kind, as written into the VM's shell exports.
fn if_type(&self) -> String {
    let kind = match self {
        InterfaceConfig::IPVTAP { .. } => "ipvtap",
        InterfaceConfig::MACVTAP { .. } => "macvtap",
        InterfaceConfig::NAT { .. } => "nat",
        InterfaceConfig::Bridge { .. } => "bridge",
    };
    kind.to_string()
}
/// Host device of this interface; every variant carries one.
fn device_name(&self) -> String {
    match self {
        InterfaceConfig::IPVTAP { device, .. }
        | InterfaceConfig::MACVTAP { device, .. }
        | InterfaceConfig::NAT { device, .. }
        | InterfaceConfig::Bridge { device, .. } => device.clone(),
    }
}
/// Name of the vtap interface for the IPVTAP and MACVTAP variants, `None` otherwise.
fn vtap_name(&self) -> Option<String> {
    match self {
        InterfaceConfig::IPVTAP { name, .. } | InterfaceConfig::MACVTAP { name, .. } => {
            Some(name.clone())
        }
        InterfaceConfig::NAT { .. } | InterfaceConfig::Bridge { .. } => None,
    }
}
// We will need this when we enable more complex NIC setup.
// fn is_vtap(&self) -> bool {
// match self {
// InterfaceConfig::IPVTAP { .. } | InterfaceConfig::MACVTAP { .. } => true,
// InterfaceConfig::NAT { .. } | InterfaceConfig::Bridge { .. } => false,
// }
// }
}
/// One IP address assigned to a VM interface, with all parts kept in textual form.
#[derive(Serialize, Deserialize, Debug)]
struct IPConfig {
// IPv4 or IPv6 address
address: String,
// requires short format (example: 24)
mask: String,
// gateway of the range the address was taken from
gateway: String,
}
/// A VM network interface: how it is attached to the host and the IPs assigned to it.
#[derive(Serialize, Deserialize, Debug)]
pub struct VMNIC {
// attachment type, host device and (for vtap types) interface name
if_config: InterfaceConfig,
// addresses configured on this interface
ips: Vec<IPConfig>,
}
/// Description of a VM, as stored in `<VM_CONFIG_DIR><vm_id>.yaml`.
#[derive(Serialize, Deserialize, Debug)]
pub struct VM {
pub vm_id: String,
// passed to the guest as the `detee_admin` kernel parameter
admin_key: String,
// (host port, guest port) pairs forwarded to the VM
fw_ports: Vec<(u16, u16)>,
nics: Vec<VMNIC>,
// currently hardcoded to EPYC-v4
// cpu_type: String,
vcpus: usize,
memory_mib: usize,
disk_size_mib: usize,
// hashes of the kernel and dtrfs; they are also the file names inside VM_BOOT_DIR
kernel_sha: String,
dtrfs_sha: String,
// storage path of the offer hosting the VM; the disk image is created in it
storage_dir: String,
}
impl From<VM> for snp_proto::MeasurementArgs {
    /// Builds the measurement arguments for a VM.
    ///
    /// NIC indexes start at 1 when the VM has forwarded ports and at 0 otherwise. The
    /// dtrfs API endpoint is the last IPv4 address of the VM on port 22 or, when the VM
    /// has no IPv4 address, the node IP with the first forwarded host port (this indexes
    /// `fw_ports[0]` and panics when the list is empty).
    fn from(vm: VM) -> Self {
        let first_nic_index: u32 = if vm.fw_ports.is_empty() { 0 } else { 1 };
        let mut ips = Vec::new();
        let mut last_ipv4: Option<String> = None;
        // TODO: when brain supports multiple IPs per VM, fix this
        for (nic_index, nic) in (first_nic_index..).zip(vm.nics) {
            for ip in nic.ips {
                if ip.address.parse::<std::net::Ipv4Addr>().is_ok() {
                    last_ipv4 = Some(ip.address.clone());
                }
                ips.push(snp_proto::MeasurementIp {
                    nic_index,
                    address: ip.address,
                    mask: ip.mask,
                    gateway: ip.gateway,
                });
            }
        }
        let dtrfs_api_endpoint = match last_ipv4 {
            Some(address) => address + ":22",
            None => format!("{}:{}", IP_INFO.ip, vm.fw_ports[0].0),
        };
        let exposed_ports = vm.fw_ports.iter().map(|&(host_port, _)| host_port as u32).collect();
        snp_proto::MeasurementArgs {
            dtrfs_api_endpoint,
            exposed_ports,
            ips,
            ovmf_hash: OVMF_HASH.to_string(),
        }
    }
}
impl From<VM> for snp_proto::NewVmResp {
    /// Builds a successful creation response: the VM id, its converted arguments and an
    /// empty error string.
    fn from(vm: VM) -> Self {
        Self { vm_id: vm.vm_id.clone(), args: Some(vm.into()), error: String::new() }
    }
}
impl From<VM> for snp_proto::UpdateVmResp {
    /// Builds a successful update response: the VM id, its converted arguments and an
    /// empty error string.
    fn from(vm: VM) -> Self {
        Self { vm_id: vm.vm_id.clone(), args: Some(vm.into()), error: String::new() }
    }
}
/// Daemon-side form of a VM creation request (converted from `snp_proto::NewVmReq`).
#[derive(Deserialize, Debug)]
pub struct NewVMRequest {
vm_id: String,
admin_key: String,
// guest ports to forward in addition to SSH (22); rejected together with a public IPv4
extra_ports: Vec<u16>,
public_ipv4: bool,
public_ipv6: bool,
disk_size_mib: usize,
vcpus: usize,
memory_mib: usize,
// download location and expected hash of the kernel
kernel_url: String,
kernel_sha: String,
// download location and expected hash of the dtrfs
dtrfs_url: String,
dtrfs_sha: String,
// compared against the price of the offer; must not be lower
price: u64,
}
impl From<snp_proto::NewVmReq> for NewVMRequest {
    /// Converts the gRPC request into the daemon's request type.
    ///
    /// Numeric fields are converted with plain `as` casts, so a value that does not fit
    /// the target type (for example a port above 65535) is truncated, not rejected.
    fn from(req: snp_proto::NewVmReq) -> Self {
        let snp_proto::NewVmReq {
            vm_id,
            admin_pubkey,
            extra_ports,
            public_ipv4,
            public_ipv6,
            disk_size_mib,
            vcpus,
            memory_mib,
            kernel_url,
            kernel_sha,
            dtrfs_url,
            dtrfs_sha,
            price_per_unit,
            ..
        } = req;
        Self {
            vm_id,
            admin_key: admin_pubkey,
            extra_ports: extra_ports.into_iter().map(|port| port as u16).collect(),
            public_ipv4,
            public_ipv6,
            disk_size_mib: disk_size_mib as usize,
            vcpus: vcpus as usize,
            memory_mib: memory_mib as usize,
            kernel_url,
            kernel_sha,
            dtrfs_url,
            dtrfs_sha,
            price: price_per_unit,
        }
    }
}
/// Daemon-side form of a VM update request (converted from `snp_proto::UpdateVmReq`).
///
/// In `VM::update`, a numeric field equal to 0 leaves that resource unchanged and
/// empty hash strings leave the boot files unchanged.
#[derive(Deserialize, Debug)]
pub struct UpdateVMReq {
pub vm_id: String,
vcpus: usize,
memory_mib: usize,
disk_size_mib: usize,
// we are not using Option<String>, as these will be passed from gRPC
kernel_url: String,
kernel_sha: String,
dtrfs_url: String,
dtrfs_sha: String,
}
impl From<snp_proto::UpdateVmReq> for UpdateVMReq {
    /// Converts the gRPC update request; numeric fields are converted with `as usize`.
    fn from(req: snp_proto::UpdateVmReq) -> Self {
        let snp_proto::UpdateVmReq {
            vm_id,
            vcpus,
            memory_mib,
            disk_size_mib,
            kernel_url,
            kernel_sha,
            dtrfs_url,
            dtrfs_sha,
            ..
        } = req;
        Self {
            vm_id,
            vcpus: vcpus as usize,
            memory_mib: memory_mib as usize,
            disk_size_mib: disk_size_mib as usize,
            kernel_url,
            kernel_sha,
            dtrfs_url,
            dtrfs_sha,
        }
    }
}
impl VM {
pub fn update(&mut self, req: UpdateVMReq, state: &mut State) -> Result<(), VMCreationErrors> {
if req.vcpus < self.vcpus && req.vcpus != 0 && req.memory_mib != 0 && req.disk_size_mib != 0
{
// Downgrade will be upported only after we implement deviation for VMs.
// (Deviation from the slot size allows management of VMs with unbalanced resources,
// without fully saturating a node. We are disabling downgrades to avoid complexity at
// this stage of the product)
return Err(VMCreationErrors::DowngradeNotSupported);
}
let offer_path = self.storage_dir.clone();
let mut available_vcpus = 0;
let mut available_mem = 0;
let mut available_disk = 0;
for offer in state.config.offers.iter() {
if offer.storage_path != offer_path {
continue;
}
let used_hw_resources = match state.res.reserved_hw.get(&offer_path) {
Some(r) => r,
None => continue,
};
available_vcpus = offer.max_vcpu_reservation.saturating_sub(used_hw_resources.vcpus);
available_mem = offer.max_mem_reservation_mib.saturating_sub(used_hw_resources.memory);
available_disk = offer.max_storage_mib.saturating_sub(used_hw_resources.disk);
}
if req.vcpus > 0 && available_vcpus.saturating_add(self.vcpus) < req.vcpus {
return Err(VMCreationErrors::NotEnoughCPU);
}
if req.memory_mib > 0 && available_mem.saturating_add(self.memory_mib) < req.memory_mib {
return Err(VMCreationErrors::NotEnoughMemory);
}
if req.disk_size_mib > 0 {
if available_disk.saturating_add(self.disk_size_mib) < req.disk_size_mib {
return Err(VMCreationErrors::NotEnoughStorage);
}
if req.disk_size_mib < self.disk_size_mib {
return Err(VMCreationErrors::DiskTooSmall);
}
}
if !req.kernel_sha.is_empty() || !req.dtrfs_sha.is_empty() {
if req.kernel_sha.is_empty() || req.dtrfs_sha.is_empty() {
return Err(VMCreationErrors::BootFileError(
"Kernel and DTRFS can be upgraded only as a bundle".to_string(),
));
}
if let Err(kern_err) =
state.res.find_or_download_file(req.kernel_url, req.kernel_sha.clone())
{
return Err(VMCreationErrors::BootFileError(format!(
"Could not get kernel: {kern_err:?}"
)));
};
if let Err(dtrfs_err) =
state.res.find_or_download_file(req.dtrfs_url, req.dtrfs_sha.clone())
{
return Err(VMCreationErrors::BootFileError(format!(
"Could not get dtrfs: {dtrfs_err:?}"
)));
};
info!(
"Kernel and DTRFS updated for VM: {}, kernel {}, dtrfs: {}",
self.vm_id, req.kernel_sha, req.dtrfs_sha
);
self.kernel_sha = req.kernel_sha;
self.dtrfs_sha = req.dtrfs_sha;
}
// Update the resources
state.res.reserved_hw.entry(offer_path).and_modify(|hw_res| {
hw_res.vcpus = hw_res.vcpus.saturating_add(req.vcpus);
hw_res.vcpus = hw_res.vcpus.saturating_sub(self.vcpus);
hw_res.memory = hw_res.memory.saturating_add(req.memory_mib);
hw_res.memory = hw_res.memory.saturating_sub(self.memory_mib);
hw_res.disk = hw_res.disk.saturating_add(req.disk_size_mib);
hw_res.disk = hw_res.disk.saturating_sub(self.disk_size_mib);
});
let _ = state.res.save_to_disk();
if req.memory_mib != 0 {
self.memory_mib = req.memory_mib;
}
if req.vcpus != 0 {
self.vcpus = req.vcpus;
}
if req.disk_size_mib != 0 {
self.disk_size_mib = req.disk_size_mib;
}
if let Err(e) = systemctl_stop_and_disable(&self.vm_id) {
return Err(VMCreationErrors::HypervizorError(e.to_string()));
}
if let Err(e) = self.write_config() {
return Err(VMCreationErrors::ServerDiskError(e.to_string()));
}
if let Err(e) = self.write_sh_exports() {
return Err(VMCreationErrors::ServerDiskError(e.to_string()));
}
if let Err(e) = self.resize_disk() {
return Err(VMCreationErrors::HypervizorError(e.to_string()));
}
if let Err(e) = systemctl_start_and_enable(&self.vm_id) {
return Err(VMCreationErrors::HypervizorError(e.to_string()));
}
Ok(())
}
/// Creates the disk, the shell exports and the systemd unit of the VM, reloads systemd
/// and starts and enables the VM service. Stops at the first step that fails.
pub fn start(&self) -> Result<()> {
    self.create_disk()?;
    self.write_sh_exports()?;
    self.write_systemd_unit_file()?;
    systemctl_reload()?;
    systemctl_start_and_enable(&self.vm_id)
}
/// Tears the VM down and releases its reservations.
///
/// Every cleanup step is best effort: failures are ignored so the remaining steps still
/// run. Always returns `Ok(())`.
pub fn delete(&self, res: &mut Resources) -> Result<()> {
    // Service first, then the files that back it.
    let _ = systemctl_stop_and_disable(&self.vm_id);
    let _ = self.delete_systemd_unit_file();
    let _ = systemctl_reload();

    let _ = self.delete_disk();
    let _ = self.delete_sh_exports();
    let _ = self.delete_vtap_interfaces();
    let _ = self.delete_config();

    res.free_vm_resources(self);
    Ok(())
}
// For the MVP, the T-Contract offers VM+IP+Disk as a bundle, so the path to the disk
// can be enforced. This may change once a VM is allowed to have multiple disks.
/// Path of the VM disk image: `<storage_dir>/<vm_id>.qcow2`.
pub fn disk_path(&self) -> String {
    let mut path = self.storage_dir.clone();
    if !path.ends_with("/") {
        path.push('/');
    }
    path.push_str(&self.vm_id);
    path.push_str(".qcow2");
    path
}
// If you change this here, you also have to change it in the CLI.
// The kernel params must match on both daemon and CLI to build the measurement.
/// Kernel command line of the VM: one `detee_net_eth<N>=<ip>_<mask>_<gateway>` entry per
/// address, followed by the admin key and the VM id.
///
/// When the VM has forwarded ports, index 0 is given fixed 10.0.2.x settings and
/// the VM's own NICs start at index 1. All addresses of one NIC share its index.
pub fn kernel_params(&self) -> String {
    let mut params = String::new();
    let mut eth_index = 0;
    if !self.fw_ports.is_empty() {
        params.push_str(&format!(
            "detee_net_eth{}={}_{}_{} ",
            eth_index, "10.0.2.15", "24", "10.0.2.2"
        ));
        eth_index += 1;
    }
    for nic in &self.nics {
        for ip in &nic.ips {
            params.push_str(&format!(
                "detee_net_eth{}={}_{}_{} ",
                eth_index, ip.address, ip.mask, ip.gateway
            ));
        }
        eth_index += 1;
    }
    params.push_str(&format!("detee_admin={} ", self.admin_key));
    params.push_str(&format!("detee_vm_id={}", self.vm_id));
    params
}
/// Writes the VM as YAML to `<VM_CONFIG_DIR><vm_id>.yaml`, replacing any previous file.
fn write_config(&self) -> Result<()> {
    let mut config_path = VM_CONFIG_DIR.to_string();
    config_path.push_str(&self.vm_id);
    config_path.push_str(".yaml");
    let mut out = File::create(config_path)?;
    let yaml = serde_yaml::to_string(self)?;
    out.write_all(yaml.as_bytes())?;
    Ok(())
}
/// Removes `<VM_CONFIG_DIR><vm_id>.yaml`.
fn delete_config(&self) -> Result<()> {
    let mut config_path = VM_CONFIG_DIR.to_string();
    config_path.push_str(&self.vm_id);
    config_path.push_str(".yaml");
    remove_file(config_path)?;
    Ok(())
}
/// Writes `<VM_CONFIG_DIR><vm_id>.sh`, a list of shell `export` lines describing the VM:
/// network interfaces, forwarded ports, kernel, initrd, kernel parameters, CPU type,
/// vCPUs, memory and disk.
///
/// Memory is rounded down to an even number of MiB; `MAX_MEMORY` is that value plus 256.
fn write_sh_exports(&self) -> Result<()> {
    let mut script = String::new();

    for (index, nic) in self.nics.iter().enumerate() {
        let mut line =
            format!(r#"export NETWORK_INTERFACE_{}="{}"#, index, nic.if_config.if_type());
        // device is currently ignored in case of NAT cause we assume QEMU userspace NAT
        if let Some(vtap_name) = nic.if_config.vtap_name() {
            line.push_str(&format!("_{}_{}", nic.if_config.device_name(), vtap_name));
        }
        line.push('"');
        script.push_str(&line);
        script.push('\n');
    }

    let port_forwards = self
        .fw_ports
        .iter()
        .map(|(host_port, guest_port)| format!("{}:{}", host_port, guest_port))
        .collect::<Vec<_>>()
        .join(" ");
    if !port_forwards.is_empty() {
        script.push_str(&format!(r#"export NAT_PORT_FW="{}""#, port_forwards));
        script.push('\n');
        script.push_str("export NETWORK_INTERFACE_0000=NAT\n");
    }

    let even_memory_mib = self.memory_mib / 2 * 2;
    let exports = [
        format!(r#"export KERNEL="{}""#, VM_BOOT_DIR.to_string() + &self.kernel_sha),
        format!(r#"export INITRD="{}""#, VM_BOOT_DIR.to_string() + &self.dtrfs_sha),
        format!(r#"export PARAMS="{}""#, self.kernel_params()),
        format!(r#"export CPU_TYPE="{}""#, QEMU_VM_CPU_TYPE),
        format!(r#"export VCPUS="{}""#, self.vcpus),
        format!(r#"export MEMORY="{}M""#, even_memory_mib),
        format!(r#"export MAX_MEMORY="{}M""#, even_memory_mib + 256),
        format!(r#"export DISK="{}""#, self.disk_path()),
    ];
    for export in exports.iter() {
        script.push_str(export);
        script.push('\n');
    }

    let mut file = File::create(VM_CONFIG_DIR.to_string() + &self.vm_id + ".sh")?;
    file.write_all(script.as_bytes())?;
    Ok(())
}
/// Removes `<VM_CONFIG_DIR><vm_id>.sh`.
fn delete_sh_exports(&self) -> Result<()> {
    let mut exports_path = VM_CONFIG_DIR.to_string();
    exports_path.push_str(&self.vm_id);
    exports_path.push_str(".sh");
    remove_file(exports_path)?;
    Ok(())
}
/// Deletes the vtap interface of every NIC that has one, using `ip link del <name>`.
///
/// Stops at the first interface that cannot be deleted and returns an error carrying
/// the raw stdout and stderr bytes of the command.
fn delete_vtap_interfaces(&self) -> Result<()> {
    let vtap_names = self.nics.iter().filter_map(|nic| nic.if_config.vtap_name());
    for name in vtap_names {
        let result = Command::new("ip").args(["link", "del"]).arg(&name).output()?;
        if result.status.success() {
            continue;
        }
        return Err(anyhow!(
            "Could not delete vtap interface {:?}:\n{:?}\n{:?}",
            name,
            result.stdout,
            result.stderr
        ));
    }
    Ok(())
}
/// Writes the systemd unit of the VM to `/etc/systemd/system/<vm_id>.service`.
///
/// Uses `File::create_new`, so the call fails when the unit file already exists.
fn write_systemd_unit_file(&self) -> Result<()> {
    let unit_lines = [
        "[Unit]".to_string(),
        format!("Description=DeTEE {}", self.vm_id),
        "After=network.target".to_string(),
        String::new(),
        "[Service]".to_string(),
        "Type=simple".to_string(),
        format!("Environment=VM_UUID={}", self.vm_id),
        format!("ExecStart={}", START_VM_SCRIPT),
        "ExecStop=/bin/kill -s SIGINT $MAINPID".to_string(),
        "Restart=always".to_string(),
        String::new(),
        "[Install]".to_string(),
        "WantedBy=multi-user.target".to_string(),
    ];
    // Every line, including the last one, is terminated by a newline.
    let mut contents = unit_lines.join("\n");
    contents.push('\n');

    let unit_path = format!("/etc/systemd/system/{}.service", self.vm_id);
    let mut file = File::create_new(unit_path)?;
    file.write_all(contents.as_bytes())?;
    Ok(())
}
/// Removes `/etc/systemd/system/<vm_id>.service`.
fn delete_systemd_unit_file(&self) -> Result<()> {
    let unit_path = format!("/etc/systemd/system/{}.service", self.vm_id);
    remove_file(unit_path)?;
    Ok(())
}
/// Creates the qcow2 disk image of the VM with `qemu-img create`.
///
/// Fails when a file already exists at the disk path or when `qemu-img` reports an error.
fn create_disk(&self) -> Result<()> {
    let disk_path = self.disk_path();
    if std::path::Path::new(&disk_path).exists() {
        return Err(anyhow!("Could not create {}. The file already exists.", disk_path));
    }
    let size = self.disk_size_mib.to_string() + "M";
    let result =
        Command::new("qemu-img").args(["create", "-f", "qcow2"]).arg(disk_path).arg(size).output()?;
    if result.status.success() {
        return Ok(());
    }
    let stdout = String::from_utf8(result.stdout)
        .unwrap_or_else(|_| "Could not grab stdout from creation script.".to_string());
    let stderr = String::from_utf8(result.stderr)
        .unwrap_or_else(|_| "Could not grab stderr from creation script.".to_string());
    Err(anyhow!("Could not create VM Disk:\n{:?}\n{:?}", stdout, stderr))
}
/// Resizes the disk image of the VM to `disk_size_mib` with `qemu-img resize`.
///
/// Fails when `qemu-img` cannot be run or reports an error; the error carries the
/// output of the command.
fn resize_disk(&self) -> Result<()> {
    let result = Command::new("qemu-img")
        .arg("resize")
        .arg(self.disk_path())
        .arg(self.disk_size_mib.to_string() + "M")
        .output()?;
    if !result.status.success() {
        // The message names the resize operation (it used to say "create").
        return Err(anyhow!(
            "Could not resize VM Disk:\n{:?}\n{:?}",
            String::from_utf8(result.stdout)
                .unwrap_or_else(|_| "Could not grab stdout from qemu-img resize.".to_string()),
            String::from_utf8(result.stderr)
                .unwrap_or_else(|_| "Could not grab stderr from qemu-img resize.".to_string()),
        ));
    }
    Ok(())
}
/// Removes the disk image of the VM.
fn delete_disk(&self) -> Result<()> {
    let disk_path = self.disk_path();
    remove_file(disk_path)?;
    Ok(())
}
}
/// Runs `systemctl start` and then `systemctl enable` for `<vm_id>.service`.
///
/// Stops at the first command that fails; the error names the failed action and unit
/// and carries the output of the command.
fn systemctl_start_and_enable(vm_id: &str) -> Result<()> {
    let unit = vm_id.to_string() + ".service";
    for action in ["start", "enable"] {
        let result = Command::new("systemctl").arg(action).arg(&unit).output()?;
        if !result.status.success() {
            return Err(anyhow!(
                "Could not {} {}:\n{:?}\n{:?}",
                action,
                unit,
                String::from_utf8_lossy(&result.stdout),
                String::from_utf8_lossy(&result.stderr),
            ));
        }
    }
    Ok(())
}
/// Runs `systemctl stop` and then `systemctl disable` for `<vm_id>.service`.
///
/// Stops at the first command that fails; the error names the failed action and unit
/// and carries the output of the command.
fn systemctl_stop_and_disable(vm_id: &str) -> Result<()> {
    let unit = vm_id.to_string() + ".service";
    for action in ["stop", "disable"] {
        let result = Command::new("systemctl").arg(action).arg(&unit).output()?;
        if !result.status.success() {
            return Err(anyhow!(
                "Could not {} {}:\n{:?}\n{:?}",
                action,
                unit,
                String::from_utf8_lossy(&result.stdout),
                String::from_utf8_lossy(&result.stderr),
            ));
        }
    }
    Ok(())
}
/// Runs `systemctl daemon-reload` and reports the command output on failure.
fn systemctl_reload() -> Result<()> {
    let result = Command::new("systemctl").arg("daemon-reload").output()?;
    if result.status.success() {
        return Ok(());
    }
    let stdout = String::from_utf8(result.stdout)
        .unwrap_or_else(|_| "Could not grab stdout from creation script.".to_string());
    let stderr = String::from_utf8(result.stderr)
        .unwrap_or_else(|_| "Could not grab stderr from creation script.".to_string());
    Err(anyhow!("Could not reload systemctl daemon:\n{:?}\n{:?}", stdout, stderr))
}
fn download_and_check_sha(url: &str, sha: &str) -> Result<()> {
use reqwest::blocking::get;
use std::{fs::File, io::copy, path::Path};
let save_path = VM_BOOT_DIR.to_string() + sha;
let response = get(url)?;
if !response.status().is_success() {
return Err(anyhow!("Failed to download file: {}", response.status()));
}
let mut file = File::create(Path::new(&save_path))?;
copy(&mut response.bytes()?.as_ref(), &mut file)?;
match crate::global::compute_sha256(&save_path) {
Ok(hash) => {
if hash != sha {
return Err(anyhow!(
"Sha of the downloaded file did not match supplised sha: {} vs {}",
hash,
sha
));
}
}
Err(e) => return Err(anyhow!("Error computing hash for {:?}: {}", save_path, e)),
}
Ok(())
}
/// Returns `true` when `a` and `b` differ by at most 10% of the larger of the two.
///
/// Two zeros are considered equal. The check is equivalent to `diff * 10 <= max(a, b)`
/// for integers, but is written as a division so it cannot overflow for large inputs.
fn within_10_percent(a: usize, b: usize) -> bool {
    let diff = a.abs_diff(b);
    let reference = a.max(b); // the larger of the two
    diff <= reference / 10
}