modified code based on new snp proto

This commit is contained in:
ghe0 2025-01-18 02:26:53 +02:00
parent d8488476c5
commit abf09e8d26
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
10 changed files with 333 additions and 404 deletions

5
Cargo.lock generated

@ -348,6 +348,7 @@ dependencies = [
"rand_core",
"reqwest",
"serde",
"serde_json",
"serde_yaml",
"sha2",
"tokio",
@ -1522,9 +1523,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.134"
version = "1.0.135"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d00f4175c42ee48b15416f6193a959ba3a0d67fc699a0db9ad12df9f83991c7d"
checksum = "2b0d7ba2887406110130a978386c4e1befb98c674b4fba677954e4db976630d9"
dependencies = [
"itoa",
"memchr",

@ -20,6 +20,7 @@ rand = "0.8.5"
tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1.17"
tonic = "0.12"
serde_json = "1.0.135"
[build-dependencies]
tonic-build = "0.12"

@ -1,6 +1,6 @@
fn main() {
tonic_build::configure()
.build_server(true)
.compile_protos(&["brain.proto"], &["proto"])
.compile_protos(&["snp.proto"], &["proto"])
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
}

@ -1,81 +1,14 @@
syntax = "proto3";
package brain;
package snp_proto;
message Empty {
}
message NodePubkey {
string node_pubkey = 1;
message Pubkey {
string pubkey = 1;
}
message RegisterNodeReq {
string node_pubkey = 1;
string owner_pubkey = 2;
}
message NodeResourceReq {
string node_pubkey = 1;
uint32 avail_ports = 2;
uint32 avail_ipv4 = 3;
uint32 avail_ipv6 = 4;
uint32 avail_vcpus = 5;
uint32 avail_memory_mb = 6;
uint32 avail_storage_gb = 7;
uint32 max_ports_per_vm = 8;
}
message MeasurementArgs {
// this will be IP:Port of the dtrfs API
// actually not a measurement arg, but needed for the injector
string dtrfs_api_endpoint = 1;
repeated uint32 exposed_ports = 2;
string ovmf_hash = 5;
// This is needed to allow the CLI to build the kernel params from known data.
// The CLI will use the kernel params to get the measurement.
repeated NewVmRespIP ips = 6;
}
message NewVMReq {
string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID
string hostname = 2;
string admin_pubkey = 3;
string node_pubkey = 4;
repeated uint32 extra_ports = 5;
bool public_ipv4 = 6;
bool public_ipv6 = 7;
uint32 disk_size_gb = 8;
uint32 vcpus = 9;
uint32 memory_mb = 10;
string kernel_url = 11;
string kernel_sha = 12;
string dtrfs_url = 13;
string dtrfs_sha = 14;
}
message NewVMResp {
string uuid = 1;
string error = 2;
MeasurementArgs args = 3;
}
message UpdateVMReq {
string uuid = 1;
uint32 disk_size_gb = 2;
uint32 vcpus = 3;
uint32 memory_mb = 4;
string kernel_url = 5;
string kernel_sha = 6;
string dtrfs_url = 7;
string dtrfs_sha = 8;
}
message UpdateVMResp {
string uuid = 1;
string error = 2;
MeasurementArgs args = 3;
}
message VMContract {
message Contract {
string uuid = 1;
string hostname = 2;
string admin_pubkey = 3;
@ -92,33 +25,115 @@ message VMContract {
string updated_at = 14;
}
message ListVMContractsReq {
string admin_pubkey = 1;
string node_pubkey = 2;
string uuid = 3;
message MeasurementArgs {
// this will be IP:Port of the dtrfs API
// actually not a measurement arg, but needed for the injector
string dtrfs_api_endpoint = 1;
repeated uint32 exposed_ports = 2;
string ovmf_hash = 5;
// This is needed to allow the CLI to build the kernel params from known data.
// The CLI will use the kernel params to get the measurement.
repeated MeasurementIP ips = 6;
}
message NewVmRespIP {
message MeasurementIP {
uint32 nic_index = 1;
string address = 2;
string mask = 3;
string gateway = 4;
}
message DeleteVMReq {
message RegisterNodeReq {
string node_pubkey = 1;
string owner_pubkey = 2;
string main_ip = 3;
string country = 7;
string region = 8;
string city = 9;
}
message NodeResources {
string node_pubkey = 1;
uint32 avail_ports = 2;
uint32 avail_ipv4 = 3;
uint32 avail_ipv6 = 4;
uint32 avail_vcpus = 5;
uint32 avail_memory_mb = 6;
uint32 avail_storage_gb = 7;
uint32 max_ports_per_vm = 8;
}
message NewVmReq {
string uuid = 1;
string hostname = 2;
string admin_pubkey = 3;
string node_pubkey = 4;
repeated uint32 extra_ports = 5;
bool public_ipv4 = 6;
bool public_ipv6 = 7;
uint32 disk_size_gb = 8;
uint32 vcpus = 9;
uint32 memory_mb = 10;
string kernel_url = 11;
string kernel_sha = 12;
string dtrfs_url = 13;
string dtrfs_sha = 14;
}
message NewVmResp {
string uuid = 1;
string error = 2;
MeasurementArgs args = 3;
}
message UpdateVmReq {
string uuid = 1;
uint32 disk_size_gb = 2;
uint32 vcpus = 3;
uint32 memory_mb = 4;
string kernel_url = 5;
string kernel_sha = 6;
string dtrfs_url = 7;
string dtrfs_sha = 8;
}
message UpdateVmResp {
string uuid = 1;
string error = 2;
MeasurementArgs args = 3;
}
message DeleteVmReq {
string uuid = 1;
}
service BrainDaemonService {
rpc RegisterNode (RegisterNodeReq) returns (Empty);
rpc SendNodeResources (stream NodeResourceReq) returns (Empty);
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
rpc SendNewVMResp (stream NewVMResp) returns (Empty);
rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq);
rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
//rpc GetMeasurementArgs (ListVMContractsReq) returns (stream MeasurementArgs);
message BrainMessage {
oneof Msg {
NewVmReq new_vm_req = 1;
UpdateVmReq update_vm_req = 2;
DeleteVmReq delete_vm = 3;
}
}
message DaemonMessage {
oneof Msg {
Pubkey pubkey = 1;
NewVmResp new_vm_resp = 2;
UpdateVmResp update_vm_resp = 3;
NodeResources node_resources = 4;
}
}
service BrainDaemon {
rpc RegisterNode (RegisterNodeReq) returns (stream Contract);
rpc BrainMessages (Pubkey) returns (stream BrainMessage);
rpc DaemonMessages (stream DaemonMessage) returns (Empty);
}
message ListContractsReq {
string admin_pubkey = 1;
string node_pubkey = 2;
string uuid = 3;
}
message NodeFilters {
@ -144,12 +159,11 @@ message NodeListResp {
uint32 provider_rating = 7;
}
service BrainCliService {
rpc CreateVMContract (NewVMReq) returns (NewVMResp);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
service BrainCli {
rpc NewVm (NewVmReq) returns (NewVmResp);
rpc ListContracts (ListContractsReq) returns (stream Contract);
rpc ListNodes (NodeFilters) returns (stream NodeListResp);
rpc GetOneNode (NodeFilters) returns (NodeListResp);
rpc DeleteVM (DeleteVMReq) returns (Empty);
rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp);
//rpc GetMeasurementArgs (ListVMContractsReq) returns (MeasurementArgs);
rpc DeleteVm (DeleteVmReq) returns (Empty);
rpc UpdateVm (UpdateVmReq) returns (UpdateVmResp);
}

@ -1,4 +1,3 @@
#![allow(dead_code)]
use anyhow::Result;
use serde::Deserialize;
use std::{
@ -57,17 +56,9 @@ pub struct Config {
}
mod range_format {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde::{Deserialize, Deserializer, Serialize};
use std::ops::Range;
pub fn serialize<S>(range: &Range<u16>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let range_repr = RangeRepr { start: range.start, end: range.end };
range_repr.serialize(serializer)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Range<u16>, D::Error>
where
D: Deserializer<'de>,

@ -1,16 +0,0 @@
#![allow(dead_code)]
pub(crate) const DEFAULT_OVMF: &str = "/usr/share/edk2/ovmf/OVMF.amdsev.fd";
pub(crate) const VM_BOOT_DIR: &str = "/var/lib/detee/boot/";
pub(crate) const USED_RESOURCES: &str = "/etc/detee/daemon/used_resources.yaml";
pub(crate) const VM_CONFIG_DIR: &str = "/etc/detee/daemon/vms/";
pub(crate) const SECRET_KEY_PATH: &str = "/etc/detee/daemon/node_secret_key.pem";
pub(crate) const DAEMON_CONFIG_PATH: &str = "/etc/detee/daemon/config.yaml";
pub(crate) const START_VM_SCRIPT: &str = "/usr/local/bin/detee/start_qemu_vm.sh";
// TODO: research if other CPU types provide better performance
pub(crate) const QEMU_VM_CPU_TYPE: &str = "EPYC-v4";
// If you modify this, also modify scripts/start_qemu_vm.sh
pub(crate) const OVMF_HASH: &str =
"0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76";
pub(crate) const OVMF_URL: &str =
"https://drive.google.com/uc?export=download&id=1V-vLkaiLaGmFSjrN84Z6nELQOxKNAoSJ";

75
src/global.rs Normal file

@ -0,0 +1,75 @@
use anyhow::Result;
use lazy_static::lazy_static;
use log::{info, warn};
use std::{fs::File, io::Write};
pub(crate) const VM_BOOT_DIR: &str = "/var/lib/detee/boot/";
pub(crate) const USED_RESOURCES: &str = "/etc/detee/daemon/used_resources.yaml";
pub(crate) const VM_CONFIG_DIR: &str = "/etc/detee/daemon/vms/";
pub(crate) const SECRET_KEY_PATH: &str = "/etc/detee/daemon/node_secret_key.pem";
pub(crate) const DAEMON_CONFIG_PATH: &str = "/etc/detee/daemon/config.yaml";
pub(crate) const START_VM_SCRIPT: &str = "/usr/local/bin/detee/start_qemu_vm.sh";
// TODO: research if other CPU types provide better performance
pub(crate) const QEMU_VM_CPU_TYPE: &str = "EPYC-v4";
// If you modify this, also modify scripts/start_qemu_vm.sh
pub(crate) const OVMF_HASH: &str =
"0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76";
pub(crate) const OVMF_URL: &str =
"https://drive.google.com/uc?export=download&id=1V-vLkaiLaGmFSjrN84Z6nELQOxKNAoSJ";
lazy_static! {
pub static ref PUBLIC_KEY: String = get_public_key();
pub static ref IP_INFO: IPInfo = get_ip_info().unwrap();
}
fn create_secret_key() -> Result<ed25519_dalek::SigningKey> {
use ed25519_dalek::pkcs8::{spki::der::pem::LineEnding, EncodePrivateKey};
let key_path = SECRET_KEY_PATH;
info!("Creating new secret key at {}", key_path);
let sk = ed25519_dalek::SigningKey::generate(&mut rand_core::OsRng);
let sk_pem = sk.to_pkcs8_pem(LineEnding::default()).unwrap();
let mut file = File::create(key_path)?;
file.write_all(sk_pem.as_bytes())?;
Ok(sk)
}
fn load_secret_key() -> Result<ed25519_dalek::SigningKey> {
use ed25519_dalek::pkcs8::DecodePrivateKey;
let secret_key_pem = match std::fs::read_to_string(SECRET_KEY_PATH) {
Ok(secret_key_pem) => secret_key_pem,
Err(e) => {
warn!("Could not load secret key due to error: {e:?}");
return Ok(create_secret_key()?);
}
};
Ok(ed25519_dalek::SigningKey::from_pkcs8_pem(&secret_key_pem)?)
}
pub fn get_public_key() -> String {
use ed25519_dalek::pkcs8::{spki::der::pem::LineEnding, EncodePublicKey};
let pubkey = load_secret_key()
.unwrap()
.verifying_key()
.to_public_key_pem(LineEnding::default())
.unwrap()
.lines()
.nth(1)
.unwrap()
.to_string();
log::info!("Loaded the following public key: {pubkey}");
pubkey
}
#[derive(serde::Deserialize, Clone)]
pub struct IPInfo {
pub country: String,
pub region: String,
pub city: String,
pub ip: String,
}
fn get_ip_info() -> anyhow::Result<IPInfo> {
let body = reqwest::blocking::get("https://ipinfo.io/".to_string())?.text()?;
info!("Got the following data from ipinfo.io: {body}");
Ok(serde_json::de::from_str(&body)?)
}

@ -1,76 +1,53 @@
#![allow(dead_code)]
pub mod brain {
tonic::include_proto!("brain");
}
use crate::snp_proto::DaemonMessage;
use anyhow::Result;
use brain::{
brain_daemon_service_client::BrainDaemonServiceClient, DeleteVmReq, ListVmContractsReq,
NewVmReq, NewVmResp, NodePubkey, NodeResourceReq, RegisterNodeReq, UpdateVmReq, UpdateVmResp,
VmContract,
use log::{debug, info, warn};
use snp_proto::{
brain_daemon_client::BrainDaemonClient, BrainMessage, Contract, Pubkey, RegisterNodeReq,
};
use lazy_static::lazy_static;
use log::{debug, error, info, warn};
use std::{fs::File, io::Write};
use tokio::{
sync::mpsc::{Receiver, Sender},
task::JoinSet,
};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tonic::transport::Channel;
use crate::global::*;
lazy_static! {
static ref PUBLIC_KEY: String = get_public_key();
pub mod snp_proto {
tonic::include_proto!("snp_proto");
}
fn create_secret_key() -> Result<ed25519_dalek::SigningKey> {
use ed25519_dalek::pkcs8::{spki::der::pem::LineEnding, EncodePrivateKey};
let key_path = crate::constants::SECRET_KEY_PATH;
info!("Creating new secret key at {}", key_path);
let sk = ed25519_dalek::SigningKey::generate(&mut rand_core::OsRng);
let sk_pem = sk.to_pkcs8_pem(LineEnding::default()).unwrap();
let mut file = File::create(key_path)?;
file.write_all(sk_pem.as_bytes())?;
Ok(sk)
impl From<snp_proto::NewVmResp> for snp_proto::DaemonMessage {
fn from(value: snp_proto::NewVmResp) -> Self {
snp_proto::DaemonMessage { msg: Some(snp_proto::daemon_message::Msg::NewVmResp(value)) }
}
}
fn load_secret_key() -> Result<ed25519_dalek::SigningKey> {
use ed25519_dalek::pkcs8::DecodePrivateKey;
let secret_key_pem = match std::fs::read_to_string(crate::constants::SECRET_KEY_PATH) {
Ok(secret_key_pem) => secret_key_pem,
Err(e) => {
warn!("Could not load secret key due to error: {e:?}");
return Ok(create_secret_key()?);
}
impl From<snp_proto::UpdateVmResp> for snp_proto::DaemonMessage {
fn from(value: snp_proto::UpdateVmResp) -> Self {
snp_proto::DaemonMessage { msg: Some(snp_proto::daemon_message::Msg::UpdateVmResp(value)) }
}
}
impl From<snp_proto::NodeResources> for snp_proto::DaemonMessage {
fn from(value: snp_proto::NodeResources) -> Self {
snp_proto::DaemonMessage { msg: Some(snp_proto::daemon_message::Msg::NodeResources(value)) }
}
}
pub async fn register_node(brain_url: String) -> Result<Vec<Contract>> {
let mut client = BrainDaemonClient::connect(brain_url).await?;
debug!("Starting node registration...");
let ip_info = IP_INFO.clone();
let req = RegisterNodeReq {
node_pubkey: PUBLIC_KEY.clone(),
owner_pubkey: "IamTheOwnerOf".to_string() + &PUBLIC_KEY,
main_ip: ip_info.ip,
country: ip_info.country,
region: ip_info.region,
city: ip_info.city,
};
Ok(ed25519_dalek::SigningKey::from_pkcs8_pem(&secret_key_pem)?)
}
pub fn get_public_key() -> String {
use ed25519_dalek::pkcs8::{spki::der::pem::LineEnding, EncodePublicKey};
let pubkey = load_secret_key()
.unwrap()
.verifying_key()
.to_public_key_pem(LineEnding::default())
.unwrap()
.lines()
.nth(1)
.unwrap()
.to_string();
log::info!("Loaded the following public key: {pubkey}");
pubkey
}
pub async fn list_contracts(brain_url: String) -> Result<Vec<VmContract>> {
let mut client = BrainDaemonServiceClient::connect(brain_url).await?;
let mut contracts = Vec::new();
let mut grpc_stream = client
.list_vm_contracts(ListVmContractsReq {
node_pubkey: PUBLIC_KEY.to_string(),
..Default::default()
})
.await?
.into_inner();
let mut grpc_stream = client.register_node(req).await?.into_inner();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(node) => {
@ -86,18 +63,18 @@ pub async fn list_contracts(brain_url: String) -> Result<Vec<VmContract>> {
Ok(contracts)
}
async fn listen_for_new_vm_reqs(
mut client: BrainDaemonServiceClient<Channel>,
tx: Sender<NewVmReq>,
async fn receive_messages(
mut client: BrainDaemonClient<Channel>,
tx: Sender<BrainMessage>,
) -> Result<()> {
debug!("starting listen_for_new_vm_reqs");
let node_pubkey = PUBLIC_KEY.clone();
let mut grpc_stream = client.get_new_vm_reqs(NodePubkey { node_pubkey }).await?.into_inner();
debug!("starting to listen for messages from brain");
let pubkey = PUBLIC_KEY.clone();
let mut grpc_stream = client.brain_messages(Pubkey { pubkey }).await?.into_inner();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(req) => {
info!("Received new vm request: {req:?}");
let _ = tx.send(req).await;
Ok(msg) => {
info!("Received message from brain: {msg:?}");
let _ = tx.send(msg).await;
}
Err(e) => {
warn!("Brain disconnected from listen_for_new_vm_reqs: {e}");
@ -108,121 +85,34 @@ async fn listen_for_new_vm_reqs(
Ok(())
}
async fn send_newvm_resp(
mut client: BrainDaemonServiceClient<Channel>,
rx: Receiver<NewVmResp>,
async fn send_messages(
mut client: BrainDaemonClient<Channel>,
rx: Receiver<DaemonMessage>,
tx: Sender<DaemonMessage>,
) -> Result<()> {
debug!("starting send_newvm_resp stream");
debug!("starting daemon message stream to brain");
let pubkey = PUBLIC_KEY.clone();
let rx_stream = ReceiverStream::new(rx);
client.send_new_vm_resp(rx_stream).await?;
tx.send(DaemonMessage { msg: Some(snp_proto::daemon_message::Msg::Pubkey(Pubkey { pubkey })) })
.await?;
client.daemon_messages(rx_stream).await?;
debug!("send_newvm_resp is about to exit");
Ok(())
}
async fn send_node_resources(
mut client: BrainDaemonServiceClient<Channel>,
rx: Receiver<NodeResourceReq>,
) -> Result<()> {
debug!("starting send_newvm_resp stream");
let rx_stream = ReceiverStream::new(rx).map(|mut node_resources| {
node_resources.node_pubkey = get_public_key();
node_resources
});
client.send_node_resources(rx_stream).await?;
debug!("send_newvm_resp is about to exit");
Ok(())
}
async fn register_node(mut client: BrainDaemonServiceClient<Channel>) {
debug!("Starting node registration...");
let req = RegisterNodeReq {
node_pubkey: PUBLIC_KEY.clone(),
owner_pubkey: "IamTheOwnerOf".to_string() + &PUBLIC_KEY,
};
match client.register_node(req).await {
Ok(_) => {
info!("Registered as 10.0.10.1 from Bruma/Cyrodiil with ID {}", PUBLIC_KEY.clone())
}
Err(e) => error!("Could not register node data: {e:?}"),
};
}
async fn listen_for_deleted_vms(
mut client: BrainDaemonServiceClient<Channel>,
tx: Sender<DeleteVmReq>,
) -> Result<()> {
debug!("starting listen_for_new_vm_reqs");
let node_pubkey = PUBLIC_KEY.clone();
let mut grpc_stream = client.get_delete_vm_req(NodePubkey { node_pubkey }).await?.into_inner();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(req) => {
info!("Received delete vm request: {req:?}");
let _ = tx.send(req).await;
}
Err(e) => {
warn!("Brain disconnected from listen_for_deleted_vms: {e}");
}
}
}
debug!("listen_for_new_vm_reqs is about to exit");
Ok(())
}
async fn listen_for_update_vm_reqs(
mut client: BrainDaemonServiceClient<Channel>,
tx: Sender<UpdateVmReq>,
) -> Result<()> {
debug!("starting listen_for_update_vm_reqs");
let node_pubkey = PUBLIC_KEY.clone();
let mut grpc_stream = client.get_update_vm_req(NodePubkey { node_pubkey }).await?.into_inner();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(req) => {
info!("Received update vm request: {req:?}");
let _ = tx.send(req).await;
}
Err(e) => {
warn!("Brain disconnected from listen_for_update_vm_reqs: {e}");
}
}
}
debug!("listen_for_update_vm_reqs is about to exit");
Ok(())
}
async fn send_updatevm_resp(
mut client: BrainDaemonServiceClient<Channel>,
rx: Receiver<UpdateVmResp>,
) -> Result<()> {
debug!("starting send_updatevm_resp stream");
let rx_stream = ReceiverStream::new(rx);
client.send_update_vm_resp(rx_stream).await?;
debug!("send_updatevm_resp is about to exit");
Ok(())
}
pub struct ConnectionData {
pub brain_url: String,
pub newvm_tx: Sender<NewVmReq>,
pub confirm_vm_rx: Receiver<NewVmResp>,
pub updatevm_tx: Sender<UpdateVmReq>,
pub confirm_updatevm_rx: Receiver<UpdateVmResp>,
pub delete_vm_tx: Sender<DeleteVmReq>,
pub resources_rx: Receiver<NodeResourceReq>,
pub brain_msg_tx: Sender<BrainMessage>,
pub daemon_msg_rx: Receiver<DaemonMessage>,
pub daemon_msg_tx: Sender<DaemonMessage>,
}
pub async fn connect_and_run(cd: ConnectionData) -> Result<()> {
let client = BrainDaemonServiceClient::connect(cd.brain_url).await?;
let client = BrainDaemonClient::connect(cd.brain_url).await?;
let mut streaming_tasks = JoinSet::new();
register_node(client.clone()).await;
streaming_tasks.spawn(listen_for_new_vm_reqs(client.clone(), cd.newvm_tx));
streaming_tasks.spawn(send_newvm_resp(client.clone(), cd.confirm_vm_rx));
streaming_tasks.spawn(listen_for_update_vm_reqs(client.clone(), cd.updatevm_tx));
streaming_tasks.spawn(send_updatevm_resp(client.clone(), cd.confirm_updatevm_rx));
streaming_tasks.spawn(listen_for_deleted_vms(client.clone(), cd.delete_vm_tx));
streaming_tasks.spawn(send_node_resources(client.clone(), cd.resources_rx));
streaming_tasks.spawn(receive_messages(client.clone(), cd.brain_msg_tx));
streaming_tasks.spawn(send_messages(client.clone(), cd.daemon_msg_rx, cd.daemon_msg_tx));
let task_output = streaming_tasks.join_next().await;
warn!("One stream exited: {task_output:?}");

@ -1,10 +1,10 @@
#[allow(dead_code)]
mod config;
mod constants;
mod global;
mod grpc;
mod state;
use crate::{config::Config, grpc::brain};
use crate::global::*;
use crate::{config::Config, grpc::snp_proto};
use anyhow::Result;
use log::{debug, info, warn};
use tokio::{
@ -14,12 +14,8 @@ use tokio::{
#[allow(dead_code)]
struct VMHandler {
new_vm_req_chan: Receiver<brain::NewVmReq>,
new_vm_resp_chan: Sender<brain::NewVmResp>,
update_vm_req_chan: Receiver<brain::UpdateVmReq>,
update_vm_resp_chan: Sender<brain::UpdateVmResp>,
delete_vm_chan: Receiver<brain::DeleteVmReq>,
resources_chan: Sender<brain::NodeResourceReq>,
receiver: Receiver<snp_proto::BrainMessage>,
sender: Sender<snp_proto::DaemonMessage>,
config: Config,
res: state::Resources,
}
@ -27,14 +23,10 @@ struct VMHandler {
#[allow(dead_code)]
impl VMHandler {
fn new(
new_vm_req_chan: Receiver<brain::NewVmReq>,
new_vm_resp_chan: Sender<brain::NewVmResp>,
update_vm_req_chan: Receiver<brain::UpdateVmReq>,
update_vm_resp_chan: Sender<brain::UpdateVmResp>,
delete_vm_chan: Receiver<brain::DeleteVmReq>,
resources_chan: Sender<brain::NodeResourceReq>,
receiver: Receiver<snp_proto::BrainMessage>,
sender: Sender<snp_proto::DaemonMessage>,
) -> Self {
let config = match Config::load_from_disk(crate::constants::DAEMON_CONFIG_PATH) {
let config = match Config::load_from_disk(DAEMON_CONFIG_PATH) {
Ok(config) => config,
Err(e) => panic!("Could not load config: {e:?}"),
};
@ -46,16 +38,7 @@ impl VMHandler {
state::Resources::new(&config.volumes)
}
};
Self {
new_vm_req_chan,
new_vm_resp_chan,
update_vm_req_chan,
update_vm_resp_chan,
delete_vm_chan,
resources_chan,
config,
res,
}
Self { receiver, sender, config, res }
}
fn get_available_ips(&self) -> (u32, u32) {
@ -85,8 +68,8 @@ impl VMHandler {
}
}
let avail_storage_gb = avail_storage_gb as u32;
let res = brain::NodeResourceReq {
node_pubkey: String::new(),
let res = snp_proto::NodeResources {
node_pubkey: PUBLIC_KEY.clone(),
avail_ports: (self.config.public_port_range.len() - self.res.reserved_ports.len())
as u32,
avail_ipv4,
@ -97,29 +80,33 @@ impl VMHandler {
max_ports_per_vm: self.config.max_ports_per_vm as u32,
};
debug!("sending node resources on brain: {res:?}");
let _ = self.resources_chan.send(res).await;
let _ = self.sender.send(res.into()).await;
}
async fn handle_new_vm_req(&mut self, new_vm_req: brain::NewVmReq) {
async fn handle_new_vm_req(&mut self, new_vm_req: snp_proto::NewVmReq) {
debug!("Processing new vm request: {new_vm_req:?}");
let uuid = new_vm_req.uuid.clone();
match state::VM::new(new_vm_req.into(), &self.config, &mut self.res) {
Ok(vm) => match vm.start() {
Ok(_) => {
info!("Succesfully started VM {uuid}");
let _ = self.new_vm_resp_chan.send(vm.into()).await;
let vm: snp_proto::NewVmResp = vm.into();
let _ = self.sender.send(vm.into()).await;
self.send_node_resources().await;
}
Err(e) => {
log::error!("Could not start VM {uuid}: {e:?}");
let _ = self
.new_vm_resp_chan
.send(brain::NewVmResp {
uuid,
error: "This node has an internal error. Choose another node."
.to_string(),
..Default::default()
})
.sender
.send(
snp_proto::NewVmResp {
uuid,
error: "This node has an internal error. Choose another node."
.to_string(),
..Default::default()
}
.into(),
)
.await;
}
},
@ -129,54 +116,62 @@ impl VMHandler {
"Got NewVmReq for VM {}, that already exist. Will send NewVmResp.",
vm.uuid
);
let _ = self.new_vm_resp_chan.send(vm.into()).await;
let vm: snp_proto::NewVmResp = vm.into();
let _ = self.sender.send(vm.into()).await;
}
_ => {
warn!("Refusing to service vm {uuid} due to error: {e:?}");
let _ = self
.new_vm_resp_chan
.send(brain::NewVmResp {
uuid,
error: format!("{e:?}"),
..Default::default()
})
.sender
.send(
snp_proto::NewVmResp {
uuid,
error: format!("{e:?}"),
..Default::default()
}
.into(),
)
.await;
}
},
}
}
async fn handle_update_vm_req(&mut self, update_vm_req: brain::UpdateVmReq) -> Result<()> {
async fn handle_update_vm_req(&mut self, update_vm_req: snp_proto::UpdateVmReq) -> Result<()> {
debug!("Processing update vm request: {update_vm_req:?}");
let vm_id = update_vm_req.uuid.clone();
let content =
std::fs::read_to_string(constants::VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?;
std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?;
let mut vm: state::VM = serde_yaml::from_str(&content)?;
match vm.update(update_vm_req.into(), &self.config, &mut self.res) {
Ok(_) => {
info!("Succesfully updated VM {vm_id}");
let _ = self.update_vm_resp_chan.send(vm.into()).await;
let vm: snp_proto::UpdateVmResp = vm.into();
let _ = self.sender.send(vm.into()).await;
self.send_node_resources().await;
}
Err(e) => {
debug!("Unable to update vm {vm_id} due to error: {e:?}");
let _ = self
.update_vm_resp_chan
.send(brain::UpdateVmResp {
uuid: vm_id,
error: format!("{e:?}"),
..Default::default()
})
.sender
.send(
snp_proto::UpdateVmResp {
uuid: vm_id,
error: format!("{e:?}"),
..Default::default()
}
.into(),
)
.await;
}
};
Ok(())
}
fn handle_delete_vm(&mut self, delete_vm_req: brain::DeleteVmReq) -> Result<()> {
fn handle_delete_vm(&mut self, delete_vm_req: snp_proto::DeleteVmReq) -> Result<()> {
let vm_id = delete_vm_req.uuid;
let content =
std::fs::read_to_string(constants::VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?;
std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?;
let vm: state::VM = serde_yaml::from_str(&content)?;
vm.delete(&mut self.res)?;
Ok(())
@ -184,17 +179,17 @@ impl VMHandler {
async fn run(mut self) {
self.send_node_resources().await;
loop {
tokio::select! {
Some(new_vm_req) = self.new_vm_req_chan.recv() => {
while let Some(brain_msg) = self.receiver.recv().await {
match brain_msg.msg {
Some(snp_proto::brain_message::Msg::NewVmReq(new_vm_req)) => {
self.handle_new_vm_req(new_vm_req).await;
}
Some(update_vm_req) = self.update_vm_req_chan.recv() => {
Some(snp_proto::brain_message::Msg::UpdateVmReq(update_vm_req)) => {
if let Err(e) = self.handle_update_vm_req(update_vm_req).await {
log::error!("Could not update vm: {e:?}");
}
}
Some(delete_vm_req) = self.delete_vm_chan.recv() => {
Some(snp_proto::brain_message::Msg::DeleteVm(delete_vm_req)) => {
let uuid = delete_vm_req.uuid.clone();
if let Err(e) = self.handle_delete_vm(delete_vm_req) {
log::error!("Could not delete vm {uuid}: {e:?}");
@ -202,20 +197,17 @@ impl VMHandler {
self.send_node_resources().await;
}
}
else => {
log::error!("All data channels closed.");
return;
}
None => debug!("Received None from the Brain."),
}
}
}
fn clear_deleted_contracts(&mut self, contracts: Vec<brain::VmContract>) {
fn clear_deleted_contracts(&mut self, contracts: Vec<snp_proto::Contract>) {
for uuid in self.res.existing_vms.clone() {
if contracts.iter().find(|c| c.uuid == uuid).is_none() {
info!("VM {uuid} exists locally but not found in brain. Deleting...");
let content = match std::fs::read_to_string(
crate::constants::VM_CONFIG_DIR.to_string() + &uuid + ".yaml",
VM_CONFIG_DIR.to_string() + &uuid + ".yaml",
) {
Ok(content) => content,
Err(e) => {
@ -244,25 +236,14 @@ async fn main() {
env_logger::builder().filter_level(log::LevelFilter::Debug).init();
loop {
let (newvm_tx, newvm_rx) = tokio::sync::mpsc::channel(6);
let (confirm_vm_tx, confirm_vm_rx) = tokio::sync::mpsc::channel(6);
let (updatevm_tx, updatevm_rx) = tokio::sync::mpsc::channel(6);
let (confirm_updatevm_tx, confirm_updatevm_rx) = tokio::sync::mpsc::channel(6);
let (delete_vm_tx, delete_vm_rx) = tokio::sync::mpsc::channel(6);
let (resources_tx, resources_rx) = tokio::sync::mpsc::channel(6);
let (brain_msg_tx, brain_msg_rx) = tokio::sync::mpsc::channel(6);
let (daemon_msg_tx, daemon_msg_rx) = tokio::sync::mpsc::channel(6);
let mut vm_handler = VMHandler::new(
newvm_rx,
confirm_vm_tx,
updatevm_rx,
confirm_updatevm_tx,
delete_vm_rx,
resources_tx,
);
let mut vm_handler = VMHandler::new(brain_msg_rx, daemon_msg_tx.clone());
let brain_url = vm_handler.config.brain_url.clone();
info!("Trying to get VM Contracts from Brain to see if some Contracts got removed...");
match grpc::list_contracts(brain_url.clone()).await {
info!("Registering with the brain and getting back VM Contracts (if they exist).");
match grpc::register_node(brain_url.clone()).await {
Ok(contracts) => vm_handler.clear_deleted_contracts(contracts),
Err(e) => log::error!("Could not get contracts from brain: {e:?}"),
};
@ -274,12 +255,9 @@ async fn main() {
info!("Connecting to brain...");
if let Err(e) = grpc::connect_and_run(grpc::ConnectionData {
brain_url,
newvm_tx,
confirm_vm_rx,
updatevm_tx,
confirm_updatevm_rx,
delete_vm_tx,
resources_rx,
brain_msg_tx,
daemon_msg_rx,
daemon_msg_tx,
})
.await
{

@ -1,9 +1,5 @@
#![allow(dead_code)]
use crate::{
config::Config,
constants::*,
grpc::brain,
};
use crate::{config::Config, global::*, grpc::snp_proto};
use anyhow::{anyhow, Result};
use log::info;
use serde::{Deserialize, Serialize};
@ -366,7 +362,7 @@ pub struct VM {
}
impl VM {
fn to_brain_vm_resp<T>(self, build_resp: impl FnOnce(brain::MeasurementArgs) -> T) -> T {
fn to_grpc_response<T>(self, build_resp: impl FnOnce(snp_proto::MeasurementArgs) -> T) -> T {
let mut nic_index: u32 = if self.fw_ports.is_empty() { 0 } else { 1 };
let mut ips = Vec::new();
let mut dtrfs_api_endpoint = String::new();
@ -377,7 +373,7 @@ impl VM {
if ip.address.parse::<std::net::Ipv4Addr>().is_ok() {
dtrfs_api_endpoint = ip.address.clone();
}
ips.push(brain::NewVmRespIp {
ips.push(snp_proto::MeasurementIp {
nic_index,
address: ip.address,
mask: ip.mask,
@ -390,24 +386,24 @@ impl VM {
if !dtrfs_api_endpoint.is_empty() {
dtrfs_api_endpoint += ":22";
} else {
dtrfs_api_endpoint += &format!(":{}", self.fw_ports[0].0);
dtrfs_api_endpoint += &format!("{}:{}", IP_INFO.ip, self.fw_ports[0].0);
}
let args = brain::MeasurementArgs {
let args = snp_proto::MeasurementArgs {
dtrfs_api_endpoint,
exposed_ports: self.fw_ports.iter().map(|(host_port, _)| *host_port as u32).collect(),
ips,
ovmf_hash: crate::constants::OVMF_HASH.to_string(),
ovmf_hash: OVMF_HASH.to_string(),
};
build_resp(args)
}
}
impl Into<brain::NewVmResp> for VM {
fn into(self) -> brain::NewVmResp {
impl Into<snp_proto::NewVmResp> for VM {
fn into(self) -> snp_proto::NewVmResp {
let uuid = self.uuid.clone();
self.to_brain_vm_resp(|args| brain::NewVmResp {
self.to_grpc_response(|args| snp_proto::NewVmResp {
uuid,
args: Some(args),
error: "".to_string(),
@ -415,10 +411,10 @@ impl Into<brain::NewVmResp> for VM {
}
}
impl Into<brain::UpdateVmResp> for VM {
fn into(self) -> brain::UpdateVmResp {
impl Into<snp_proto::UpdateVmResp> for VM {
fn into(self) -> snp_proto::UpdateVmResp {
let uuid = self.uuid.clone();
self.to_brain_vm_resp(|args| brain::UpdateVmResp {
self.to_grpc_response(|args| snp_proto::UpdateVmResp {
uuid,
args: Some(args),
error: "".to_string(),
@ -443,8 +439,8 @@ pub struct NewVMRequest {
dtrfs_sha: String,
}
impl From<brain::NewVmReq> for NewVMRequest {
fn from(req: brain::NewVmReq) -> Self {
impl From<snp_proto::NewVmReq> for NewVMRequest {
fn from(req: snp_proto::NewVmReq) -> Self {
Self {
uuid: req.uuid,
hostname: req.hostname,
@ -476,8 +472,8 @@ pub struct UpdateVMReq {
dtrfs_sha: String,
}
impl From<brain::UpdateVmReq> for UpdateVMReq {
fn from(req: brain::UpdateVmReq) -> Self {
impl From<snp_proto::UpdateVmReq> for UpdateVMReq {
fn from(req: snp_proto::UpdateVmReq) -> Self {
Self {
uuid: req.uuid,
vcpus: req.vcpus as usize,
@ -537,10 +533,9 @@ impl VM {
return Err(VMCreationErrors::DiskTooSmall);
}
if let Err(ovmf_err) = res.find_or_download_file(
crate::constants::OVMF_URL.to_string(),
crate::constants::OVMF_HASH.to_string(),
) {
if let Err(ovmf_err) =
res.find_or_download_file(OVMF_URL.to_string(), OVMF_HASH.to_string())
{
return Err(VMCreationErrors::BootFileError(format!(
"Could not get OVMF: {ovmf_err:?}"
)));