// SPDX-License-Identifier: Apache-2.0
mod config;
mod global;
mod grpc;
mod state;

use crate::{config::Config, global::*, grpc::snp_proto};
use anyhow::{anyhow, Result};
use log::{debug, info, warn};
use std::{fs::File, path::Path};
use tokio::{
    sync::mpsc::{Receiver, Sender},
    time::{sleep, Duration},
};

#[allow(dead_code)]
struct VMHandler {
    receiver: Receiver<snp_proto::BrainVmMessage>,
    sender: Sender<snp_proto::VmDaemonMessage>,
    config: Config,
    res: state::Resources,
}

#[allow(dead_code)]
impl VMHandler {
    fn new(
        receiver: Receiver<snp_proto::BrainVmMessage>,
        sender: Sender<snp_proto::VmDaemonMessage>,
    ) -> Self {
        let config = match Config::load_from_disk(DAEMON_CONFIG_PATH) {
            Ok(config) => config,
            Err(e) => panic!("Could not load config: {e:?}"),
        };
        let res = match state::Resources::load_from_disk() {
            Ok(res) => res,
            Err(e) => {
                log::error!("Could not calculate resources: {e:?}");
                std::process::exit(1);
            }
        };
        Self { receiver, sender, config, res }
    }

    fn get_available_ips(&self) -> (u32, u32) {
        let mut avail_ipv4: u32 = 0;
        let mut avail_ipv6: u128 = 0;
        for nic in self.config.network_interfaces.iter() {
            for range in nic.ipv4_ranges.iter() {
                // +1 because the range is inclusive on both ends.
                avail_ipv4 += range.last_ip.to_bits() - range.first_ip.to_bits() + 1;
            }
            for range in nic.ipv6_ranges.iter() {
                avail_ipv6 += range.last_ip.to_bits() - range.first_ip.to_bits() + 1;
            }
        }
        (
            avail_ipv4.saturating_sub(self.res.reserved_ipv4.len() as u32),
            // Clamp instead of truncating: an IPv6 range can easily exceed u32::MAX.
            u32::try_from(avail_ipv6.saturating_sub(self.res.reserved_ipv6.len() as u128))
                .unwrap_or(u32::MAX),
        )
    }

    /// Returns the storage available per VM and the total storage available, in MiB.
    fn storage_available(&self) -> (usize, usize) {
        let mut total_storage_available = 0_usize;
        let mut avail_storage_mib = 0_usize;
        for volume in self.config.volumes.iter() {
            let reservation: usize =
                self.res.reserved_storage.get(&volume.path).copied().unwrap_or(0);
            let volume_mib_available = volume.max_reservation_mib.saturating_sub(reservation);
            total_storage_available += volume_mib_available;
            // A single VM cannot span volumes, so the per-VM limit is the largest
            // amount available on any one volume.
            if avail_storage_mib < volume_mib_available {
                avail_storage_mib = volume_mib_available;
            }
        }
        (avail_storage_mib, total_storage_available)
    }

    /// Returns the memory per vCPU and disk per vCPU ratios.
    fn slot_ratios(&self) -> (usize, usize) {
        let (_, total_storage_mib) = self.storage_available();
        let available_cpus: usize =
            self.config.max_vcpu_reservation.saturating_sub(self.res.reserved_vcpus);
        let available_mem: usize =
            self.config.max_mem_reservation_mib.saturating_sub(self.res.reserved_memory_mib);
        // max(1) avoids a division by zero when the node is fully booked.
        let memory_per_cpu = available_mem / available_cpus.max(1);
        let disk_per_cpu = total_storage_mib / available_cpus.max(1);
        (memory_per_cpu, disk_per_cpu)
    }

    async fn send_node_resources(&mut self) {
        let (avail_ipv4, avail_ipv6) = self.get_available_ips();
        let (avail_storage_mib, total_storage_available) = self.storage_available();
        let avail_vcpus = self.config.max_vcpu_reservation.saturating_sub(self.res.reserved_vcpus);
        let avail_memory_mib =
            self.config.max_mem_reservation_mib.saturating_sub(self.res.reserved_memory_mib);
        // If storage is separated into multiple volumes, that limits the maximum VM size.
        // Due to this, we have to limit the maximum amount of vCPUs and memory per VM,
        // based on the maximum possible disk size per VM.
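        // A worked example with made-up numbers: two volumes with 102_400 MiB and
        // 307_200 MiB free yield avail_storage_mib = 307_200 (largest single volume)
        // and total_storage_available = 409_600. A node with 64 free vCPUs then
        // advertises 64 * 307_200 / 409_600 = 48 vCPUs, so no advertised slot can
        // outgrow the biggest volume.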
        // max(1) guards the division when no storage is available at all.
        let avail_vcpus = avail_vcpus * avail_storage_mib / total_storage_available.max(1);
        let avail_memory_mib =
            avail_memory_mib * avail_storage_mib / total_storage_available.max(1);
        let res = snp_proto::VmNodeResources {
            node_pubkey: PUBLIC_KEY.clone(),
            avail_ports: self
                .config
                .public_port_range
                .len()
                .saturating_sub(self.res.reserved_ports.len()) as u32,
            avail_ipv4,
            avail_ipv6,
            avail_vcpus: avail_vcpus as u32,
            avail_memory_mib: avail_memory_mib as u32,
            avail_storage_mib: avail_storage_mib as u32,
            max_ports_per_vm: self.config.max_ports_per_vm as u32,
        };
        debug!("Sending node resources to the brain: {res:?}");
        let _ = self.sender.send(res.into()).await;
    }

    async fn handle_new_vm_req(&mut self, new_vm_req: snp_proto::NewVmReq) {
        let (memory_per_cpu, disk_per_cpu) = self.slot_ratios();
        // max(1) guards against a zero-vCPU request; such a request gets rejected
        // as unbalanced below anyway.
        let vm_memory_per_cpu = new_vm_req.memory_mib / new_vm_req.vcpus.max(1);
        let vm_disk_per_cpu = new_vm_req.disk_size_mib / new_vm_req.vcpus.max(1);
        if !within_10_percent(memory_per_cpu, vm_memory_per_cpu as usize)
            || !within_10_percent(disk_per_cpu, vm_disk_per_cpu as usize)
        {
            warn!("Refusing to create VM due to unbalanced resources: {new_vm_req:?}");
            let _ = self
                .sender
                .send(
                    snp_proto::NewVmResp {
                        uuid: new_vm_req.uuid,
                        error: "Unbalanced hardware resources.".to_string(),
                        ..Default::default()
                    }
                    .into(),
                )
                .await;
            return;
        }
        debug!("Processing new VM request: {new_vm_req:?}");
        let uuid = new_vm_req.uuid.clone();
        match state::VM::new(new_vm_req.into(), &self.config, &mut self.res) {
            Ok(vm) => match vm.start() {
                Ok(_) => {
                    info!("Successfully started VM {uuid}");
                    let vm: snp_proto::NewVmResp = vm.into();
                    let _ = self.sender.send(vm.into()).await;
                    self.send_node_resources().await;
                }
                Err(e) => {
                    log::error!("Could not start VM {uuid}: {e:?}");
                    let _ = self
                        .sender
                        .send(
                            snp_proto::NewVmResp {
                                uuid,
                                error: "This node has an internal error. Choose another node."
                                    .to_string(),
                                ..Default::default()
                            }
                            .into(),
                        )
                        .await;
                }
            },
            Err(e) => match e {
                crate::state::VMCreationErrors::VMAlreadyExists(vm) => {
                    log::info!(
                        "Got NewVmReq for VM {} that already exists. Will send NewVmResp.",
                        vm.uuid
                    );
                    let vm: snp_proto::NewVmResp = vm.into();
                    let _ = self.sender.send(vm.into()).await;
                }
                _ => {
                    warn!("Refusing to service VM {uuid} due to error: {e:?}");
                    let _ = self
                        .sender
                        .send(
                            snp_proto::NewVmResp {
                                uuid,
                                error: format!("{e:?}"),
                                ..Default::default()
                            }
                            .into(),
                        )
                        .await;
                }
            },
        }
    }

    async fn handle_update_vm_req(&mut self, update_vm_req: snp_proto::UpdateVmReq) -> Result<()> {
        debug!("Processing update VM request: {update_vm_req:?}");
        let vm_id = update_vm_req.uuid.clone();
        let content = std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?;
        let mut vm: state::VM = serde_yaml::from_str(&content)?;
        match vm.update(update_vm_req.into(), &self.config, &mut self.res) {
            Ok(_) => {
                info!("Successfully updated VM {vm_id}");
                let vm: snp_proto::UpdateVmResp = vm.into();
                let _ = self.sender.send(vm.into()).await;
                self.send_node_resources().await;
            }
            Err(e) => {
                debug!("Unable to update VM {vm_id} due to error: {e:?}");
                let _ = self
                    .sender
                    .send(
                        snp_proto::UpdateVmResp {
                            uuid: vm_id,
                            error: format!("{e:?}"),
                            ..Default::default()
                        }
                        .into(),
                    )
                    .await;
            }
        };
        Ok(())
    }

    fn handle_delete_vm(&mut self, delete_vm_req: snp_proto::DeleteVmReq) -> Result<()> {
        let vm_id = delete_vm_req.uuid;
        let content = std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?;
        let vm: state::VM = serde_yaml::from_str(&content)?;
        vm.delete(&mut self.res)?;
        Ok(())
    }

    async fn run(mut self) {
        // Give the connection spawned in main() a moment to come up before
        // advertising resources.
        sleep(Duration::from_millis(500)).await;
        self.send_node_resources().await;
        while let Some(brain_msg) = self.receiver.recv().await {
            match brain_msg.msg {
                Some(snp_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => {
                    self.handle_new_vm_req(new_vm_req).await;
                }
                Some(snp_proto::brain_vm_message::Msg::UpdateVmReq(update_vm_req)) => {
                    if let Err(e) = self.handle_update_vm_req(update_vm_req).await {
                        log::error!("Could not update VM: {e:?}");
                    }
                }
                Some(snp_proto::brain_vm_message::Msg::DeleteVm(delete_vm_req)) => {
                    let uuid = delete_vm_req.uuid.clone();
                    if let Err(e) = self.handle_delete_vm(delete_vm_req) {
                        log::error!("Could not delete VM {uuid}: {e:?}");
                    } else {
                        self.send_node_resources().await;
                    }
                }
                None => debug!("Received None from the brain."),
            }
        }
    }

    fn clear_deleted_contracts(&mut self, deleted_vms: Vec<snp_proto::DeleteVmReq>) {
        for deleted_vm in deleted_vms {
            let uuid = deleted_vm.uuid;
            let content = match std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &uuid + ".yaml")
            {
                Ok(content) => content,
                Err(e) => {
                    log::debug!("Could not find VM config for {uuid}. Maybe it already got deleted? Error: {e:?}");
                    continue;
                }
            };
            let vm: crate::state::VM = match serde_yaml::from_str(&content) {
                Ok(vm) => vm,
                Err(e) => {
                    log::error!("VM config corrupted for {uuid}. Cannot delete VM: {e:?}");
                    continue;
                }
            };
            match vm.delete(&mut self.res) {
                Ok(()) => info!("Successfully deleted VM {uuid}"),
                Err(e) => log::error!("Deletion failed for VM {uuid}: {e:?}"),
            }
        }
    }
}

#[tokio::main]
async fn main() {
    env_logger::builder().filter_level(log::LevelFilter::Debug).init();
    loop {
        if std::env::var("DAEMON_AUTO_UPGRADE").as_deref() != Ok("OFF") {
            // This upgrade procedure will get replaced in prod. We need this for the testnet.
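            // Operators can opt out before starting the daemon, e.g.:
            //   DAEMON_AUTO_UPGRADE=OFF detee-snp-daemon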
            if let Err(e) = download_and_replace_binary() {
                log::error!("Failed to upgrade detee-snp-daemon to a newer version: {e}");
            }
        }
        let (brain_msg_tx, brain_msg_rx) = tokio::sync::mpsc::channel(6);
        let (daemon_msg_tx, daemon_msg_rx) = tokio::sync::mpsc::channel(6);
        let mut vm_handler = VMHandler::new(brain_msg_rx, daemon_msg_tx.clone());
        let network = vm_handler.config.network.clone();
        let contracts: Vec<String> = vm_handler.res.existing_vms.clone().into_iter().collect();
        info!("Registering with the brain and getting back deleted VMs.");
        match grpc::register_node(&vm_handler.config).await {
            Ok(deleted_vms) => vm_handler.clear_deleted_contracts(deleted_vms),
            Err(e) => log::error!("Could not get contracts from brain: {e:?}"),
        };
        tokio::spawn(async move {
            vm_handler.run().await;
        });
        info!("Connecting to brain...");
        if let Err(e) = grpc::connect_and_run(grpc::ConnectionData {
            contracts,
            network,
            brain_msg_tx,
            daemon_msg_rx,
            daemon_msg_tx,
        })
        .await
        {
            log::error!("The connection broke: {e}");
        }
        sleep(Duration::from_secs(3)).await;
    }
}

fn download_and_replace_binary() -> Result<()> {
    use reqwest::blocking::get;
    use std::os::unix::fs::PermissionsExt;
    const TMP_DAEMON: &str = "/usr/local/bin/detee/new-daemon";
    const BINARY: &str = "/usr/local/bin/detee-snp-daemon";
    let response = get("https://registry.detee.ltd/daemon/detee-snp-daemon")?;
    if !response.status().is_success() {
        return Err(anyhow!("Failed to download file: {}", response.status()));
    }
    let mut tmp_file = File::create(Path::new(TMP_DAEMON))?;
    std::io::copy(&mut response.bytes()?.as_ref(), &mut tmp_file)?;
    let new_hash = crate::global::compute_sha256(TMP_DAEMON)?;
    let old_hash = crate::global::compute_sha256(BINARY)?;
    log::debug!("Old binary hash: {old_hash}. New binary hash: {new_hash}");
    if new_hash != old_hash {
        std::fs::rename(BINARY, BINARY.to_string() + "_BACKUP")?;
        std::fs::rename(TMP_DAEMON, BINARY)?;
        std::fs::set_permissions(BINARY, std::fs::Permissions::from_mode(0o775))?;
        // Exit so the service manager can restart the daemon using the new binary.
        std::process::exit(0);
    }
    Ok(())
}

/// Returns true if `a` and `b` differ by at most 10% of the larger value.
fn within_10_percent(a: usize, b: usize) -> bool {
    let diff = a.abs_diff(b);
    let reference = a.max(b); // compare against the larger of the two
    diff * 10 <= reference
}
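
// A minimal test sketch for the tolerance check above; the values are
// illustrative assumptions, not taken from a real deployment.
#[cfg(test)]
mod tests {
    use super::within_10_percent;

    #[test]
    fn tolerance_boundaries() {
        // diff = 10, reference = 110: 10 * 10 = 100 <= 110, inside tolerance
        assert!(within_10_percent(100, 110));
        // diff = 10, reference = 100: 100 <= 100, exactly on the boundary
        assert!(within_10_percent(100, 90));
        // diff = 25, reference = 125: 250 > 125, rejected
        assert!(!within_10_percent(100, 125));
        // equal values always pass, including zero
        assert!(within_10_percent(0, 0));
    }
}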