snp-daemon/src/main.rs

302 lines
12 KiB
Rust

mod config;
mod global;
mod grpc;
mod state;
use crate::{config::Config, global::*, grpc::snp_proto};
use anyhow::{anyhow, Result};
use log::{debug, info, warn};
use std::{fs::File, path::Path};
use tokio::{
sync::mpsc::{Receiver, Sender},
time::{sleep, Duration},
};
#[allow(dead_code)]
struct VMHandler {
receiver: Receiver<snp_proto::BrainVmMessage>,
sender: Sender<snp_proto::VmDaemonMessage>,
config: Config,
res: state::Resources,
}
#[allow(dead_code)]
impl VMHandler {
fn new(
receiver: Receiver<snp_proto::BrainVmMessage>,
sender: Sender<snp_proto::VmDaemonMessage>,
) -> Self {
let config = match Config::load_from_disk(DAEMON_CONFIG_PATH) {
Ok(config) => config,
Err(e) => panic!("Could not load config: {e:?}"),
};
let res = match state::Resources::load_from_disk() {
Ok(res) => res,
Err(e) => {
log::error!("Could calculate resources: {e:?}");
std::process::exit(1);
}
};
Self { receiver, sender, config, res }
}
fn get_available_ips(&self) -> (u32, u32) {
let mut avail_ipv4 = 0;
let mut avail_ipv6 = 0;
for nic in self.config.network_interfaces.iter() {
for range in nic.ipv4_ranges.iter() {
avail_ipv4 += (range.last_ip.to_bits() + 1) - range.first_ip.to_bits();
}
for range in nic.ipv6_ranges.iter() {
avail_ipv6 += (range.last_ip.to_bits() + 1) - range.first_ip.to_bits();
}
}
(
avail_ipv4.saturating_sub(self.res.reserved_ipv4.len() as u32),
avail_ipv6.saturating_sub(self.res.reserved_ipv6.len() as u128) as u32,
)
}
async fn send_node_resources(&mut self) {
let (avail_ipv4, avail_ipv6) = self.get_available_ips();
let mut total_gb_available = 0;
for volume in self.config.volumes.iter() {
let reservation: usize = match self.res.reserved_storage.get(&volume.path) {
Some(reserved) => *reserved,
None => 0 as usize,
};
let volume_gb_available = volume.max_reservation_gb - reservation;
if total_gb_available < volume_gb_available {
total_gb_available = volume_gb_available;
}
}
let avail_storage_gb = total_gb_available as u32;
let res = snp_proto::VmNodeResources {
node_pubkey: PUBLIC_KEY.clone(),
avail_ports: (self.config.public_port_range.len() - self.res.reserved_ports.len())
as u32,
avail_ipv4,
avail_ipv6,
avail_vcpus: (self.config.max_vcpu_reservation - self.res.reserved_vcpus) as u32,
avail_memory_mb: (self.config.max_mem_reservation_mb - self.res.reserved_memory) as u32,
avail_storage_gb,
max_ports_per_vm: self.config.max_ports_per_vm as u32,
};
debug!("sending node resources on brain: {res:?}");
let _ = self.sender.send(res.into()).await;
}
async fn handle_new_vm_req(&mut self, new_vm_req: snp_proto::NewVmReq) {
debug!("Processing new vm request: {new_vm_req:?}");
let uuid = new_vm_req.uuid.clone();
match state::VM::new(new_vm_req.into(), &self.config, &mut self.res) {
Ok(vm) => match vm.start() {
Ok(_) => {
info!("Succesfully started VM {uuid}");
let vm: snp_proto::NewVmResp = vm.into();
let _ = self.sender.send(vm.into()).await;
self.send_node_resources().await;
}
Err(e) => {
log::error!("Could not start VM {uuid}: {e:?}");
let _ = self
.sender
.send(
snp_proto::NewVmResp {
uuid,
error: "This node has an internal error. Choose another node."
.to_string(),
..Default::default()
}
.into(),
)
.await;
}
},
Err(e) => match e {
crate::state::VMCreationErrors::VMAlreadyExists(vm) => {
log::info!(
"Got NewVmReq for VM {}, that already exist. Will send NewVmResp.",
vm.uuid
);
let vm: snp_proto::NewVmResp = vm.into();
let _ = self.sender.send(vm.into()).await;
}
_ => {
warn!("Refusing to service vm {uuid} due to error: {e:?}");
let _ = self
.sender
.send(
snp_proto::NewVmResp {
uuid,
error: format!("{e:?}"),
..Default::default()
}
.into(),
)
.await;
}
},
}
}
async fn handle_update_vm_req(&mut self, update_vm_req: snp_proto::UpdateVmReq) -> Result<()> {
debug!("Processing update vm request: {update_vm_req:?}");
let vm_id = update_vm_req.uuid.clone();
let content = std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?;
let mut vm: state::VM = serde_yaml::from_str(&content)?;
match vm.update(update_vm_req.into(), &self.config, &mut self.res) {
Ok(_) => {
info!("Succesfully updated VM {vm_id}");
let vm: snp_proto::UpdateVmResp = vm.into();
let _ = self.sender.send(vm.into()).await;
self.send_node_resources().await;
}
Err(e) => {
debug!("Unable to update vm {vm_id} due to error: {e:?}");
let _ = self
.sender
.send(
snp_proto::UpdateVmResp {
uuid: vm_id,
error: format!("{e:?}"),
..Default::default()
}
.into(),
)
.await;
}
};
Ok(())
}
fn handle_delete_vm(&mut self, delete_vm_req: snp_proto::DeleteVmReq) -> Result<()> {
let vm_id = delete_vm_req.uuid;
let content = std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?;
let vm: state::VM = serde_yaml::from_str(&content)?;
vm.delete(&mut self.res)?;
Ok(())
}
async fn run(mut self) {
sleep(Duration::from_millis(500)).await;
self.send_node_resources().await;
while let Some(brain_msg) = self.receiver.recv().await {
match brain_msg.msg {
Some(snp_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => {
self.handle_new_vm_req(new_vm_req).await;
}
Some(snp_proto::brain_vm_message::Msg::UpdateVmReq(update_vm_req)) => {
if let Err(e) = self.handle_update_vm_req(update_vm_req).await {
log::error!("Could not update vm: {e:?}");
}
}
Some(snp_proto::brain_vm_message::Msg::DeleteVm(delete_vm_req)) => {
let uuid = delete_vm_req.uuid.clone();
if let Err(e) = self.handle_delete_vm(delete_vm_req) {
log::error!("Could not delete vm {uuid}: {e:?}");
} else {
self.send_node_resources().await;
}
}
None => debug!("Received None from the Brain."),
}
}
}
fn clear_deleted_contracts(&mut self, deleted_vms: Vec<snp_proto::DeleteVmReq>) {
for deleted_vm in deleted_vms {
let uuid = deleted_vm.uuid;
let content = match std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &uuid + ".yaml")
{
Ok(content) => content,
Err(e) => {
log::debug!("Could not find VM config for {uuid}. Maybe it already got deleted? Error: {e:?}");
continue;
}
};
let vm: crate::state::VM = match serde_yaml::from_str(&content) {
Ok(vm) => vm,
Err(e) => {
log::error!("VM config corrupted for {uuid}. Cannot delete VM: {e:?}");
continue;
}
};
match vm.delete(&mut self.res) {
Ok(()) => info!("Successfully deleted VM {uuid}"),
Err(e) => log::error!("Deletion failed for VM {uuid}: {e:?}"),
}
}
}
}
#[tokio::main]
async fn main() {
env_logger::builder().filter_level(log::LevelFilter::Debug).init();
loop {
if std::env::var("DAEMON_AUTO_UPGRADE") != Ok("OFF".to_string()) {
// This upgrade procedure will get replaced in prod. We need this for the testnet.
if let Err(e) = download_and_replace_binary() {
log::error!("Failed to upgrade detee-snp-daemon to newer version: {e}");
}
}
let (brain_msg_tx, brain_msg_rx) = tokio::sync::mpsc::channel(6);
let (daemon_msg_tx, daemon_msg_rx) = tokio::sync::mpsc::channel(6);
let mut vm_handler = VMHandler::new(brain_msg_rx, daemon_msg_tx.clone());
let network = vm_handler.config.network.clone();
let contracts: Vec<String> = vm_handler.res.existing_vms.clone().into_iter().collect();
info!("Registering with the brain and getting back deleted VMs.");
match grpc::register_node(&vm_handler.config).await {
Ok(deleted_vms) => {
vm_handler.clear_deleted_contracts(deleted_vms)
}
Err(e) => log::error!("Could not get contracts from brain: {e:?}"),
};
tokio::spawn(async move {
vm_handler.run().await;
});
info!("Connecting to brain...");
if let Err(e) = grpc::connect_and_run(grpc::ConnectionData {
contracts,
network,
brain_msg_tx,
daemon_msg_rx,
daemon_msg_tx,
})
.await
{
log::error!("The connection broke: {e}");
}
sleep(Duration::from_secs(3)).await;
}
}
fn download_and_replace_binary() -> Result<()> {
use reqwest::blocking::get;
use std::os::unix::fs::PermissionsExt;
const TMP_DAEMON: &str = "/usr/local/bin/detee/new-daemon";
const BINARY: &str = "/usr/local/bin/detee-snp-daemon";
let response = get("https://registry.detee.ltd/daemon/detee-snp-daemon")?;
if !response.status().is_success() {
return Err(anyhow!("Failed to download file: {}", response.status()));
}
let mut tmp_file = File::create(Path::new(&TMP_DAEMON))?;
std::io::copy(&mut response.bytes()?.as_ref(), &mut tmp_file)?;
let new_hash = crate::global::compute_sha256(&TMP_DAEMON)?;
let old_hash = crate::global::compute_sha256(&BINARY)?;
log::debug!("Old binary hash: {old_hash}. New binary hash: {new_hash}");
if new_hash != old_hash {
std::fs::rename(BINARY, BINARY.to_string() + "_BACKUP")?;
std::fs::rename(TMP_DAEMON, BINARY)?;
std::fs::set_permissions(BINARY, std::fs::Permissions::from_mode(0o775))?;
std::process::exit(0);
}
Ok(())
}