rename structs so that they say "VM"

This commit is contained in:
ghe0 2025-02-09 00:30:01 +02:00
parent 0ed21fdf98
commit 76c88810a6
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
4 changed files with 67 additions and 53 deletions

@ -1,6 +1,6 @@
fn main() { fn main() {
tonic_build::configure() tonic_build::configure()
.build_server(true) .build_server(true)
.compile_protos(&["snp.proto"], &["proto"]) .compile_protos(&["vm.proto"], &["proto"])
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
} }

@ -1,8 +1,8 @@
use crate::global::*; use crate::global::*;
use crate::snp_proto::DaemonMessage; use crate::snp_proto::VmDaemonMessage;
use anyhow::Result; use anyhow::Result;
use log::{debug, info, warn}; use log::{debug, info, warn};
use snp_proto::{brain_daemon_client::BrainDaemonClient, BrainMessage, Contract, RegisterNodeReq}; use snp_proto::{brain_vm_daemon_client::BrainVmDaemonClient, BrainVmMessage, VmContract, RegisterVmNodeReq};
use tokio::{ use tokio::{
sync::mpsc::{Receiver, Sender}, sync::mpsc::{Receiver, Sender},
task::JoinSet, task::JoinSet,
@ -11,34 +11,34 @@ use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tonic::transport::Channel; use tonic::transport::Channel;
pub mod snp_proto { pub mod snp_proto {
tonic::include_proto!("snp_proto"); tonic::include_proto!("vm_proto");
} }
impl From<snp_proto::NewVmResp> for snp_proto::DaemonMessage { impl From<snp_proto::NewVmResp> for snp_proto::VmDaemonMessage {
fn from(value: snp_proto::NewVmResp) -> Self { fn from(value: snp_proto::NewVmResp) -> Self {
snp_proto::DaemonMessage { msg: Some(snp_proto::daemon_message::Msg::NewVmResp(value)) } snp_proto::VmDaemonMessage { msg: Some(snp_proto::vm_daemon_message::Msg::NewVmResp(value)) }
} }
} }
impl From<snp_proto::UpdateVmResp> for snp_proto::DaemonMessage { impl From<snp_proto::UpdateVmResp> for snp_proto::VmDaemonMessage {
fn from(value: snp_proto::UpdateVmResp) -> Self { fn from(value: snp_proto::UpdateVmResp) -> Self {
snp_proto::DaemonMessage { msg: Some(snp_proto::daemon_message::Msg::UpdateVmResp(value)) } snp_proto::VmDaemonMessage { msg: Some(snp_proto::vm_daemon_message::Msg::UpdateVmResp(value)) }
} }
} }
impl From<snp_proto::NodeResources> for snp_proto::DaemonMessage { impl From<snp_proto::VmNodeResources> for snp_proto::VmDaemonMessage {
fn from(value: snp_proto::NodeResources) -> Self { fn from(value: snp_proto::VmNodeResources) -> Self {
snp_proto::DaemonMessage { msg: Some(snp_proto::daemon_message::Msg::NodeResources(value)) } snp_proto::VmDaemonMessage { msg: Some(snp_proto::vm_daemon_message::Msg::VmNodeResources(value)) }
} }
} }
pub async fn register_node(config: &crate::config::Config) -> Result<Vec<Contract>> { pub async fn register_node(config: &crate::config::Config) -> Result<Vec<VmContract>> {
use tonic::metadata::AsciiMetadataValue; use tonic::metadata::AsciiMetadataValue;
use tonic::Request; use tonic::Request;
let mut client = BrainDaemonClient::connect(config.brain_url.clone()).await?; let mut client = BrainVmDaemonClient::connect(config.brain_url.clone()).await?;
debug!("Starting node registration..."); debug!("Starting node registration...");
let ip_info = IP_INFO.clone(); let ip_info = IP_INFO.clone();
let req = RegisterNodeReq { let req = RegisterVmNodeReq {
node_pubkey: PUBLIC_KEY.clone(), node_pubkey: PUBLIC_KEY.clone(),
owner_pubkey: config.owner_wallet.clone(), owner_pubkey: config.owner_wallet.clone(),
main_ip: ip_info.ip, main_ip: ip_info.ip,
@ -60,7 +60,7 @@ pub async fn register_node(config: &crate::config::Config) -> Result<Vec<Contrac
req.metadata_mut().insert("request-signature", signature); req.metadata_mut().insert("request-signature", signature);
let mut contracts = Vec::new(); let mut contracts = Vec::new();
let mut grpc_stream = client.register_node(req).await?.into_inner(); let mut grpc_stream = client.register_vm_node(req).await?.into_inner();
while let Some(stream_update) = grpc_stream.next().await { while let Some(stream_update) = grpc_stream.next().await {
match stream_update { match stream_update {
Ok(node) => { Ok(node) => {
@ -85,9 +85,9 @@ fn sign_stream_auth(contracts: Vec<String>) -> Result<snp_proto::DaemonStreamAut
} }
async fn receive_messages( async fn receive_messages(
mut client: BrainDaemonClient<Channel>, mut client: BrainVmDaemonClient<Channel>,
contracts: Vec<String>, contracts: Vec<String>,
tx: Sender<BrainMessage>, tx: Sender<BrainVmMessage>,
) -> Result<()> { ) -> Result<()> {
debug!("starting to listen for messages from brain"); debug!("starting to listen for messages from brain");
let mut grpc_stream = client.brain_messages(sign_stream_auth(contracts)?).await?.into_inner(); let mut grpc_stream = client.brain_messages(sign_stream_auth(contracts)?).await?.into_inner();
@ -107,15 +107,15 @@ async fn receive_messages(
} }
async fn send_messages( async fn send_messages(
mut client: BrainDaemonClient<Channel>, mut client: BrainVmDaemonClient<Channel>,
contracts: Vec<String>, contracts: Vec<String>,
rx: Receiver<DaemonMessage>, rx: Receiver<VmDaemonMessage>,
tx: Sender<DaemonMessage>, tx: Sender<VmDaemonMessage>,
) -> Result<()> { ) -> Result<()> {
debug!("starting daemon message stream to brain"); debug!("starting daemon message stream to brain");
let rx_stream = ReceiverStream::new(rx); let rx_stream = ReceiverStream::new(rx);
tx.send(DaemonMessage { tx.send(VmDaemonMessage {
msg: Some(snp_proto::daemon_message::Msg::Auth(sign_stream_auth(contracts)?)), msg: Some(snp_proto::vm_daemon_message::Msg::Auth(sign_stream_auth(contracts)?)),
}) })
.await?; .await?;
client.daemon_messages(rx_stream).await?; client.daemon_messages(rx_stream).await?;
@ -126,13 +126,13 @@ async fn send_messages(
pub struct ConnectionData { pub struct ConnectionData {
pub contracts: Vec<String>, pub contracts: Vec<String>,
pub brain_url: String, pub brain_url: String,
pub brain_msg_tx: Sender<BrainMessage>, pub brain_msg_tx: Sender<BrainVmMessage>,
pub daemon_msg_rx: Receiver<DaemonMessage>, pub daemon_msg_rx: Receiver<VmDaemonMessage>,
pub daemon_msg_tx: Sender<DaemonMessage>, pub daemon_msg_tx: Sender<VmDaemonMessage>,
} }
pub async fn connect_and_run(cd: ConnectionData) -> Result<()> { pub async fn connect_and_run(cd: ConnectionData) -> Result<()> {
let client = BrainDaemonClient::connect(cd.brain_url).await?; let client = BrainVmDaemonClient::connect(cd.brain_url).await?;
let mut streaming_tasks = JoinSet::new(); let mut streaming_tasks = JoinSet::new();
streaming_tasks.spawn(receive_messages(client.clone(), cd.contracts.clone(), cd.brain_msg_tx)); streaming_tasks.spawn(receive_messages(client.clone(), cd.contracts.clone(), cd.brain_msg_tx));

@ -14,8 +14,8 @@ use tokio::{
#[allow(dead_code)] #[allow(dead_code)]
struct VMHandler { struct VMHandler {
receiver: Receiver<snp_proto::BrainMessage>, receiver: Receiver<snp_proto::BrainVmMessage>,
sender: Sender<snp_proto::DaemonMessage>, sender: Sender<snp_proto::VmDaemonMessage>,
config: Config, config: Config,
res: state::Resources, res: state::Resources,
} }
@ -23,8 +23,8 @@ struct VMHandler {
#[allow(dead_code)] #[allow(dead_code)]
impl VMHandler { impl VMHandler {
fn new( fn new(
receiver: Receiver<snp_proto::BrainMessage>, receiver: Receiver<snp_proto::BrainVmMessage>,
sender: Sender<snp_proto::DaemonMessage>, sender: Sender<snp_proto::VmDaemonMessage>,
) -> Self { ) -> Self {
let config = match Config::load_from_disk(DAEMON_CONFIG_PATH) { let config = match Config::load_from_disk(DAEMON_CONFIG_PATH) {
Ok(config) => config, Ok(config) => config,
@ -68,7 +68,7 @@ impl VMHandler {
} }
} }
let avail_storage_gb = avail_storage_gb as u32; let avail_storage_gb = avail_storage_gb as u32;
let res = snp_proto::NodeResources { let res = snp_proto::VmNodeResources {
node_pubkey: PUBLIC_KEY.clone(), node_pubkey: PUBLIC_KEY.clone(),
avail_ports: (self.config.public_port_range.len() - self.res.reserved_ports.len()) avail_ports: (self.config.public_port_range.len() - self.res.reserved_ports.len())
as u32, as u32,
@ -180,15 +180,15 @@ impl VMHandler {
self.send_node_resources().await; self.send_node_resources().await;
while let Some(brain_msg) = self.receiver.recv().await { while let Some(brain_msg) = self.receiver.recv().await {
match brain_msg.msg { match brain_msg.msg {
Some(snp_proto::brain_message::Msg::NewVmReq(new_vm_req)) => { Some(snp_proto::brain_vm_message::Msg::NewVmReq(new_vm_req)) => {
self.handle_new_vm_req(new_vm_req).await; self.handle_new_vm_req(new_vm_req).await;
} }
Some(snp_proto::brain_message::Msg::UpdateVmReq(update_vm_req)) => { Some(snp_proto::brain_vm_message::Msg::UpdateVmReq(update_vm_req)) => {
if let Err(e) = self.handle_update_vm_req(update_vm_req).await { if let Err(e) = self.handle_update_vm_req(update_vm_req).await {
log::error!("Could not update vm: {e:?}"); log::error!("Could not update vm: {e:?}");
} }
} }
Some(snp_proto::brain_message::Msg::DeleteVm(delete_vm_req)) => { Some(snp_proto::brain_vm_message::Msg::DeleteVm(delete_vm_req)) => {
let uuid = delete_vm_req.uuid.clone(); let uuid = delete_vm_req.uuid.clone();
if let Err(e) = self.handle_delete_vm(delete_vm_req) { if let Err(e) = self.handle_delete_vm(delete_vm_req) {
log::error!("Could not delete vm {uuid}: {e:?}"); log::error!("Could not delete vm {uuid}: {e:?}");
@ -201,7 +201,7 @@ impl VMHandler {
} }
} }
fn clear_deleted_contracts(&mut self, contracts: Vec<snp_proto::Contract>) { fn clear_deleted_contracts(&mut self, contracts: Vec<snp_proto::VmContract>) {
for uuid in self.res.existing_vms.clone() { for uuid in self.res.existing_vms.clone() {
if contracts.iter().find(|c| c.uuid == uuid).is_none() { if contracts.iter().find(|c| c.uuid == uuid).is_none() {
info!("VM {uuid} exists locally but not found in brain. Deleting..."); info!("VM {uuid} exists locally but not found in brain. Deleting...");

@ -1,5 +1,5 @@
syntax = "proto3"; syntax = "proto3";
package snp_proto; package vm_proto;
message Empty { message Empty {
} }
@ -13,7 +13,7 @@ message AccountBalance {
uint64 tmp_locked = 2; uint64 tmp_locked = 2;
} }
message Contract { message VmContract {
string uuid = 1; string uuid = 1;
string hostname = 2; string hostname = 2;
string admin_pubkey = 3; string admin_pubkey = 3;
@ -53,7 +53,7 @@ message MeasurementIP {
} }
// This should also include a block hash or similar, for auth // This should also include a block hash or similar, for auth
message RegisterNodeReq { message RegisterVmNodeReq {
string node_pubkey = 1; string node_pubkey = 1;
string owner_pubkey = 2; string owner_pubkey = 2;
string main_ip = 3; string main_ip = 3;
@ -64,7 +64,7 @@ message RegisterNodeReq {
uint64 price = 7; uint64 price = 7;
} }
message NodeResources { message VmNodeResources {
string node_pubkey = 1; string node_pubkey = 1;
uint32 avail_ports = 2; uint32 avail_ports = 2;
uint32 avail_ipv4 = 3; uint32 avail_ipv4 = 3;
@ -123,7 +123,7 @@ message DeleteVmReq {
string admin_pubkey = 2; string admin_pubkey = 2;
} }
message BrainMessage { message BrainVmMessage {
oneof Msg { oneof Msg {
NewVmReq new_vm_req = 1; NewVmReq new_vm_req = 1;
UpdateVmReq update_vm_req = 2; UpdateVmReq update_vm_req = 2;
@ -138,28 +138,28 @@ message DaemonStreamAuth {
string signature = 4; string signature = 4;
} }
message DaemonMessage { message VmDaemonMessage {
oneof Msg { oneof Msg {
DaemonStreamAuth auth = 1; DaemonStreamAuth auth = 1;
NewVmResp new_vm_resp = 2; NewVmResp new_vm_resp = 2;
UpdateVmResp update_vm_resp = 3; UpdateVmResp update_vm_resp = 3;
NodeResources node_resources = 4; VmNodeResources vm_node_resources = 4;
} }
} }
service BrainDaemon { service BrainVmDaemon {
rpc RegisterNode (RegisterNodeReq) returns (stream Contract); rpc RegisterVmNode (RegisterVmNodeReq) returns (stream VmContract);
rpc BrainMessages (DaemonStreamAuth) returns (stream BrainMessage); rpc BrainMessages (DaemonStreamAuth) returns (stream BrainVmMessage);
rpc DaemonMessages (stream DaemonMessage) returns (Empty); rpc DaemonMessages (stream VmDaemonMessage) returns (Empty);
} }
message ListContractsReq { message ListVmContractsReq {
string admin_pubkey = 1; string admin_pubkey = 1;
string node_pubkey = 2; string node_pubkey = 2;
string uuid = 3; string uuid = 3;
} }
message NodeFilters { message VmNodeFilters {
uint32 free_ports = 1; uint32 free_ports = 1;
bool offers_ipv4 = 2; bool offers_ipv4 = 2;
bool offers_ipv6 = 3; bool offers_ipv6 = 3;
@ -173,7 +173,7 @@ message NodeFilters {
string node_pubkey = 11; string node_pubkey = 11;
} }
message NodeListResp { message VmNodeListResp {
string node_pubkey = 1; string node_pubkey = 1;
string country = 2; string country = 2;
string region = 3; string region = 3;
@ -191,14 +191,28 @@ message ExtendVmReq {
uint64 locked_nano = 3; uint64 locked_nano = 3;
} }
message AirdropReq {
string pubkey = 1;
uint64 tokens = 2;
}
message Account {
string pubkey = 1;
uint64 balance = 2;
uint64 tmp_locked = 3;
}
service BrainCli { service BrainCli {
rpc GetAirdrop (Pubkey) returns (Empty);
rpc GetBalance (Pubkey) returns (AccountBalance); rpc GetBalance (Pubkey) returns (AccountBalance);
rpc NewVm (NewVmReq) returns (NewVmResp); rpc NewVm (NewVmReq) returns (NewVmResp);
rpc ListContracts (ListContractsReq) returns (stream Contract); rpc ListVmContracts (ListVmContractsReq) returns (stream VmContract);
rpc ListNodes (NodeFilters) returns (stream NodeListResp); rpc ListVmNodes (VmNodeFilters) returns (stream VmNodeListResp);
rpc GetOneNode (NodeFilters) returns (NodeListResp); rpc GetOneVmNode (VmNodeFilters) returns (VmNodeListResp);
rpc DeleteVm (DeleteVmReq) returns (Empty); rpc DeleteVm (DeleteVmReq) returns (Empty);
rpc UpdateVm (UpdateVmReq) returns (UpdateVmResp); rpc UpdateVm (UpdateVmReq) returns (UpdateVmResp);
rpc ExtendVm (ExtendVmReq) returns (Empty); rpc ExtendVm (ExtendVmReq) returns (Empty);
// admin commands
rpc Airdrop (AirdropReq) returns (Empty);
rpc ListAllVmContracts (Empty) returns (stream VmContract);
rpc ListAccounts (Empty) returns (stream Account);
} }