snp-daemon/src/main.rs

264 lines
9.8 KiB
Rust

// SPDX-License-Identifier: Apache-2.0
mod config;
mod global;
mod grpc;
mod state;
use crate::{config::Config, global::*, grpc::snp_proto};
use anyhow::{anyhow, Result};
use log::{debug, info, warn};
use state::State;
use std::{fs::File, path::Path};
use tokio::{
sync::mpsc::{Receiver, Sender},
time::{sleep, Duration},
};
/// Serves VM requests arriving on `receiver` and reports results on `sender`,
/// using `state` for node configuration and resource bookkeeping.
// NOTE(review): `dead_code` is allowed without a stated reason — confirm it is still needed.
#[allow(dead_code)]
struct VMHandler {
/// Incoming `BrainVmMessage`s (new / update / delete VM requests).
receiver: Receiver<snp_proto::BrainVmMessage>,
/// Outgoing `VmDaemonMessage`s (request responses and resource reports).
sender: Sender<snp_proto::VmDaemonMessage>,
/// Node config and resources, loaded from disk in `VMHandler::new`.
state: State,
}
#[allow(dead_code)]
impl VMHandler {
fn new(
receiver: Receiver<snp_proto::BrainVmMessage>,
sender: Sender<snp_proto::VmDaemonMessage>,
) -> Self {
let config = match Config::load_from_disk(DAEMON_CONFIG_PATH) {
Ok(config) => config,
Err(e) => panic!("Could not load config: {e:?}"),
};
let res = match state::Resources::load_from_disk() {
Ok(res) => res,
Err(e) => {
log::error!("Could calculate resources: {e:?}");
std::process::exit(1);
}
};
let state = state::State {
config,
res,
};
Self { receiver, sender, state}
}
async fn send_node_resources(&mut self) {
let _ = self.sender.send(self.state.available_resources().into()).await;
}
/// Creates and starts the VM described by `new_vm_req`, then answers with a `NewVmResp`.
///
/// Outcomes:
/// - created and started: the VM is sent back as the response, followed by a resource report;
/// - VM already exists: the existing VM is sent back as the response;
/// - any other creation error: a response carrying the debug-formatted error is sent;
/// - start failure: a response carrying a generic internal-error text is sent.
///
/// Failures to send a response are ignored.
async fn handle_new_vm_req(&mut self, new_vm_req: snp_proto::NewVmReq) {
    debug!("Processing new vm request: {new_vm_req:?}");
    let uuid = new_vm_req.uuid.clone();

    // Step 1: register the VM in the node state; bail out early on any creation error.
    let vm = match self.state.new_vm(new_vm_req.into()) {
        Ok(created) => created,
        Err(crate::state::VMCreationErrors::VMAlreadyExists(existing)) => {
            log::info!(
                "Got NewVmReq for VM {}, that already exist. Will send NewVmResp.",
                existing.uuid
            );
            let resp: snp_proto::NewVmResp = existing.into();
            let _ = self.sender.send(resp.into()).await;
            return;
        }
        Err(e) => {
            warn!("Refusing to service vm {uuid} due to error: {e:?}");
            let resp = snp_proto::NewVmResp {
                uuid,
                error: format!("{e:?}"),
                ..Default::default()
            };
            let _ = self.sender.send(resp.into()).await;
            return;
        }
    };

    // Step 2: start it; the requester only gets a generic message on failure.
    if let Err(e) = vm.start() {
        log::error!("Could not start VM {uuid}: {e:?}");
        let resp = snp_proto::NewVmResp {
            uuid,
            error: "This node has an internal error. Choose another node.".to_string(),
            ..Default::default()
        };
        let _ = self.sender.send(resp.into()).await;
        return;
    }

    // Step 3: report success and the updated resource availability.
    info!("Succesfully started VM {uuid}");
    let resp: snp_proto::NewVmResp = vm.into();
    let _ = self.sender.send(resp.into()).await;
    self.send_node_resources().await;
}
/// Applies `update_vm_req` to the VM whose config is stored in the file
/// `VM_CONFIG_DIR` + `<uuid>.yaml` and sends an `UpdateVmResp` describing the outcome.
///
/// # Errors
///
/// Returns an error when the VM config cannot be read or parsed. In that case an
/// `UpdateVmResp` carrying the error text is sent first, so the requester is not left
/// without an answer. A rejected update is reported through the response only and
/// still yields `Ok(())`.
async fn handle_update_vm_req(&mut self, update_vm_req: snp_proto::UpdateVmReq) -> Result<()> {
    debug!("Processing update vm request: {update_vm_req:?}");
    let vm_id = update_vm_req.uuid.clone();
    // NOTE(review): the uuid from the request is used verbatim in the file path —
    // confirm it is validated before it reaches this point.
    let config_path = VM_CONFIG_DIR.to_string() + &vm_id + ".yaml";

    // Load and parse in one fallible step so both failures share the error reply below.
    let loaded = (|| -> Result<state::VM> {
        let content = std::fs::read_to_string(&config_path)?;
        Ok(serde_yaml::from_str(&content)?)
    })();
    let mut vm = match loaded {
        Ok(vm) => vm,
        Err(e) => {
            // Previously this path returned without any response being sent.
            let _ = self
                .sender
                .send(
                    snp_proto::UpdateVmResp {
                        uuid: vm_id,
                        error: format!("Could not load VM config: {e:#}"),
                        ..Default::default()
                    }
                    .into(),
                )
                .await;
            return Err(e);
        }
    };

    match vm.update(update_vm_req.into(), &mut self.state) {
        Ok(_) => {
            info!("Succesfully updated VM {vm_id}");
            let vm: snp_proto::UpdateVmResp = vm.into();
            let _ = self.sender.send(vm.into()).await;
            // Resource usage may have changed, so publish the new availability.
            self.send_node_resources().await;
        }
        Err(e) => {
            debug!("Unable to update vm {vm_id} due to error: {e:?}");
            let _ = self
                .sender
                .send(
                    snp_proto::UpdateVmResp {
                        uuid: vm_id,
                        error: format!("{e:?}"),
                        ..Default::default()
                    }
                    .into(),
                )
                .await;
        }
    };
    Ok(())
}
/// Deletes the VM named in `delete_vm_req`.
///
/// The VM is rebuilt from its YAML config file (`VM_CONFIG_DIR` + `<uuid>.yaml`) and
/// then asked to delete itself against the node's resources.
///
/// # Errors
///
/// Fails if the config file cannot be read or parsed, or if the deletion itself fails.
fn handle_delete_vm(&mut self, delete_vm_req: snp_proto::DeleteVmReq) -> Result<()> {
    let mut config_path = VM_CONFIG_DIR.to_string();
    config_path.push_str(&delete_vm_req.uuid);
    config_path.push_str(".yaml");

    let yaml = std::fs::read_to_string(config_path)?;
    let target = serde_yaml::from_str::<state::VM>(&yaml)?;
    target.delete(&mut self.state.res)?;
    Ok(())
}
async fn run(mut self) {
sleep(Duration::from_millis(500)).await;
self.send_node_resources().await;
while let Some(brain_msg) = self.receiver.recv().await {
match brain_msg.msg {
Some(snp_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => {
self.handle_new_vm_req(new_vm_req).await;
}
Some(snp_proto::brain_vm_message::Msg::UpdateVmReq(update_vm_req)) => {
if let Err(e) = self.handle_update_vm_req(update_vm_req).await {
log::error!("Could not update vm: {e:?}");
}
}
Some(snp_proto::brain_vm_message::Msg::DeleteVm(delete_vm_req)) => {
let uuid = delete_vm_req.uuid.clone();
if let Err(e) = self.handle_delete_vm(delete_vm_req) {
log::error!("Could not delete vm {uuid}: {e:?}");
} else {
self.send_node_resources().await;
}
}
None => debug!("Received None from the Brain."),
}
}
}
fn clear_deleted_contracts(&mut self, deleted_vms: Vec<snp_proto::DeleteVmReq>) {
for deleted_vm in deleted_vms {
let uuid = deleted_vm.uuid;
let content = match std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &uuid + ".yaml")
{
Ok(content) => content,
Err(e) => {
log::debug!("Could not find VM config for {uuid}. Maybe it already got deleted? Error: {e:?}");
continue;
}
};
let vm: crate::state::VM = match serde_yaml::from_str(&content) {
Ok(vm) => vm,
Err(e) => {
log::error!("VM config corrupted for {uuid}. Cannot delete VM: {e:?}");
continue;
}
};
match vm.delete(&mut self.state.res) {
Ok(()) => info!("Successfully deleted VM {uuid}"),
Err(e) => log::error!("Deletion failed for VM {uuid}: {e:?}"),
}
}
}
}
#[tokio::main]
async fn main() {
env_logger::builder().filter_level(log::LevelFilter::Debug).init();
loop {
if std::env::var("DAEMON_AUTO_UPGRADE") != Ok("OFF".to_string()) {
// This upgrade procedure will get replaced in prod. We need this for the testnet.
if let Err(e) = download_and_replace_binary() {
log::error!("Failed to upgrade detee-snp-daemon to newer version: {e}");
}
}
let (brain_msg_tx, brain_msg_rx) = tokio::sync::mpsc::channel(6);
let (daemon_msg_tx, daemon_msg_rx) = tokio::sync::mpsc::channel(6);
let mut vm_handler = VMHandler::new(brain_msg_rx, daemon_msg_tx.clone());
let network = vm_handler.state.config.network.clone();
let contracts: Vec<String> = vm_handler.state.res.existing_vms.clone().into_iter().collect();
info!("Registering with the brain and getting back deleted VMs.");
match grpc::register_node(&vm_handler.state.config).await {
Ok(deleted_vms) => vm_handler.clear_deleted_contracts(deleted_vms),
Err(e) => log::error!("Could not get contracts from brain: {e:?}"),
};
tokio::spawn(async move {
vm_handler.run().await;
});
info!("Connecting to brain...");
if let Err(e) = grpc::connect_and_run(grpc::ConnectionData {
contracts,
network,
brain_msg_tx,
daemon_msg_rx,
daemon_msg_tx,
})
.await
{
log::error!("The connection broke: {e}");
}
sleep(Duration::from_secs(3)).await;
}
}
fn download_and_replace_binary() -> Result<()> {
use reqwest::blocking::get;
use std::os::unix::fs::PermissionsExt;
const TMP_DAEMON: &str = "/usr/local/bin/detee/new-daemon";
const BINARY: &str = "/usr/local/bin/detee-snp-daemon";
let response = get("https://registry.detee.ltd/daemon/detee-snp-daemon")?;
if !response.status().is_success() {
return Err(anyhow!("Failed to download file: {}", response.status()));
}
let mut tmp_file = File::create(Path::new(&TMP_DAEMON))?;
std::io::copy(&mut response.bytes()?.as_ref(), &mut tmp_file)?;
let new_hash = crate::global::compute_sha256(&TMP_DAEMON)?;
let old_hash = crate::global::compute_sha256(&BINARY)?;
log::debug!("Old binary hash: {old_hash}. New binary hash: {new_hash}");
if new_hash != old_hash {
std::fs::rename(BINARY, BINARY.to_string() + "_BACKUP")?;
std::fs::rename(TMP_DAEMON, BINARY)?;
std::fs::set_permissions(BINARY, std::fs::Permissions::from_mode(0o775))?;
std::process::exit(0);
}
Ok(())
}