From abf09e8d26ce35f483c705eaecd7c9973b6820f5 Mon Sep 17 00:00:00 2001 From: ghe0 Date: Sat, 18 Jan 2025 02:26:53 +0200 Subject: [PATCH] modified code based on new snp proto --- Cargo.lock | 5 +- Cargo.toml | 1 + build.rs | 2 +- brain.proto => snp.proto | 200 ++++++++++++++++++---------------- src/config.rs | 11 +- src/constants.rs | 16 --- src/global.rs | 75 +++++++++++++ src/grpc.rs | 228 ++++++++++----------------------------- src/main.rs | 156 ++++++++++++--------------- src/state.rs | 43 ++++---- 10 files changed, 333 insertions(+), 404 deletions(-) rename brain.proto => snp.proto (65%) delete mode 100644 src/constants.rs create mode 100644 src/global.rs diff --git a/Cargo.lock b/Cargo.lock index bd276d3..d7e8466 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -348,6 +348,7 @@ dependencies = [ "rand_core", "reqwest", "serde", + "serde_json", "serde_yaml", "sha2", "tokio", @@ -1522,9 +1523,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.134" +version = "1.0.135" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d00f4175c42ee48b15416f6193a959ba3a0d67fc699a0db9ad12df9f83991c7d" +checksum = "2b0d7ba2887406110130a978386c4e1befb98c674b4fba677954e4db976630d9" dependencies = [ "itoa", "memchr", diff --git a/Cargo.toml b/Cargo.toml index ce8ef08..4c3c33a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ rand = "0.8.5" tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] } tokio-stream = "0.1.17" tonic = "0.12" +serde_json = "1.0.135" [build-dependencies] tonic-build = "0.12" diff --git a/build.rs b/build.rs index aa8f7eb..d1b9140 100644 --- a/build.rs +++ b/build.rs @@ -1,6 +1,6 @@ fn main() { tonic_build::configure() .build_server(true) - .compile_protos(&["brain.proto"], &["proto"]) + .compile_protos(&["snp.proto"], &["proto"]) .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); } diff --git a/brain.proto b/snp.proto similarity index 65% rename from brain.proto rename to snp.proto index a93883b..89f637b 100644 --- a/brain.proto +++ b/snp.proto @@ -1,81 +1,14 @@ syntax = "proto3"; -package brain; +package snp_proto; message Empty { } -message NodePubkey { - string node_pubkey = 1; +message Pubkey { + string pubkey = 1; } -message RegisterNodeReq { - string node_pubkey = 1; - string owner_pubkey = 2; -} - -message NodeResourceReq { - string node_pubkey = 1; - uint32 avail_ports = 2; - uint32 avail_ipv4 = 3; - uint32 avail_ipv6 = 4; - uint32 avail_vcpus = 5; - uint32 avail_memory_mb = 6; - uint32 avail_storage_gb = 7; - uint32 max_ports_per_vm = 8; -} - -message MeasurementArgs { - // this will be IP:Port of the dtrfs API - // actually not a measurement arg, but needed for the injector - string dtrfs_api_endpoint = 1; - repeated uint32 exposed_ports = 2; - string ovmf_hash = 5; - // This is needed to allow the CLI to build the kernel params from known data. - // The CLI will use the kernel params to get the measurement. - repeated NewVmRespIP ips = 6; -} - -message NewVMReq { - string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID - string hostname = 2; - string admin_pubkey = 3; - string node_pubkey = 4; - repeated uint32 extra_ports = 5; - bool public_ipv4 = 6; - bool public_ipv6 = 7; - uint32 disk_size_gb = 8; - uint32 vcpus = 9; - uint32 memory_mb = 10; - string kernel_url = 11; - string kernel_sha = 12; - string dtrfs_url = 13; - string dtrfs_sha = 14; -} - -message NewVMResp { - string uuid = 1; - string error = 2; - MeasurementArgs args = 3; -} - -message UpdateVMReq { - string uuid = 1; - uint32 disk_size_gb = 2; - uint32 vcpus = 3; - uint32 memory_mb = 4; - string kernel_url = 5; - string kernel_sha = 6; - string dtrfs_url = 7; - string dtrfs_sha = 8; -} - -message UpdateVMResp { - string uuid = 1; - string error = 2; - MeasurementArgs args = 3; -} - -message VMContract { +message Contract { string uuid = 1; string hostname = 2; string admin_pubkey = 3; @@ -92,33 +25,115 @@ message VMContract { string updated_at = 14; } -message ListVMContractsReq { - string admin_pubkey = 1; - string node_pubkey = 2; - string uuid = 3; +message MeasurementArgs { + // this will be IP:Port of the dtrfs API + // actually not a measurement arg, but needed for the injector + string dtrfs_api_endpoint = 1; + repeated uint32 exposed_ports = 2; + string ovmf_hash = 5; + // This is needed to allow the CLI to build the kernel params from known data. + // The CLI will use the kernel params to get the measurement. + repeated MeasurementIP ips = 6; } -message NewVmRespIP { +message MeasurementIP { uint32 nic_index = 1; string address = 2; string mask = 3; string gateway = 4; } -message DeleteVMReq { +message RegisterNodeReq { + string node_pubkey = 1; + string owner_pubkey = 2; + string main_ip = 3; + string country = 7; + string region = 8; + string city = 9; +} + +message NodeResources { + string node_pubkey = 1; + uint32 avail_ports = 2; + uint32 avail_ipv4 = 3; + uint32 avail_ipv6 = 4; + uint32 avail_vcpus = 5; + uint32 avail_memory_mb = 6; + uint32 avail_storage_gb = 7; + uint32 max_ports_per_vm = 8; +} + +message NewVmReq { + string uuid = 1; + string hostname = 2; + string admin_pubkey = 3; + string node_pubkey = 4; + repeated uint32 extra_ports = 5; + bool public_ipv4 = 6; + bool public_ipv6 = 7; + uint32 disk_size_gb = 8; + uint32 vcpus = 9; + uint32 memory_mb = 10; + string kernel_url = 11; + string kernel_sha = 12; + string dtrfs_url = 13; + string dtrfs_sha = 14; +} + +message NewVmResp { + string uuid = 1; + string error = 2; + MeasurementArgs args = 3; +} + +message UpdateVmReq { + string uuid = 1; + uint32 disk_size_gb = 2; + uint32 vcpus = 3; + uint32 memory_mb = 4; + string kernel_url = 5; + string kernel_sha = 6; + string dtrfs_url = 7; + string dtrfs_sha = 8; +} + +message UpdateVmResp { + string uuid = 1; + string error = 2; + MeasurementArgs args = 3; +} + +message DeleteVmReq { string uuid = 1; } -service BrainDaemonService { - rpc RegisterNode (RegisterNodeReq) returns (Empty); - rpc SendNodeResources (stream NodeResourceReq) returns (Empty); - rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq); - rpc SendNewVMResp (stream NewVMResp) returns (Empty); - rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq); - rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); - rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); - rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); - //rpc GetMeasurementArgs (ListVMContractsReq) returns (stream MeasurementArgs); +message BrainMessage { + oneof Msg { + NewVmReq new_vm_req = 1; + UpdateVmReq update_vm_req = 2; + DeleteVmReq delete_vm = 3; + } +} + +message DaemonMessage { + oneof Msg { + Pubkey pubkey = 1; + NewVmResp new_vm_resp = 2; + UpdateVmResp update_vm_resp = 3; + NodeResources node_resources = 4; + } +} + +service BrainDaemon { + rpc RegisterNode (RegisterNodeReq) returns (stream Contract); + rpc BrainMessages (Pubkey) returns (stream BrainMessage); + rpc DaemonMessages (stream DaemonMessage) returns (Empty); +} + +message ListContractsReq { + string admin_pubkey = 1; + string node_pubkey = 2; + string uuid = 3; } message NodeFilters { @@ -144,12 +159,11 @@ message NodeListResp { uint32 provider_rating = 7; } -service BrainCliService { - rpc CreateVMContract (NewVMReq) returns (NewVMResp); - rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); +service BrainCli { + rpc NewVm (NewVmReq) returns (NewVmResp); + rpc ListContracts (ListContractsReq) returns (stream Contract); rpc ListNodes (NodeFilters) returns (stream NodeListResp); rpc GetOneNode (NodeFilters) returns (NodeListResp); - rpc DeleteVM (DeleteVMReq) returns (Empty); - rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp); - //rpc GetMeasurementArgs (ListVMContractsReq) returns (MeasurementArgs); + rpc DeleteVm (DeleteVmReq) returns (Empty); + rpc UpdateVm (UpdateVmReq) returns (UpdateVmResp); } diff --git a/src/config.rs b/src/config.rs index 925efb9..0dc7664 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] use anyhow::Result; use serde::Deserialize; use std::{ @@ -57,17 +56,9 @@ pub struct Config { } mod range_format { - use serde::{Deserialize, Deserializer, Serialize, Serializer}; + use serde::{Deserialize, Deserializer, Serialize}; use std::ops::Range; - pub fn serialize(range: &Range, serializer: S) -> Result - where - S: Serializer, - { - let range_repr = RangeRepr { start: range.start, end: range.end }; - range_repr.serialize(serializer) - } - pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> where D: Deserializer<'de>, diff --git a/src/constants.rs b/src/constants.rs deleted file mode 100644 index 74b763f..0000000 --- a/src/constants.rs +++ /dev/null @@ -1,16 +0,0 @@ -#![allow(dead_code)] - -pub(crate) const DEFAULT_OVMF: &str = "/usr/share/edk2/ovmf/OVMF.amdsev.fd"; -pub(crate) const VM_BOOT_DIR: &str = "/var/lib/detee/boot/"; -pub(crate) const USED_RESOURCES: &str = "/etc/detee/daemon/used_resources.yaml"; -pub(crate) const VM_CONFIG_DIR: &str = "/etc/detee/daemon/vms/"; -pub(crate) const SECRET_KEY_PATH: &str = "/etc/detee/daemon/node_secret_key.pem"; -pub(crate) const DAEMON_CONFIG_PATH: &str = "/etc/detee/daemon/config.yaml"; -pub(crate) const START_VM_SCRIPT: &str = "/usr/local/bin/detee/start_qemu_vm.sh"; -// TODO: research if other CPU types provide better performance -pub(crate) const QEMU_VM_CPU_TYPE: &str = "EPYC-v4"; -// If you modify this, also modify scripts/start_qemu_vm.sh -pub(crate) const OVMF_HASH: &str = - "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76"; -pub(crate) const OVMF_URL: &str = - "https://drive.google.com/uc?export=download&id=1V-vLkaiLaGmFSjrN84Z6nELQOxKNAoSJ"; diff --git a/src/global.rs b/src/global.rs new file mode 100644 index 0000000..c3fff0c --- /dev/null +++ b/src/global.rs @@ -0,0 +1,75 @@ +use anyhow::Result; +use lazy_static::lazy_static; +use log::{info, warn}; +use std::{fs::File, io::Write}; + +pub(crate) const VM_BOOT_DIR: &str = "/var/lib/detee/boot/"; +pub(crate) const USED_RESOURCES: &str = "/etc/detee/daemon/used_resources.yaml"; +pub(crate) const VM_CONFIG_DIR: &str = "/etc/detee/daemon/vms/"; +pub(crate) const SECRET_KEY_PATH: &str = "/etc/detee/daemon/node_secret_key.pem"; +pub(crate) const DAEMON_CONFIG_PATH: &str = "/etc/detee/daemon/config.yaml"; +pub(crate) const START_VM_SCRIPT: &str = "/usr/local/bin/detee/start_qemu_vm.sh"; +// TODO: research if other CPU types provide better performance +pub(crate) const QEMU_VM_CPU_TYPE: &str = "EPYC-v4"; +// If you modify this, also modify scripts/start_qemu_vm.sh +pub(crate) const OVMF_HASH: &str = + "0346619257269b9a61ee003e197d521b8e2283483070d163a34940d6a1d40d76"; +pub(crate) const OVMF_URL: &str = + "https://drive.google.com/uc?export=download&id=1V-vLkaiLaGmFSjrN84Z6nELQOxKNAoSJ"; + +lazy_static! { + pub static ref PUBLIC_KEY: String = get_public_key(); + pub static ref IP_INFO: IPInfo = get_ip_info().unwrap(); +} + +fn create_secret_key() -> Result { + use ed25519_dalek::pkcs8::{spki::der::pem::LineEnding, EncodePrivateKey}; + let key_path = SECRET_KEY_PATH; + info!("Creating new secret key at {}", key_path); + let sk = ed25519_dalek::SigningKey::generate(&mut rand_core::OsRng); + let sk_pem = sk.to_pkcs8_pem(LineEnding::default()).unwrap(); + let mut file = File::create(key_path)?; + file.write_all(sk_pem.as_bytes())?; + Ok(sk) +} + +fn load_secret_key() -> Result { + use ed25519_dalek::pkcs8::DecodePrivateKey; + let secret_key_pem = match std::fs::read_to_string(SECRET_KEY_PATH) { + Ok(secret_key_pem) => secret_key_pem, + Err(e) => { + warn!("Could not load secret key due to error: {e:?}"); + return Ok(create_secret_key()?); + } + }; + Ok(ed25519_dalek::SigningKey::from_pkcs8_pem(&secret_key_pem)?) +} + +pub fn get_public_key() -> String { + use ed25519_dalek::pkcs8::{spki::der::pem::LineEnding, EncodePublicKey}; + let pubkey = load_secret_key() + .unwrap() + .verifying_key() + .to_public_key_pem(LineEnding::default()) + .unwrap() + .lines() + .nth(1) + .unwrap() + .to_string(); + log::info!("Loaded the following public key: {pubkey}"); + pubkey +} + +#[derive(serde::Deserialize, Clone)] +pub struct IPInfo { + pub country: String, + pub region: String, + pub city: String, + pub ip: String, +} + +fn get_ip_info() -> anyhow::Result { + let body = reqwest::blocking::get("https://ipinfo.io/".to_string())?.text()?; + info!("Got the following data from ipinfo.io: {body}"); + Ok(serde_json::de::from_str(&body)?) +} diff --git a/src/grpc.rs b/src/grpc.rs index 94a8975..81be01a 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -1,76 +1,53 @@ -#![allow(dead_code)] -pub mod brain { - tonic::include_proto!("brain"); -} - +use crate::snp_proto::DaemonMessage; use anyhow::Result; -use brain::{ - brain_daemon_service_client::BrainDaemonServiceClient, DeleteVmReq, ListVmContractsReq, - NewVmReq, NewVmResp, NodePubkey, NodeResourceReq, RegisterNodeReq, UpdateVmReq, UpdateVmResp, - VmContract, +use log::{debug, info, warn}; +use snp_proto::{ + brain_daemon_client::BrainDaemonClient, BrainMessage, Contract, Pubkey, RegisterNodeReq, }; -use lazy_static::lazy_static; -use log::{debug, error, info, warn}; -use std::{fs::File, io::Write}; use tokio::{ sync::mpsc::{Receiver, Sender}, task::JoinSet, }; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tonic::transport::Channel; +use crate::global::*; -lazy_static! { - static ref PUBLIC_KEY: String = get_public_key(); +pub mod snp_proto { + tonic::include_proto!("snp_proto"); } -fn create_secret_key() -> Result { - use ed25519_dalek::pkcs8::{spki::der::pem::LineEnding, EncodePrivateKey}; - let key_path = crate::constants::SECRET_KEY_PATH; - info!("Creating new secret key at {}", key_path); - let sk = ed25519_dalek::SigningKey::generate(&mut rand_core::OsRng); - let sk_pem = sk.to_pkcs8_pem(LineEnding::default()).unwrap(); - let mut file = File::create(key_path)?; - file.write_all(sk_pem.as_bytes())?; - Ok(sk) +impl From for snp_proto::DaemonMessage { + fn from(value: snp_proto::NewVmResp) -> Self { + snp_proto::DaemonMessage { msg: Some(snp_proto::daemon_message::Msg::NewVmResp(value)) } + } } -fn load_secret_key() -> Result { - use ed25519_dalek::pkcs8::DecodePrivateKey; - let secret_key_pem = match std::fs::read_to_string(crate::constants::SECRET_KEY_PATH) { - Ok(secret_key_pem) => secret_key_pem, - Err(e) => { - warn!("Could not load secret key due to error: {e:?}"); - return Ok(create_secret_key()?); - } +impl From for snp_proto::DaemonMessage { + fn from(value: snp_proto::UpdateVmResp) -> Self { + snp_proto::DaemonMessage { msg: Some(snp_proto::daemon_message::Msg::UpdateVmResp(value)) } + } +} + +impl From for snp_proto::DaemonMessage { + fn from(value: snp_proto::NodeResources) -> Self { + snp_proto::DaemonMessage { msg: Some(snp_proto::daemon_message::Msg::NodeResources(value)) } + } +} + +pub async fn register_node(brain_url: String) -> Result> { + let mut client = BrainDaemonClient::connect(brain_url).await?; + debug!("Starting node registration..."); + let ip_info = IP_INFO.clone(); + let req = RegisterNodeReq { + node_pubkey: PUBLIC_KEY.clone(), + owner_pubkey: "IamTheOwnerOf".to_string() + &PUBLIC_KEY, + main_ip: ip_info.ip, + country: ip_info.country, + region: ip_info.region, + city: ip_info.city, }; - Ok(ed25519_dalek::SigningKey::from_pkcs8_pem(&secret_key_pem)?) -} - -pub fn get_public_key() -> String { - use ed25519_dalek::pkcs8::{spki::der::pem::LineEnding, EncodePublicKey}; - let pubkey = load_secret_key() - .unwrap() - .verifying_key() - .to_public_key_pem(LineEnding::default()) - .unwrap() - .lines() - .nth(1) - .unwrap() - .to_string(); - log::info!("Loaded the following public key: {pubkey}"); - pubkey -} - -pub async fn list_contracts(brain_url: String) -> Result> { - let mut client = BrainDaemonServiceClient::connect(brain_url).await?; let mut contracts = Vec::new(); - let mut grpc_stream = client - .list_vm_contracts(ListVmContractsReq { - node_pubkey: PUBLIC_KEY.to_string(), - ..Default::default() - }) - .await? - .into_inner(); + let mut grpc_stream = client.register_node(req).await?.into_inner(); while let Some(stream_update) = grpc_stream.next().await { match stream_update { Ok(node) => { @@ -86,18 +63,18 @@ pub async fn list_contracts(brain_url: String) -> Result> { Ok(contracts) } -async fn listen_for_new_vm_reqs( - mut client: BrainDaemonServiceClient, - tx: Sender, +async fn receive_messages( + mut client: BrainDaemonClient, + tx: Sender, ) -> Result<()> { - debug!("starting listen_for_new_vm_reqs"); - let node_pubkey = PUBLIC_KEY.clone(); - let mut grpc_stream = client.get_new_vm_reqs(NodePubkey { node_pubkey }).await?.into_inner(); + debug!("starting to listen for messages from brain"); + let pubkey = PUBLIC_KEY.clone(); + let mut grpc_stream = client.brain_messages(Pubkey { pubkey }).await?.into_inner(); while let Some(stream_update) = grpc_stream.next().await { match stream_update { - Ok(req) => { - info!("Received new vm request: {req:?}"); - let _ = tx.send(req).await; + Ok(msg) => { + info!("Received message from brain: {msg:?}"); + let _ = tx.send(msg).await; } Err(e) => { warn!("Brain disconnected from listen_for_new_vm_reqs: {e}"); @@ -108,121 +85,34 @@ async fn listen_for_new_vm_reqs( Ok(()) } -async fn send_newvm_resp( - mut client: BrainDaemonServiceClient, - rx: Receiver, +async fn send_messages( + mut client: BrainDaemonClient, + rx: Receiver, + tx: Sender, ) -> Result<()> { - debug!("starting send_newvm_resp stream"); + debug!("starting daemon message stream to brain"); + let pubkey = PUBLIC_KEY.clone(); let rx_stream = ReceiverStream::new(rx); - client.send_new_vm_resp(rx_stream).await?; + tx.send(DaemonMessage { msg: Some(snp_proto::daemon_message::Msg::Pubkey(Pubkey { pubkey })) }) + .await?; + client.daemon_messages(rx_stream).await?; debug!("send_newvm_resp is about to exit"); Ok(()) } -async fn send_node_resources( - mut client: BrainDaemonServiceClient, - rx: Receiver, -) -> Result<()> { - debug!("starting send_newvm_resp stream"); - let rx_stream = ReceiverStream::new(rx).map(|mut node_resources| { - node_resources.node_pubkey = get_public_key(); - node_resources - }); - client.send_node_resources(rx_stream).await?; - debug!("send_newvm_resp is about to exit"); - Ok(()) -} - -async fn register_node(mut client: BrainDaemonServiceClient) { - debug!("Starting node registration..."); - let req = RegisterNodeReq { - node_pubkey: PUBLIC_KEY.clone(), - owner_pubkey: "IamTheOwnerOf".to_string() + &PUBLIC_KEY, - }; - match client.register_node(req).await { - Ok(_) => { - info!("Registered as 10.0.10.1 from Bruma/Cyrodiil with ID {}", PUBLIC_KEY.clone()) - } - Err(e) => error!("Could not register node data: {e:?}"), - }; -} - -async fn listen_for_deleted_vms( - mut client: BrainDaemonServiceClient, - tx: Sender, -) -> Result<()> { - debug!("starting listen_for_new_vm_reqs"); - let node_pubkey = PUBLIC_KEY.clone(); - let mut grpc_stream = client.get_delete_vm_req(NodePubkey { node_pubkey }).await?.into_inner(); - while let Some(stream_update) = grpc_stream.next().await { - match stream_update { - Ok(req) => { - info!("Received delete vm request: {req:?}"); - let _ = tx.send(req).await; - } - Err(e) => { - warn!("Brain disconnected from listen_for_deleted_vms: {e}"); - } - } - } - debug!("listen_for_new_vm_reqs is about to exit"); - Ok(()) -} - -async fn listen_for_update_vm_reqs( - mut client: BrainDaemonServiceClient, - tx: Sender, -) -> Result<()> { - debug!("starting listen_for_update_vm_reqs"); - let node_pubkey = PUBLIC_KEY.clone(); - let mut grpc_stream = client.get_update_vm_req(NodePubkey { node_pubkey }).await?.into_inner(); - while let Some(stream_update) = grpc_stream.next().await { - match stream_update { - Ok(req) => { - info!("Received update vm request: {req:?}"); - let _ = tx.send(req).await; - } - Err(e) => { - warn!("Brain disconnected from listen_for_update_vm_reqs: {e}"); - } - } - } - debug!("listen_for_update_vm_reqs is about to exit"); - Ok(()) -} - -async fn send_updatevm_resp( - mut client: BrainDaemonServiceClient, - rx: Receiver, -) -> Result<()> { - debug!("starting send_updatevm_resp stream"); - let rx_stream = ReceiverStream::new(rx); - client.send_update_vm_resp(rx_stream).await?; - debug!("send_updatevm_resp is about to exit"); - Ok(()) -} - pub struct ConnectionData { pub brain_url: String, - pub newvm_tx: Sender, - pub confirm_vm_rx: Receiver, - pub updatevm_tx: Sender, - pub confirm_updatevm_rx: Receiver, - pub delete_vm_tx: Sender, - pub resources_rx: Receiver, + pub brain_msg_tx: Sender, + pub daemon_msg_rx: Receiver, + pub daemon_msg_tx: Sender, } pub async fn connect_and_run(cd: ConnectionData) -> Result<()> { - let client = BrainDaemonServiceClient::connect(cd.brain_url).await?; + let client = BrainDaemonClient::connect(cd.brain_url).await?; let mut streaming_tasks = JoinSet::new(); - register_node(client.clone()).await; - streaming_tasks.spawn(listen_for_new_vm_reqs(client.clone(), cd.newvm_tx)); - streaming_tasks.spawn(send_newvm_resp(client.clone(), cd.confirm_vm_rx)); - streaming_tasks.spawn(listen_for_update_vm_reqs(client.clone(), cd.updatevm_tx)); - streaming_tasks.spawn(send_updatevm_resp(client.clone(), cd.confirm_updatevm_rx)); - streaming_tasks.spawn(listen_for_deleted_vms(client.clone(), cd.delete_vm_tx)); - streaming_tasks.spawn(send_node_resources(client.clone(), cd.resources_rx)); + streaming_tasks.spawn(receive_messages(client.clone(), cd.brain_msg_tx)); + streaming_tasks.spawn(send_messages(client.clone(), cd.daemon_msg_rx, cd.daemon_msg_tx)); let task_output = streaming_tasks.join_next().await; warn!("One stream exited: {task_output:?}"); diff --git a/src/main.rs b/src/main.rs index 04bd53c..2e2a898 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,10 @@ -#[allow(dead_code)] mod config; -mod constants; +mod global; mod grpc; mod state; -use crate::{config::Config, grpc::brain}; +use crate::global::*; +use crate::{config::Config, grpc::snp_proto}; use anyhow::Result; use log::{debug, info, warn}; use tokio::{ @@ -14,12 +14,8 @@ use tokio::{ #[allow(dead_code)] struct VMHandler { - new_vm_req_chan: Receiver, - new_vm_resp_chan: Sender, - update_vm_req_chan: Receiver, - update_vm_resp_chan: Sender, - delete_vm_chan: Receiver, - resources_chan: Sender, + receiver: Receiver, + sender: Sender, config: Config, res: state::Resources, } @@ -27,14 +23,10 @@ struct VMHandler { #[allow(dead_code)] impl VMHandler { fn new( - new_vm_req_chan: Receiver, - new_vm_resp_chan: Sender, - update_vm_req_chan: Receiver, - update_vm_resp_chan: Sender, - delete_vm_chan: Receiver, - resources_chan: Sender, + receiver: Receiver, + sender: Sender, ) -> Self { - let config = match Config::load_from_disk(crate::constants::DAEMON_CONFIG_PATH) { + let config = match Config::load_from_disk(DAEMON_CONFIG_PATH) { Ok(config) => config, Err(e) => panic!("Could not load config: {e:?}"), }; @@ -46,16 +38,7 @@ impl VMHandler { state::Resources::new(&config.volumes) } }; - Self { - new_vm_req_chan, - new_vm_resp_chan, - update_vm_req_chan, - update_vm_resp_chan, - delete_vm_chan, - resources_chan, - config, - res, - } + Self { receiver, sender, config, res } } fn get_available_ips(&self) -> (u32, u32) { @@ -85,8 +68,8 @@ impl VMHandler { } } let avail_storage_gb = avail_storage_gb as u32; - let res = brain::NodeResourceReq { - node_pubkey: String::new(), + let res = snp_proto::NodeResources { + node_pubkey: PUBLIC_KEY.clone(), avail_ports: (self.config.public_port_range.len() - self.res.reserved_ports.len()) as u32, avail_ipv4, @@ -97,29 +80,33 @@ impl VMHandler { max_ports_per_vm: self.config.max_ports_per_vm as u32, }; debug!("sending node resources on brain: {res:?}"); - let _ = self.resources_chan.send(res).await; + let _ = self.sender.send(res.into()).await; } - async fn handle_new_vm_req(&mut self, new_vm_req: brain::NewVmReq) { + async fn handle_new_vm_req(&mut self, new_vm_req: snp_proto::NewVmReq) { debug!("Processing new vm request: {new_vm_req:?}"); let uuid = new_vm_req.uuid.clone(); match state::VM::new(new_vm_req.into(), &self.config, &mut self.res) { Ok(vm) => match vm.start() { Ok(_) => { info!("Succesfully started VM {uuid}"); - let _ = self.new_vm_resp_chan.send(vm.into()).await; + let vm: snp_proto::NewVmResp = vm.into(); + let _ = self.sender.send(vm.into()).await; self.send_node_resources().await; } Err(e) => { log::error!("Could not start VM {uuid}: {e:?}"); let _ = self - .new_vm_resp_chan - .send(brain::NewVmResp { - uuid, - error: "This node has an internal error. Choose another node." - .to_string(), - ..Default::default() - }) + .sender + .send( + snp_proto::NewVmResp { + uuid, + error: "This node has an internal error. Choose another node." + .to_string(), + ..Default::default() + } + .into(), + ) .await; } }, @@ -129,54 +116,62 @@ impl VMHandler { "Got NewVmReq for VM {}, that already exist. Will send NewVmResp.", vm.uuid ); - let _ = self.new_vm_resp_chan.send(vm.into()).await; + let vm: snp_proto::NewVmResp = vm.into(); + let _ = self.sender.send(vm.into()).await; } _ => { warn!("Refusing to service vm {uuid} due to error: {e:?}"); let _ = self - .new_vm_resp_chan - .send(brain::NewVmResp { - uuid, - error: format!("{e:?}"), - ..Default::default() - }) + .sender + .send( + snp_proto::NewVmResp { + uuid, + error: format!("{e:?}"), + ..Default::default() + } + .into(), + ) .await; } }, } } - async fn handle_update_vm_req(&mut self, update_vm_req: brain::UpdateVmReq) -> Result<()> { + async fn handle_update_vm_req(&mut self, update_vm_req: snp_proto::UpdateVmReq) -> Result<()> { debug!("Processing update vm request: {update_vm_req:?}"); let vm_id = update_vm_req.uuid.clone(); let content = - std::fs::read_to_string(constants::VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?; + std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?; let mut vm: state::VM = serde_yaml::from_str(&content)?; match vm.update(update_vm_req.into(), &self.config, &mut self.res) { Ok(_) => { info!("Succesfully updated VM {vm_id}"); - let _ = self.update_vm_resp_chan.send(vm.into()).await; + let vm: snp_proto::UpdateVmResp = vm.into(); + let _ = self.sender.send(vm.into()).await; self.send_node_resources().await; } Err(e) => { debug!("Unable to update vm {vm_id} due to error: {e:?}"); let _ = self - .update_vm_resp_chan - .send(brain::UpdateVmResp { - uuid: vm_id, - error: format!("{e:?}"), - ..Default::default() - }) + .sender + .send( + snp_proto::UpdateVmResp { + uuid: vm_id, + error: format!("{e:?}"), + ..Default::default() + } + .into(), + ) .await; } }; Ok(()) } - fn handle_delete_vm(&mut self, delete_vm_req: brain::DeleteVmReq) -> Result<()> { + fn handle_delete_vm(&mut self, delete_vm_req: snp_proto::DeleteVmReq) -> Result<()> { let vm_id = delete_vm_req.uuid; let content = - std::fs::read_to_string(constants::VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?; + std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?; let vm: state::VM = serde_yaml::from_str(&content)?; vm.delete(&mut self.res)?; Ok(()) @@ -184,17 +179,17 @@ impl VMHandler { async fn run(mut self) { self.send_node_resources().await; - loop { - tokio::select! { - Some(new_vm_req) = self.new_vm_req_chan.recv() => { + while let Some(brain_msg) = self.receiver.recv().await { + match brain_msg.msg { + Some(snp_proto::brain_message::Msg::NewVmReq(new_vm_req)) => { self.handle_new_vm_req(new_vm_req).await; } - Some(update_vm_req) = self.update_vm_req_chan.recv() => { + Some(snp_proto::brain_message::Msg::UpdateVmReq(update_vm_req)) => { if let Err(e) = self.handle_update_vm_req(update_vm_req).await { log::error!("Could not update vm: {e:?}"); } } - Some(delete_vm_req) = self.delete_vm_chan.recv() => { + Some(snp_proto::brain_message::Msg::DeleteVm(delete_vm_req)) => { let uuid = delete_vm_req.uuid.clone(); if let Err(e) = self.handle_delete_vm(delete_vm_req) { log::error!("Could not delete vm {uuid}: {e:?}"); @@ -202,20 +197,17 @@ impl VMHandler { self.send_node_resources().await; } } - else => { - log::error!("All data channels closed."); - return; - } + None => debug!("Received None from the Brain."), } } } - fn clear_deleted_contracts(&mut self, contracts: Vec) { + fn clear_deleted_contracts(&mut self, contracts: Vec) { for uuid in self.res.existing_vms.clone() { if contracts.iter().find(|c| c.uuid == uuid).is_none() { info!("VM {uuid} exists locally but not found in brain. Deleting..."); let content = match std::fs::read_to_string( - crate::constants::VM_CONFIG_DIR.to_string() + &uuid + ".yaml", + VM_CONFIG_DIR.to_string() + &uuid + ".yaml", ) { Ok(content) => content, Err(e) => { @@ -244,25 +236,14 @@ async fn main() { env_logger::builder().filter_level(log::LevelFilter::Debug).init(); loop { - let (newvm_tx, newvm_rx) = tokio::sync::mpsc::channel(6); - let (confirm_vm_tx, confirm_vm_rx) = tokio::sync::mpsc::channel(6); - let (updatevm_tx, updatevm_rx) = tokio::sync::mpsc::channel(6); - let (confirm_updatevm_tx, confirm_updatevm_rx) = tokio::sync::mpsc::channel(6); - let (delete_vm_tx, delete_vm_rx) = tokio::sync::mpsc::channel(6); - let (resources_tx, resources_rx) = tokio::sync::mpsc::channel(6); + let (brain_msg_tx, brain_msg_rx) = tokio::sync::mpsc::channel(6); + let (daemon_msg_tx, daemon_msg_rx) = tokio::sync::mpsc::channel(6); - let mut vm_handler = VMHandler::new( - newvm_rx, - confirm_vm_tx, - updatevm_rx, - confirm_updatevm_tx, - delete_vm_rx, - resources_tx, - ); + let mut vm_handler = VMHandler::new(brain_msg_rx, daemon_msg_tx.clone()); let brain_url = vm_handler.config.brain_url.clone(); - info!("Trying to get VM Contracts from Brain to see if some Contracts got removed..."); - match grpc::list_contracts(brain_url.clone()).await { + info!("Registering with the brain and getting back VM Contracts (if they exist)."); + match grpc::register_node(brain_url.clone()).await { Ok(contracts) => vm_handler.clear_deleted_contracts(contracts), Err(e) => log::error!("Could not get contracts from brain: {e:?}"), }; @@ -274,12 +255,9 @@ async fn main() { info!("Connecting to brain..."); if let Err(e) = grpc::connect_and_run(grpc::ConnectionData { brain_url, - newvm_tx, - confirm_vm_rx, - updatevm_tx, - confirm_updatevm_rx, - delete_vm_tx, - resources_rx, + brain_msg_tx, + daemon_msg_rx, + daemon_msg_tx, }) .await { diff --git a/src/state.rs b/src/state.rs index a03f73e..d0ef295 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,9 +1,5 @@ #![allow(dead_code)] -use crate::{ - config::Config, - constants::*, - grpc::brain, -}; +use crate::{config::Config, global::*, grpc::snp_proto}; use anyhow::{anyhow, Result}; use log::info; use serde::{Deserialize, Serialize}; @@ -366,7 +362,7 @@ pub struct VM { } impl VM { - fn to_brain_vm_resp(self, build_resp: impl FnOnce(brain::MeasurementArgs) -> T) -> T { + fn to_grpc_response(self, build_resp: impl FnOnce(snp_proto::MeasurementArgs) -> T) -> T { let mut nic_index: u32 = if self.fw_ports.is_empty() { 0 } else { 1 }; let mut ips = Vec::new(); let mut dtrfs_api_endpoint = String::new(); @@ -377,7 +373,7 @@ impl VM { if ip.address.parse::().is_ok() { dtrfs_api_endpoint = ip.address.clone(); } - ips.push(brain::NewVmRespIp { + ips.push(snp_proto::MeasurementIp { nic_index, address: ip.address, mask: ip.mask, @@ -390,24 +386,24 @@ impl VM { if !dtrfs_api_endpoint.is_empty() { dtrfs_api_endpoint += ":22"; } else { - dtrfs_api_endpoint += &format!(":{}", self.fw_ports[0].0); + dtrfs_api_endpoint += &format!("{}:{}", IP_INFO.ip, self.fw_ports[0].0); } - let args = brain::MeasurementArgs { + let args = snp_proto::MeasurementArgs { dtrfs_api_endpoint, exposed_ports: self.fw_ports.iter().map(|(host_port, _)| *host_port as u32).collect(), ips, - ovmf_hash: crate::constants::OVMF_HASH.to_string(), + ovmf_hash: OVMF_HASH.to_string(), }; build_resp(args) } } -impl Into for VM { - fn into(self) -> brain::NewVmResp { +impl Into for VM { + fn into(self) -> snp_proto::NewVmResp { let uuid = self.uuid.clone(); - self.to_brain_vm_resp(|args| brain::NewVmResp { + self.to_grpc_response(|args| snp_proto::NewVmResp { uuid, args: Some(args), error: "".to_string(), @@ -415,10 +411,10 @@ impl Into for VM { } } -impl Into for VM { - fn into(self) -> brain::UpdateVmResp { +impl Into for VM { + fn into(self) -> snp_proto::UpdateVmResp { let uuid = self.uuid.clone(); - self.to_brain_vm_resp(|args| brain::UpdateVmResp { + self.to_grpc_response(|args| snp_proto::UpdateVmResp { uuid, args: Some(args), error: "".to_string(), @@ -443,8 +439,8 @@ pub struct NewVMRequest { dtrfs_sha: String, } -impl From for NewVMRequest { - fn from(req: brain::NewVmReq) -> Self { +impl From for NewVMRequest { + fn from(req: snp_proto::NewVmReq) -> Self { Self { uuid: req.uuid, hostname: req.hostname, @@ -476,8 +472,8 @@ pub struct UpdateVMReq { dtrfs_sha: String, } -impl From for UpdateVMReq { - fn from(req: brain::UpdateVmReq) -> Self { +impl From for UpdateVMReq { + fn from(req: snp_proto::UpdateVmReq) -> Self { Self { uuid: req.uuid, vcpus: req.vcpus as usize, @@ -537,10 +533,9 @@ impl VM { return Err(VMCreationErrors::DiskTooSmall); } - if let Err(ovmf_err) = res.find_or_download_file( - crate::constants::OVMF_URL.to_string(), - crate::constants::OVMF_HASH.to_string(), - ) { + if let Err(ovmf_err) = + res.find_or_download_file(OVMF_URL.to_string(), OVMF_HASH.to_string()) + { return Err(VMCreationErrors::BootFileError(format!( "Could not get OVMF: {ovmf_err:?}" )));