diff --git a/Cargo.lock b/Cargo.lock index 557fbe0..3294435 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -396,7 +396,7 @@ dependencies = [ [[package]] name = "detee-shared" version = "0.1.0" -source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=credits-v2#f344c171c5a8d7ae8cad1628396e6b3a1af0f1ba" +source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=offers#4753a17fa29393b3f99b6dfcdcec48d935e6ebd9" dependencies = [ "bincode", "prost", diff --git a/Cargo.toml b/Cargo.toml index 3360d27..9c55019 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ bs58 = "0.5.1" chrono = "0.4.39" # TODO: switch this back to main after the upgrade -detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "credits-v2" } +detee-shared = { git = "ssh://git@gitea.detee.cloud/testnet/proto", branch = "offers" } # detee-shared = { path = "../detee-shared" } [build-dependencies] diff --git a/config_sample.yaml b/config_sample.yaml new file mode 100644 index 0000000..1b9f901 --- /dev/null +++ b/config_sample.yaml @@ -0,0 +1,34 @@ +owner_wallet: "x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK" +network: "staging" +network_interfaces: + - driver: "MACVTAP" + device: "eno8303" + ipv4_ranges: + - first_ip: "173.234.136.154" + last_ip: "173.234.136.155" + netmask: "27" + gateway: "173.234.136.158" + - first_ip: "173.234.137.17" + last_ip: "173.234.137.17" + netmask: "27" + gateway: "173.234.137.30" + ipv6_ranges: + - first_ip: "2a0d:3003:b666:a00c:0002:0000:0000:0011" + last_ip: "2a0d:3003:b666:a00c:0002:0000:0000:fffc" + netmask: "64" + gateway: "2a0d:3003:b666:a00c::1" +offers: + - storage_path: "/opt/detee_vms/" + max_storage_mib: 819200 + max_vcpu_reservation: 20 + max_mem_reservation_mib: 20480 + price: 725 + - storage_path: "/opt/detee_vms2/" + max_storage_mib: 855040 + max_vcpu_reservation: 12 + max_mem_reservation_mib: 12288 + price: 775 +public_port_range: + start: 30000 + end: 50000 +max_ports_per_vm: 5 diff --git a/src/config.rs b/src/config.rs index f7e1a9e..8485856 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,12 +7,6 @@ use std::{ ops::Range, }; -#[derive(Deserialize, Debug, Clone)] -pub struct Volume { - pub path: String, - pub max_reservation_mib: usize, -} - #[derive(Deserialize, Debug)] pub struct IPv4Range { pub first_ip: Ipv4Addr, @@ -44,19 +38,25 @@ pub enum InterfaceType { Bridge, } +#[derive(Deserialize, Debug)] +pub struct Offer { + // price per unit per minute + pub price: u64, + pub max_vcpu_reservation: usize, + pub max_mem_reservation_mib: usize, + pub storage_path: String, + pub max_storage_mib: usize, +} + #[derive(Deserialize, Debug)] pub struct Config { pub owner_wallet: String, pub network: String, - pub max_vcpu_reservation: usize, - pub max_mem_reservation_mib: usize, pub network_interfaces: Vec, - pub volumes: Vec, + pub offers: Vec, #[serde(with = "range_format")] pub public_port_range: Range, pub max_ports_per_vm: u16, - // price per unit per minute - pub price: u64, } mod range_format { @@ -82,6 +82,13 @@ impl Config { pub fn load_from_disk(path: &str) -> Result { let content = std::fs::read_to_string(path)?; let config: Config = serde_yaml::from_str(&content)?; + let offers_path_set: std::collections::HashSet = + config.offers.iter().map(|offer| offer.storage_path.clone()).collect(); + if offers_path_set.len() != config.offers.len() { + return Err(anyhow::anyhow!( + "Each offer must have a unique folder for saving VM disk files." + )); + } for nic in &config.network_interfaces { for range in &nic.ipv4_ranges { let ipv4_netmask = range.netmask.parse::()?; diff --git a/src/grpc.rs b/src/grpc.rs index fe0a1d5..208f767 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -50,7 +50,6 @@ pub async fn register_node(config: &crate::config::Config) -> Result, sender: Sender, - config: Config, - res: state::Resources, + state: State, } #[allow(dead_code)] @@ -39,116 +39,21 @@ impl VMHandler { std::process::exit(1); } }; - Self { receiver, sender, config, res } - } - - fn get_available_ips(&self) -> (u32, u32) { - let mut avail_ipv4 = 0; - let mut avail_ipv6 = 0; - for nic in self.config.network_interfaces.iter() { - for range in nic.ipv4_ranges.iter() { - avail_ipv4 += (range.last_ip.to_bits() + 1) - range.first_ip.to_bits(); - } - for range in nic.ipv6_ranges.iter() { - avail_ipv6 += (range.last_ip.to_bits() + 1) - range.first_ip.to_bits(); - } - } - ( - avail_ipv4.saturating_sub(self.res.reserved_ipv4.len() as u32), - avail_ipv6.saturating_sub(self.res.reserved_ipv6.len() as u128) as u32, - ) - } - - // returns storage available per VM and total storage available - fn storage_available(&self) -> (usize, usize) { - let mut total_storage_available = 0_usize; - let mut avail_storage_mib = 0_usize; - for volume in self.config.volumes.iter() { - let reservation: usize = match self.res.reserved_storage.get(&volume.path) { - Some(reserved) => *reserved, - None => 0 as usize, - }; - let volume_mib_available = volume.max_reservation_mib.saturating_sub(reservation); - total_storage_available += volume_mib_available; - if avail_storage_mib < volume_mib_available { - avail_storage_mib = volume_mib_available; - } - } - - (avail_storage_mib, total_storage_available) - } - - /// returns Memory per vCPU and Disk per vCPU ratio - fn slot_ratios(&self) -> (usize, usize) { - let (_, total_storage_mib) = self.storage_available(); - let available_cpus: usize = - self.config.max_vcpu_reservation.saturating_sub(self.res.reserved_vcpus); - let available_mem: usize = - self.config.max_mem_reservation_mib.saturating_sub(self.res.reserved_memory_mib); - - let memory_per_cpu = available_mem / available_cpus; - let disk_per_cpu = total_storage_mib / available_cpus; - - (memory_per_cpu, disk_per_cpu) + let state = state::State { + config, + res, + }; + Self { receiver, sender, state} } async fn send_node_resources(&mut self) { - let (avail_ipv4, avail_ipv6) = self.get_available_ips(); - - let (avail_storage_mib, total_storage_available) = self.storage_available(); - let avail_vcpus = self.config.max_vcpu_reservation.saturating_sub(self.res.reserved_vcpus); - let avail_memory_mib = - self.config.max_mem_reservation_mib.saturating_sub(self.res.reserved_memory_mib); - - // If storage is separated into multiple volumes, that limits the maxium VM size. - // Due to this, we have to limit the maximum amount of vCPUs and Memory per VM, based on - // the maximum possible disk size per VM. - let avail_vcpus = avail_vcpus * avail_storage_mib / total_storage_available; - let avail_memory_mib = avail_memory_mib * avail_storage_mib / total_storage_available; - - let res = snp_proto::VmNodeResources { - node_pubkey: PUBLIC_KEY.clone(), - avail_ports: (self.config.public_port_range.len() - self.res.reserved_ports.len()) - as u32, - avail_ipv4, - avail_ipv6, - avail_vcpus: avail_vcpus as u32, - avail_memory_mib: avail_memory_mib as u32, - avail_storage_mib: avail_storage_mib as u32, - max_ports_per_vm: self.config.max_ports_per_vm as u32, - }; - debug!("sending node resources on brain: {res:?}"); - let _ = self.sender.send(res.into()).await; + let _ = self.sender.send(self.state.available_resources().into()).await; } async fn handle_new_vm_req(&mut self, new_vm_req: snp_proto::NewVmReq) { - // Currently the daemon allows a deviation of 10% for newly created VMs, so that the - // process doesn't get interrupted by small bugs caused by human error in coding. - // Over time, we will probably allow deviation based on server utilization, however that - // needs to be implmeneted for both VM creation and VM upgrade. - let (memory_per_cpu, disk_per_cpu) = self.slot_ratios(); - let vm_memory_per_cpu = new_vm_req.memory_mib / new_vm_req.vcpus; - let vm_disk_per_cpu = new_vm_req.disk_size_mib / new_vm_req.vcpus; - if !within_10_percent(memory_per_cpu, vm_memory_per_cpu as usize) - || !within_10_percent(disk_per_cpu, vm_disk_per_cpu as usize) - { - warn!("Refusing to create vm due to unbalanced resources: {new_vm_req:?}"); - let _ = self - .sender - .send( - snp_proto::NewVmResp { - uuid: new_vm_req.uuid, - error: format!("Unbalanced hardware resources."), - ..Default::default() - } - .into(), - ) - .await; - return; - }; debug!("Processing new vm request: {new_vm_req:?}"); let uuid = new_vm_req.uuid.clone(); - match state::VM::new(new_vm_req.into(), &self.config, &mut self.res) { + match self.state.new_vm(new_vm_req.into()) { Ok(vm) => match vm.start() { Ok(_) => { info!("Succesfully started VM {uuid}"); @@ -204,7 +109,7 @@ impl VMHandler { let vm_id = update_vm_req.uuid.clone(); let content = std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?; let mut vm: state::VM = serde_yaml::from_str(&content)?; - match vm.update(update_vm_req.into(), &self.config, &mut self.res) { + match vm.update(update_vm_req.into(), &mut self.state) { Ok(_) => { info!("Succesfully updated VM {vm_id}"); let vm: snp_proto::UpdateVmResp = vm.into(); @@ -233,7 +138,7 @@ impl VMHandler { let vm_id = delete_vm_req.uuid; let content = std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?; let vm: state::VM = serde_yaml::from_str(&content)?; - vm.delete(&mut self.res)?; + vm.delete(&mut self.state.res)?; Ok(()) } @@ -281,7 +186,7 @@ impl VMHandler { continue; } }; - match vm.delete(&mut self.res) { + match vm.delete(&mut self.state.res) { Ok(()) => info!("Successfully deleted VM {uuid}"), Err(e) => log::error!("Deletion failed for VM {uuid}: {e:?}"), } @@ -305,11 +210,11 @@ async fn main() { let (daemon_msg_tx, daemon_msg_rx) = tokio::sync::mpsc::channel(6); let mut vm_handler = VMHandler::new(brain_msg_rx, daemon_msg_tx.clone()); - let network = vm_handler.config.network.clone(); - let contracts: Vec = vm_handler.res.existing_vms.clone().into_iter().collect(); + let network = vm_handler.state.config.network.clone(); + let contracts: Vec = vm_handler.state.res.existing_vms.clone().into_iter().collect(); info!("Registering with the brain and getting back deleted VMs."); - match grpc::register_node(&vm_handler.config).await { + match grpc::register_node(&vm_handler.state.config).await { Ok(deleted_vms) => vm_handler.clear_deleted_contracts(deleted_vms), Err(e) => log::error!("Could not get contracts from brain: {e:?}"), }; @@ -356,9 +261,3 @@ fn download_and_replace_binary() -> Result<()> { } Ok(()) } - -fn within_10_percent(a: usize, b: usize) -> bool { - let diff = a.abs_diff(b); // u32 - let reference = a.max(b); // the larger of the two - diff * 10 <= reference -} diff --git a/src/state.rs b/src/state.rs index 5aff3be..f35613b 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,10 +1,8 @@ -#![allow(dead_code)] - // SPDX-License-Identifier: Apache-2.0 use crate::{config::Config, global::*, grpc::snp_proto}; use anyhow::{anyhow, Result}; -use log::info; +use log::{debug, info}; use serde::{Deserialize, Serialize}; use std::{ collections::{HashMap, HashSet}, @@ -14,14 +12,291 @@ use std::{ process::Command, }; +#[derive(Debug)] +pub enum VMCreationErrors { + PriceIsTooLow, + VMAlreadyExists(VM), + NATandIPv4Conflict, + ServerFull, + NotEnoughPorts, + NotEnoughCPU, + NotEnoughMemory, + NotEnoughStorage, + IPv4NotAvailable, + IPv6NotAvailable, + MemoryTooLow, + DiskTooSmall, + DowngradeNotSupported, + UnbalancedVM, + ServerDiskError(String), + BootFileError(String), + HypervizorError(String), +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct HwReservation { + vcpus: usize, + memory: usize, + disk: usize, +} + +pub struct State { + pub config: Config, + pub res: Resources, +} + +impl State { + fn available_ports(&mut self, extra_ports: usize) -> Vec { + use rand::Rng; + let total_ports = extra_ports + 1; + if self.config.public_port_range.len() + < self.res.reserved_ports.len() + total_ports as usize + { + return Vec::new(); + } + if total_ports > self.config.max_ports_per_vm as usize { + return Vec::new(); + } + let mut published_ports = Vec::new(); + for _ in 0..total_ports { + for _ in 0..5 { + let port = rand::thread_rng().gen_range(self.config.public_port_range.clone()); + if self.res.reserved_ports.get(&port).is_none() { + published_ports.push(port); + } + break; + } + } + published_ports.sort(); + published_ports + } + + fn nr_of_available_ips(&self) -> (u32, u32) { + let mut avail_ipv4 = 0; + let mut avail_ipv6 = 0; + for nic in self.config.network_interfaces.iter() { + for range in nic.ipv4_ranges.iter() { + avail_ipv4 += (range.last_ip.to_bits() + 1) - range.first_ip.to_bits(); + } + for range in nic.ipv6_ranges.iter() { + avail_ipv6 += (range.last_ip.to_bits() + 1) - range.first_ip.to_bits(); + } + } + ( + avail_ipv4.saturating_sub(self.res.reserved_ipv4.len() as u32), + avail_ipv6.saturating_sub(self.res.reserved_ipv6.len() as u128) as u32, + ) + } + + pub fn available_resources(&self) -> snp_proto::VmNodeResources { + let (avail_ipv4, avail_ipv6) = self.nr_of_available_ips(); + let mut offer_resources: Vec = Vec::new(); + + for config_offer in self.config.offers.iter() { + let offer = match self.res.reserved_hw.get(&config_offer.storage_path) { + Some(used_resources) => snp_proto::VmNodeOffer { + price: config_offer.price, + vcpus: config_offer.max_vcpu_reservation.saturating_sub(used_resources.vcpus) + as u64, + memory_mib: config_offer + .max_mem_reservation_mib + .saturating_sub(used_resources.memory) + as u64, + disk_mib: config_offer.max_storage_mib.saturating_sub(used_resources.disk) + as u64, + }, + None => snp_proto::VmNodeOffer { + price: config_offer.price, + vcpus: config_offer.max_vcpu_reservation as u64, + memory_mib: config_offer.max_mem_reservation_mib as u64, + disk_mib: config_offer.max_storage_mib as u64, + }, + }; + offer_resources.push(offer); + } + + let res = snp_proto::VmNodeResources { + node_pubkey: PUBLIC_KEY.clone(), + avail_ports: (self.config.public_port_range.len() - self.res.reserved_ports.len()) + as u32, + avail_ipv4, + avail_ipv6, + offers: offer_resources, + max_ports_per_vm: self.config.max_ports_per_vm as u32, + }; + debug!("Node resources are currently: {res:?}"); + res + } + + pub fn get_offer_path(&self, req: &NewVMRequest) -> Result { + let vm_mem_per_cpu = req.memory_mib / req.vcpus; + let vm_disk_per_cpu = req.disk_size_mib / req.vcpus; + + let mut offer_path = String::new(); + let mut error = VMCreationErrors::UnbalancedVM; + + for offer in self.config.offers.iter() { + let available_resources = match self.res.reserved_hw.get(&offer.storage_path) { + Some(used_resources) => HwReservation { + vcpus: offer.max_vcpu_reservation.saturating_sub(used_resources.vcpus), + memory: offer.max_mem_reservation_mib.saturating_sub(used_resources.memory), + disk: offer.max_storage_mib.saturating_sub(used_resources.disk), + }, + None => HwReservation { + vcpus: offer.max_vcpu_reservation, + memory: offer.max_mem_reservation_mib, + disk: offer.max_storage_mib, + }, + }; + let mem_per_cpu = available_resources.memory / available_resources.vcpus; + let disk_per_cpu = available_resources.disk / available_resources.vcpus; + + if !within_10_percent(vm_mem_per_cpu, mem_per_cpu) + || !within_10_percent(vm_disk_per_cpu, disk_per_cpu) + { + continue; + } + + if req.vcpus > available_resources.vcpus { + error = VMCreationErrors::NotEnoughCPU; + continue; + } + + if req.memory_mib > available_resources.memory + 50 { + error = VMCreationErrors::NotEnoughMemory; + continue; + } + + if req.disk_size_mib > available_resources.disk + 100 { + error = VMCreationErrors::NotEnoughStorage; + continue; + } + + if req.price < offer.price { + error = VMCreationErrors::PriceIsTooLow; + continue; + } + + offer_path = offer.storage_path.clone(); + } + + if offer_path.is_empty() { + return Err(error); + } + + Ok(offer_path) + } + + pub fn new_vm(&mut self, req: NewVMRequest) -> Result { + let offer_path = self.get_offer_path(&req)?; + + if self.res.existing_vms.contains(&req.uuid) { + let content = std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &req.uuid + ".yaml") + .map_err(|e| VMCreationErrors::ServerDiskError(e.to_string()))?; + let vm: crate::state::VM = serde_yaml::from_str(&content) + .map_err(|e| VMCreationErrors::ServerDiskError(e.to_string()))?; + return Err(VMCreationErrors::VMAlreadyExists(vm)); + } + if req.extra_ports.len() > 0 && req.public_ipv4 { + return Err(VMCreationErrors::NATandIPv4Conflict); + } + if req.memory_mib < 800 { + return Err(VMCreationErrors::MemoryTooLow); + } + if req.disk_size_mib < 4000 { + return Err(VMCreationErrors::DiskTooSmall); + } + + if let Err(ovmf_err) = + self.res.find_or_download_file(OVMF_URL.to_string(), OVMF_HASH.to_string()) + { + return Err(VMCreationErrors::BootFileError(format!( + "Could not get OVMF: {ovmf_err:?}" + ))); + }; + if let Err(kernel_err) = + self.res.find_or_download_file(req.kernel_url, req.kernel_sha.clone()) + { + return Err(VMCreationErrors::BootFileError(format!( + "Could not get kernel: {kernel_err:?}" + ))); + }; + if let Err(dtrfs_err) = self.res.find_or_download_file(req.dtrfs_url, req.dtrfs_sha.clone()) + { + return Err(VMCreationErrors::BootFileError(format!( + "Could not get dtrfs: {dtrfs_err:?}" + ))); + }; + + let mut vm_nics = Vec::new(); + if req.public_ipv4 { + match self.res.available_ipv4(&self.config.network_interfaces) { + Some(vmnic) => vm_nics.push(vmnic), + None => return Err(VMCreationErrors::IPv4NotAvailable), + } + } + if req.public_ipv6 { + match self.res.available_ipv6(&self.config.network_interfaces) { + Some(mut vmnic) => { + if let Some(mut existing_vmnic) = vm_nics.pop() { + if vmnic.if_config.device_name() == existing_vmnic.if_config.device_name() { + vmnic.ips.append(&mut existing_vmnic.ips); + vm_nics.push(vmnic); + } else { + vm_nics.push(existing_vmnic); + vm_nics.push(vmnic); + } + } else { + vm_nics.push(vmnic); + } + } + None => { + return Err(VMCreationErrors::IPv6NotAvailable); + } + } + } + + let mut host_ports: Vec = Vec::new(); + let mut port_pairs: Vec<(u16, u16)> = Vec::new(); + if !req.public_ipv4 { + host_ports.append(self.available_ports(req.extra_ports.len()).as_mut()); + if host_ports.len() == 0 { + return Err(VMCreationErrors::NotEnoughPorts); + } + port_pairs.push((host_ports[0], 22)); + for i in 0..req.extra_ports.len() { + port_pairs.push((host_ports[i + 1], req.extra_ports[i])); + } + } + + let vm = VM { + uuid: req.uuid, + admin_key: req.admin_key, + nics: vm_nics, + vcpus: req.vcpus, + memory_mib: req.memory_mib, + disk_size_mib: req.disk_size_mib, + kernel_sha: req.kernel_sha, + dtrfs_sha: req.dtrfs_sha, + fw_ports: port_pairs, + storage_dir: offer_path, + }; + + if let Err(e) = vm.write_config() { + return Err(VMCreationErrors::ServerDiskError(e.to_string())); + } + self.res.reserve_vm_resources(&vm); + Ok(vm) + } +} + #[derive(Serialize, Deserialize, Debug, Default)] pub struct Resources { pub existing_vms: HashSet, // QEMU does not support MHz limiation - pub reserved_vcpus: usize, - pub reserved_memory_mib: usize, pub reserved_ports: HashSet, - pub reserved_storage: HashMap, + // maps storage path to hardware reservation + pub reserved_hw: HashMap, pub reserved_ipv4: HashSet, pub reserved_ipv6: HashSet, pub reserved_if_names: HashSet, @@ -48,17 +323,21 @@ impl Resources { let content = std::fs::read_to_string(path)?; let vm: VM = serde_yaml::from_str(&content)?; res.existing_vms.insert(vm.uuid); - res.reserved_vcpus = res.reserved_vcpus.saturating_add(vm.vcpus); - res.reserved_memory_mib = res.reserved_memory_mib.saturating_add(vm.memory_mib); + res.reserved_hw + .entry(vm.storage_dir) + .and_modify(|utilization| { + utilization.vcpus = utilization.vcpus.saturating_add(vm.vcpus); + utilization.memory = utilization.memory.saturating_add(vm.memory_mib); + utilization.disk = utilization.disk.saturating_add(vm.disk_size_mib); + }) + .or_insert(HwReservation { + vcpus: vm.vcpus, + memory: vm.memory_mib, + disk: vm.disk_size_mib, + }); for (port, _) in vm.fw_ports.iter() { res.reserved_ports.insert(*port); } - res.reserved_storage - .entry(vm.storage_dir.clone()) - .and_modify(|gb| { - *gb = gb.saturating_add(vm.disk_size_mib); - }) - .or_insert(vm.disk_size_mib); for nic in vm.nics { for ip in nic.ips { if let Ok(ip_address) = ip.address.parse::() { @@ -83,71 +362,6 @@ impl Resources { Ok(res) } - pub fn new(config_volumes: &Vec) -> Self { - let mut storage_pools = Vec::new(); - for config_vol in config_volumes.iter() { - storage_pools.push(StoragePool { - path: config_vol.path.clone(), - // TODO: check if the storage is actualy available at that path - available_gb: config_vol.max_reservation_mib, - }); - } - let mut res = Resources { - existing_vms: HashSet::new(), - reserved_vcpus: 0, - reserved_memory_mib: 0, - reserved_ports: HashSet::new(), - reserved_storage: HashMap::new(), - reserved_ipv4: HashSet::new(), - reserved_ipv6: HashSet::new(), - reserved_if_names: HashSet::new(), - boot_files: HashSet::new(), - }; - res.scan_boot_files().unwrap(); - let _ = res.save_to_disk(); - res - } - - fn available_storage_pool(&mut self, required_gb: usize, config: &Config) -> Option { - let mut volumes = config.volumes.clone(); - for volume in volumes.iter_mut() { - if let Some(reservation) = self.reserved_storage.get(&volume.path) { - volume.max_reservation_mib = - volume.max_reservation_mib.saturating_sub(*reservation); - } - } - volumes.sort_by_key(|v| v.max_reservation_mib); - if let Some(biggest_volume) = volumes.last() { - if biggest_volume.max_reservation_mib >= required_gb { - return Some(biggest_volume.path.clone()); - } - } - None - } - - fn available_ports(&mut self, extra_ports: usize, config: &Config) -> Vec { - use rand::Rng; - let total_ports = extra_ports + 1; - if config.public_port_range.len() < self.reserved_ports.len() + total_ports as usize { - return Vec::new(); - } - if total_ports > config.max_ports_per_vm as usize { - return Vec::new(); - } - let mut published_ports = Vec::new(); - for _ in 0..total_ports { - for _ in 0..5 { - let port = rand::thread_rng().gen_range(config.public_port_range.clone()); - if self.reserved_ports.get(&port).is_none() { - published_ports.push(port); - } - break; - } - } - published_ports.sort(); - published_ports - } - fn available_if_name(&mut self) -> String { use rand::{distributions::Alphanumeric, Rng}; loop { @@ -264,8 +478,6 @@ impl Resources { fn reserve_vm_resources(&mut self, vm: &VM) { self.existing_vms.insert(vm.uuid.clone()); - self.reserved_vcpus += vm.vcpus; - self.reserved_memory_mib += vm.memory_mib; for nic in vm.nics.iter() { if let Some(vtap) = nic.if_config.vtap_name() { self.reserved_if_names.insert(vtap); @@ -285,10 +497,18 @@ impl Resources { self.reserved_ports.insert(*host_port); } - self.reserved_storage + self.reserved_hw .entry(vm.storage_dir.clone()) - .and_modify(|gb| *gb = gb.saturating_add(vm.disk_size_mib)) - .or_insert(vm.disk_size_mib); + .and_modify(|hw_res| { + hw_res.vcpus = hw_res.vcpus.saturating_add(vm.vcpus); + hw_res.memory = hw_res.memory.saturating_add(vm.memory_mib); + hw_res.disk = hw_res.disk.saturating_add(vm.disk_size_mib); + }) + .or_insert(HwReservation { + vcpus: vm.vcpus, + memory: vm.memory_mib, + disk: vm.disk_size_mib, + }); let _ = self.save_to_disk(); } @@ -296,8 +516,6 @@ impl Resources { if !self.existing_vms.remove(&vm.uuid) { return; } - self.reserved_vcpus = self.reserved_vcpus.saturating_sub(vm.vcpus); - self.reserved_memory_mib = self.reserved_memory_mib.saturating_sub(vm.memory_mib); for nic in vm.nics.iter() { if let Some(vtap) = nic.if_config.vtap_name() { self.reserved_if_names.remove(&vtap); @@ -316,9 +534,12 @@ impl Resources { for (host_port, _) in vm.fw_ports.iter() { self.reserved_ports.remove(host_port); } - self.reserved_storage - .entry(vm.storage_dir.clone()) - .and_modify(|gb| *gb = gb.saturating_sub(vm.disk_size_mib)); + + self.reserved_hw.entry(vm.storage_dir.clone()).and_modify(|hw_res| { + hw_res.vcpus = hw_res.vcpus.saturating_sub(vm.vcpus); + hw_res.memory = hw_res.memory.saturating_sub(vm.memory_mib); + hw_res.disk = hw_res.disk.saturating_sub(vm.disk_size_mib); + }); if let Err(e) = self.save_to_disk() { log::error!("Could not save resources to disk: {e}"); } @@ -370,12 +591,13 @@ impl InterfaceConfig { } } - fn is_vtap(&self) -> bool { - match self { - InterfaceConfig::IPVTAP { .. } | InterfaceConfig::MACVTAP { .. } => true, - InterfaceConfig::NAT { .. } | InterfaceConfig::Bridge { .. } => false, - } - } + // We will need this when we enable more complex NIC setup. + // fn is_vtap(&self) -> bool { + // match self { + // InterfaceConfig::IPVTAP { .. } | InterfaceConfig::MACVTAP { .. } => true, + // InterfaceConfig::NAT { .. } | InterfaceConfig::Bridge { .. } => false, + // } + // } } #[derive(Serialize, Deserialize, Debug)] @@ -524,143 +746,8 @@ impl From for UpdateVMReq { } } -#[derive(Debug)] -pub enum VMCreationErrors { - PriceIsTooLow, - VMAlreadyExists(VM), - NATandIPv4Conflict, - NotEnoughPorts, - NotEnoughCPU, - NotEnoughMemory, - NotEnoughStorage, - IPv4NotAvailable, - IPv6NotAvailable, - DiskTooSmall, - DowngradeNotSupported, - ServerDiskError(String), - BootFileError(String), - HypervizorError(String), -} - impl VM { - pub fn new( - req: NewVMRequest, - config: &Config, - res: &mut Resources, - ) -> Result { - if req.price < config.price { - return Err(VMCreationErrors::PriceIsTooLow); - } - if res.existing_vms.contains(&req.uuid) { - let content = std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &req.uuid + ".yaml") - .map_err(|e| VMCreationErrors::ServerDiskError(e.to_string()))?; - let vm: crate::state::VM = serde_yaml::from_str(&content) - .map_err(|e| VMCreationErrors::ServerDiskError(e.to_string()))?; - return Err(VMCreationErrors::VMAlreadyExists(vm)); - } - if req.extra_ports.len() > 0 && req.public_ipv4 { - return Err(VMCreationErrors::NATandIPv4Conflict); - } - if config.max_vcpu_reservation < res.reserved_vcpus.saturating_add(req.vcpus) { - return Err(VMCreationErrors::NotEnoughCPU); - } - if config.max_mem_reservation_mib < res.reserved_memory_mib.saturating_add(req.memory_mib) { - return Err(VMCreationErrors::NotEnoughMemory); - } - if req.disk_size_mib < 4 { - return Err(VMCreationErrors::DiskTooSmall); - } - - if let Err(ovmf_err) = - res.find_or_download_file(OVMF_URL.to_string(), OVMF_HASH.to_string()) - { - return Err(VMCreationErrors::BootFileError(format!( - "Could not get OVMF: {ovmf_err:?}" - ))); - }; - if let Err(kernel_err) = res.find_or_download_file(req.kernel_url, req.kernel_sha.clone()) { - return Err(VMCreationErrors::BootFileError(format!( - "Could not get kernel: {kernel_err:?}" - ))); - }; - if let Err(dtrfs_err) = res.find_or_download_file(req.dtrfs_url, req.dtrfs_sha.clone()) { - return Err(VMCreationErrors::BootFileError(format!( - "Could not get dtrfs: {dtrfs_err:?}" - ))); - }; - - let mut vm_nics = Vec::new(); - if req.public_ipv4 { - match res.available_ipv4(&config.network_interfaces) { - Some(vmnic) => vm_nics.push(vmnic), - None => return Err(VMCreationErrors::IPv4NotAvailable), - } - } - if req.public_ipv6 { - match res.available_ipv6(&config.network_interfaces) { - Some(mut vmnic) => { - if let Some(mut existing_vmnic) = vm_nics.pop() { - if vmnic.if_config.device_name() == existing_vmnic.if_config.device_name() { - vmnic.ips.append(&mut existing_vmnic.ips); - vm_nics.push(vmnic); - } else { - vm_nics.push(existing_vmnic); - vm_nics.push(vmnic); - } - } else { - vm_nics.push(vmnic); - } - } - None => { - return Err(VMCreationErrors::IPv6NotAvailable); - } - } - } - - let mut host_ports: Vec = Vec::new(); - let mut port_pairs: Vec<(u16, u16)> = Vec::new(); - if !req.public_ipv4 { - host_ports.append(res.available_ports(req.extra_ports.len(), &config).as_mut()); - if host_ports.len() == 0 { - return Err(VMCreationErrors::NotEnoughPorts); - } - port_pairs.push((host_ports[0], 22)); - for i in 0..req.extra_ports.len() { - port_pairs.push((host_ports[i + 1], req.extra_ports[i])); - } - } - - let storage_pool_path = match res.available_storage_pool(req.disk_size_mib, config) { - Some(path) => path, - None => return Err(VMCreationErrors::NotEnoughStorage), - }; - - let vm = VM { - uuid: req.uuid, - admin_key: req.admin_key, - nics: vm_nics, - vcpus: req.vcpus, - memory_mib: req.memory_mib, - disk_size_mib: req.disk_size_mib, - kernel_sha: req.kernel_sha, - dtrfs_sha: req.dtrfs_sha, - fw_ports: port_pairs, - storage_dir: storage_pool_path, - }; - - if let Err(e) = vm.write_config() { - return Err(VMCreationErrors::ServerDiskError(e.to_string())); - } - res.reserve_vm_resources(&vm); - Ok(vm) - } - - pub fn update( - &mut self, - req: UpdateVMReq, - config: &Config, - res: &mut Resources, - ) -> Result<(), VMCreationErrors> { + pub fn update(&mut self, req: UpdateVMReq, state: &mut State) -> Result<(), VMCreationErrors> { if req.vcpus < self.vcpus && req.vcpus != 0 && req.memory_mib != 0 && req.disk_size_mib != 0 { // Downgrade will be upported only after we implement deviation for VMs. @@ -669,23 +756,37 @@ impl VM { // this stage of the product) return Err(VMCreationErrors::DowngradeNotSupported); } - if req.vcpus > 0 - && config.max_vcpu_reservation - < res.reserved_vcpus.saturating_sub(self.vcpus).saturating_add(req.vcpus) - { + let offer_path = self.storage_dir.clone(); + let mut available_vcpus = 0; + let mut available_mem = 0; + let mut available_disk = 0; + + for offer in state.config.offers.iter() { + if offer.storage_path != offer_path { + continue; + } + let used_hw_resources = match state.res.reserved_hw.get(&offer_path) { + Some(r) => r, + None => continue, + }; + available_vcpus = offer.max_vcpu_reservation.saturating_sub(used_hw_resources.vcpus); + available_mem = offer.max_mem_reservation_mib.saturating_sub(used_hw_resources.memory); + available_disk = offer.max_storage_mib.saturating_sub(used_hw_resources.disk); + } + + if req.vcpus > 0 && available_vcpus.saturating_add(self.vcpus) < req.vcpus { return Err(VMCreationErrors::NotEnoughCPU); } - if req.memory_mib > 0 - && config.max_mem_reservation_mib - < res - .reserved_memory_mib - .saturating_sub(self.memory_mib) - .saturating_add(req.memory_mib) - { + if req.memory_mib > 0 && available_mem.saturating_add(self.memory_mib) < req.memory_mib { return Err(VMCreationErrors::NotEnoughMemory); } - if req.disk_size_mib > 0 && req.disk_size_mib < self.disk_size_mib { - return Err(VMCreationErrors::DiskTooSmall); + if req.disk_size_mib > 0 { + if available_disk.saturating_add(self.disk_size_mib) < req.disk_size_mib { + return Err(VMCreationErrors::NotEnoughStorage); + } + if req.disk_size_mib < self.disk_size_mib { + return Err(VMCreationErrors::DiskTooSmall); + } } if !req.kernel_sha.is_empty() || !req.dtrfs_sha.is_empty() { @@ -695,13 +796,15 @@ impl VM { )); } - if let Err(kern_err) = res.find_or_download_file(req.kernel_url, req.kernel_sha.clone()) + if let Err(kern_err) = + state.res.find_or_download_file(req.kernel_url, req.kernel_sha.clone()) { return Err(VMCreationErrors::BootFileError(format!( "Could not get kernel: {kern_err:?}" ))); }; - if let Err(dtrfs_err) = res.find_or_download_file(req.dtrfs_url, req.dtrfs_sha.clone()) + if let Err(dtrfs_err) = + state.res.find_or_download_file(req.dtrfs_url, req.dtrfs_sha.clone()) { return Err(VMCreationErrors::BootFileError(format!( "Could not get dtrfs: {dtrfs_err:?}" @@ -716,15 +819,15 @@ impl VM { } // Update the resources - res.reserved_memory_mib = res.reserved_memory_mib.saturating_add(req.memory_mib); - res.reserved_memory_mib = res.reserved_memory_mib.saturating_sub(self.memory_mib); - res.reserved_vcpus = res.reserved_vcpus.saturating_add(req.vcpus); - res.reserved_vcpus = res.reserved_vcpus.saturating_sub(self.vcpus); - res.reserved_storage.entry(self.storage_dir.clone()).and_modify(|gb| { - *gb = gb.saturating_add(req.disk_size_mib); - *gb = gb.saturating_sub(self.disk_size_mib); + state.res.reserved_hw.entry(offer_path).and_modify(|hw_res| { + hw_res.vcpus = hw_res.vcpus.saturating_add(req.vcpus); + hw_res.vcpus = hw_res.vcpus.saturating_sub(self.vcpus); + hw_res.memory = hw_res.memory.saturating_add(req.memory_mib); + hw_res.memory = hw_res.memory.saturating_sub(self.memory_mib); + hw_res.disk = hw_res.disk.saturating_add(req.disk_size_mib); + hw_res.disk = hw_res.disk.saturating_sub(self.disk_size_mib); }); - let _ = res.save_to_disk(); + let _ = state.res.save_to_disk(); if req.memory_mib != 0 { self.memory_mib = req.memory_mib; @@ -1059,3 +1162,9 @@ fn download_and_check_sha(url: &str, sha: &str) -> Result<()> { } Ok(()) } + +fn within_10_percent(a: usize, b: usize) -> bool { + let diff = a.abs_diff(b); // u32 + let reference = a.max(b); // the larger of the two + diff * 10 <= reference +}