diff --git a/build.rs b/build.rs index aa8f7eb..d1b9140 100644 --- a/build.rs +++ b/build.rs @@ -1,6 +1,6 @@ fn main() { tonic_build::configure() .build_server(true) - .compile_protos(&["brain.proto"], &["proto"]) + .compile_protos(&["snp.proto"], &["proto"]) .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); } diff --git a/brain.proto b/snp.proto similarity index 65% rename from brain.proto rename to snp.proto index 7253420..89f637b 100644 --- a/brain.proto +++ b/snp.proto @@ -1,81 +1,14 @@ syntax = "proto3"; -package brain; +package snp_proto; message Empty { } -message NodePubkey { - string node_pubkey = 1; +message Pubkey { + string pubkey = 1; } -message RegisterNodeReq { - string node_pubkey = 1; - string owner_pubkey = 2; -} - -message NodeResourceReq { - string node_pubkey = 1; - uint32 avail_ports = 2; - uint32 avail_ipv4 = 3; - uint32 avail_ipv6 = 4; - uint32 avail_vcpus = 5; - uint32 avail_memory_mb = 6; - uint32 avail_storage_gb = 7; - uint32 max_ports_per_vm = 8; -} - -message MeasurementArgs { - // this will be IP:Port of the dtrfs API - // actually not a measurement arg, but needed for the injector - string dtrfs_api_endpoint = 1; - repeated uint32 exposed_ports = 2; - string ovmf_hash = 5; - // This is needed to allow the CLI to build the kernel params from known data. - // The CLI will use the kernel params to get the measurement. - repeated NewVmRespIP ips = 6; -} - -message NewVMReq { - string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID - string hostname = 2; - string admin_pubkey = 3; - string node_pubkey = 4; - repeated uint32 extra_ports = 5; - bool public_ipv4 = 6; - bool public_ipv6 = 7; - uint32 disk_size_gb = 8; - uint32 vcpus = 9; - uint32 memory_mb = 10; - string kernel_url = 11; - string kernel_sha = 12; - string dtrfs_url = 13; - string dtrfs_sha = 14; -} - -message NewVMResp { - string uuid = 1; - string error = 2; - MeasurementArgs args = 3; -} - -message UpdateVMReq { - string uuid = 1; - uint32 disk_size_gb = 2; - uint32 vcpus = 3; - uint32 memory_mb = 4; - string kernel_url = 5; - string kernel_sha = 6; - string dtrfs_url = 7; - string dtrfs_sha = 8; -} - -message UpdateVMResp { - string uuid = 1; - string error = 2; - MeasurementArgs args = 3; -} - -message VMContract { +message Contract { string uuid = 1; string hostname = 2; string admin_pubkey = 3; @@ -92,33 +25,115 @@ message VMContract { string updated_at = 14; } -message ListVMContractsReq { - string admin_pubkey = 1; - string node_pubkey = 2; - string uuid = 3; +message MeasurementArgs { + // this will be IP:Port of the dtrfs API + // actually not a measurement arg, but needed for the injector + string dtrfs_api_endpoint = 1; + repeated uint32 exposed_ports = 2; + string ovmf_hash = 5; + // This is needed to allow the CLI to build the kernel params from known data. + // The CLI will use the kernel params to get the measurement. + repeated MeasurementIP ips = 6; } -message NewVmRespIP { +message MeasurementIP { uint32 nic_index = 1; string address = 2; string mask = 3; string gateway = 4; } -message DeleteVMReq { +message RegisterNodeReq { + string node_pubkey = 1; + string owner_pubkey = 2; + string main_ip = 3; + string country = 7; + string region = 8; + string city = 9; +} + +message NodeResources { + string node_pubkey = 1; + uint32 avail_ports = 2; + uint32 avail_ipv4 = 3; + uint32 avail_ipv6 = 4; + uint32 avail_vcpus = 5; + uint32 avail_memory_mb = 6; + uint32 avail_storage_gb = 7; + uint32 max_ports_per_vm = 8; +} + +message NewVmReq { + string uuid = 1; + string hostname = 2; + string admin_pubkey = 3; + string node_pubkey = 4; + repeated uint32 extra_ports = 5; + bool public_ipv4 = 6; + bool public_ipv6 = 7; + uint32 disk_size_gb = 8; + uint32 vcpus = 9; + uint32 memory_mb = 10; + string kernel_url = 11; + string kernel_sha = 12; + string dtrfs_url = 13; + string dtrfs_sha = 14; +} + +message NewVmResp { + string uuid = 1; + string error = 2; + MeasurementArgs args = 3; +} + +message UpdateVmReq { + string uuid = 1; + uint32 disk_size_gb = 2; + uint32 vcpus = 3; + uint32 memory_mb = 4; + string kernel_url = 5; + string kernel_sha = 6; + string dtrfs_url = 7; + string dtrfs_sha = 8; +} + +message UpdateVmResp { + string uuid = 1; + string error = 2; + MeasurementArgs args = 3; +} + +message DeleteVmReq { string uuid = 1; } -service BrainDaemonService { - rpc RegisterNode (RegisterNodeReq) returns (Empty); - rpc SendNodeResources (stream NodeResourceReq) returns (Empty); - rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq); - rpc SendNewVMResp (stream NewVMResp) returns (Empty); - rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq); - rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); - rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq); - rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty); - //rpc GetMeasurementArgs (ListVMContractsReq) returns (stream MeasurementArgs); +message BrainMessage { + oneof Msg { + NewVmReq new_vm_req = 1; + UpdateVmReq update_vm_req = 2; + DeleteVmReq delete_vm = 3; + } +} + +message DaemonMessage { + oneof Msg { + Pubkey pubkey = 1; + NewVmResp new_vm_resp = 2; + UpdateVmResp update_vm_resp = 3; + NodeResources node_resources = 4; + } +} + +service BrainDaemon { + rpc RegisterNode (RegisterNodeReq) returns (stream Contract); + rpc BrainMessages (Pubkey) returns (stream BrainMessage); + rpc DaemonMessages (stream DaemonMessage) returns (Empty); +} + +message ListContractsReq { + string admin_pubkey = 1; + string node_pubkey = 2; + string uuid = 3; } message NodeFilters { @@ -144,12 +159,11 @@ message NodeListResp { uint32 provider_rating = 7; } -service BrainCliService { - rpc CreateVMContract (NewVMReq) returns (NewVMResp); - rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); +service BrainCli { + rpc NewVm (NewVmReq) returns (NewVmResp); + rpc ListContracts (ListContractsReq) returns (stream Contract); rpc ListNodes (NodeFilters) returns (stream NodeListResp); rpc GetOneNode (NodeFilters) returns (NodeListResp); - rpc DeleteVM (DeleteVMReq) returns (Empty); - rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp); - //rpc GetMeasurementArgs (ListVMContractsReq) returns (MeasurementArgs); -} \ No newline at end of file + rpc DeleteVm (DeleteVmReq) returns (Empty); + rpc UpdateVm (UpdateVmReq) returns (UpdateVmResp); +} diff --git a/src/data.rs b/src/data.rs index bad12ad..ac021aa 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,5 +1,5 @@ #![allow(dead_code)] -use crate::grpc::brain::{self as grpc}; +use crate::grpc::snp_proto::{self as grpc}; use chrono::Utc; use dashmap::DashMap; use log::{debug, info, warn}; @@ -57,9 +57,9 @@ pub struct Contract { pub updated_at: String, } -impl Into for Contract { - fn into(self) -> grpc::VmContract { - grpc::VmContract { +impl Into for Contract { + fn into(self) -> grpc::Contract { + grpc::Contract { uuid: self.uuid, hostname: self.hostname, admin_pubkey: self.admin_pubkey, @@ -84,9 +84,7 @@ pub struct BrainData { contracts: RwLock>, tmp_newvm_reqs: DashMap)>, tmp_updatevm_reqs: DashMap)>, - daemon_deletevm_tx: DashMap>, - daemon_newvm_tx: DashMap>, - daemon_updatevm_tx: DashMap>, + daemon_tx: DashMap>, } #[derive(Debug)] @@ -103,9 +101,7 @@ impl BrainData { contracts: RwLock::new(Vec::new()), tmp_newvm_reqs: DashMap::new(), tmp_updatevm_reqs: DashMap::new(), - daemon_deletevm_tx: DashMap::new(), - daemon_newvm_tx: DashMap::new(), - daemon_updatevm_tx: DashMap::new(), + daemon_tx: DashMap::new(), } } @@ -123,7 +119,7 @@ impl BrainData { nodes.push(node); } - pub fn submit_node_resources(&self, res: grpc::NodeResourceReq) { + pub fn submit_node_resources(&self, res: grpc::NodeResources) { let mut nodes = self.nodes.write().unwrap(); for n in nodes.iter_mut() { if n.public_key == res.node_pubkey { @@ -148,29 +144,32 @@ impl BrainData { debug!("Node list:\n{:?}", nodes); } - pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender) { - self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx); + pub fn add_daemon_tx(&self, node_pubkey: &str, tx: Sender) { + self.daemon_tx.insert(node_pubkey.to_string(), tx); } - pub fn add_daemon_updatevm_tx(&self, node_pubkey: &str, tx: Sender) { - log::debug!("Adding daemon updatevm tx for node_pubkey: {}", node_pubkey); - self.daemon_updatevm_tx.insert(node_pubkey.to_string(), tx); - info!("Added daemon TX for {}", node_pubkey); + pub fn del_daemon_tx(&self, node_pubkey: &str) { + self.daemon_tx.remove(node_pubkey); } pub async fn delete_vm(&self, delete_vm: grpc::DeleteVmReq) { if let Some(contract) = self.find_contract_by_uuid(&delete_vm.uuid) { info!("Found vm {}. Deleting...", delete_vm.uuid); - if let Some(daemon_tx) = self.daemon_deletevm_tx.get(&contract.node_pubkey) { + if let Some(daemon_tx) = self.daemon_tx.get(&contract.node_pubkey) { debug!( "TX for daemon {} found. Informing daemon about deletion of {}.", contract.node_pubkey, delete_vm.uuid ); - if daemon_tx.send(delete_vm.clone()).await.is_err() { + let msg = grpc::BrainMessage { + msg: Some(grpc::brain_message::Msg::DeleteVm(delete_vm.clone())), + }; + if let Err(e) = daemon_tx.send(msg).await { warn!( - "Failed to send deletion request to {}. Triggering memory cleanup.", + "Failed to send deletion request to {} due to error: {e:?}", contract.node_pubkey ); + info!("Deleting daemon TX for {}", contract.node_pubkey); + self.del_daemon_tx(&contract.node_pubkey); } } let mut contracts = self.contracts.write().unwrap(); @@ -178,13 +177,7 @@ impl BrainData { } } - pub async fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender) { - self.tmp_newvm_reqs - .retain(|_, req| req.0.node_pubkey != node_pubkey); - self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx); - } - - pub async fn submit_newvm_resp(&self, mut new_vm_resp: grpc::NewVmResp) { + pub async fn submit_newvm_resp(&self, new_vm_resp: grpc::NewVmResp) { let new_vm_req = match self.tmp_newvm_reqs.remove(&new_vm_resp.uuid) { Some((_, r)) => r, None => { @@ -192,35 +185,9 @@ impl BrainData { "Received confirmation for ghost NewVMReq {}", new_vm_resp.uuid ); - new_vm_resp.error = "Received confirmation for ghost NewVMReq.".to_string(); return; } }; - match new_vm_resp.args { - Some(ref mut args) => { - if args.dtrfs_api_endpoint.starts_with(':') { - match self.find_nodes_by_pubkey(&new_vm_req.0.node_pubkey) { - Some(node) => { - args.dtrfs_api_endpoint = - format!("{}{}", node.ip, args.dtrfs_api_endpoint); - } - None => { - log::error!("Node not found for pubkey {}", new_vm_req.0.node_pubkey); - new_vm_resp.error = "Node not found.".to_string(); - return; - } - } - } - } - None => { - log::error!( - "NewVmResp does not contain MeasurementArgs for {}", - new_vm_resp.uuid - ); - new_vm_resp.error = "Daemon did not return measurement args.".to_string(); - return; - } - } if let Err(_) = new_vm_req.1.send(new_vm_resp.clone()) { log::error!( "CLI RX for {} dropped before receiving confirmation {:?}.", @@ -284,52 +251,21 @@ impl BrainData { if update_vm_resp.error != "" { return; } - let mut contracts = self.contracts.write().unwrap(); match contracts.iter_mut().find(|c| c.uuid == update_vm_resp.uuid) { Some(contract) => { - match update_vm_resp.args { - Some(ref mut args) => { - if args.dtrfs_api_endpoint.starts_with(':') { - match self.find_nodes_by_pubkey(&contract.node_pubkey) { - Some(node) => { - args.dtrfs_api_endpoint = - format!("{}{}", node.ip, args.dtrfs_api_endpoint); - } - None => { - log::error!( - "Node not found for pubkey {}", - contract.node_pubkey - ); - update_vm_resp.error = "Node not found.".to_string(); - return; - } - } - } - } - None => { - log::error!( - "NewVmResp does not contain MeasurementArgs for {}", - update_vm_resp.uuid - ); - update_vm_resp.error = - "Daemon did not return measurement args.".to_string(); - return; - } - } - contract.disk_size_gb = update_vm_req.0.disk_size_gb; contract.vcpus = update_vm_req.0.vcpus; contract.memory_mb = update_vm_req.0.memory_mb; if !update_vm_req.0.kernel_sha.is_empty() { - info!( + debug!( "Updating kernel sha for {} to {}", contract.uuid, update_vm_req.0.kernel_sha ); contract.kernel_sha = update_vm_req.0.kernel_sha; } if !update_vm_req.0.dtrfs_sha.is_empty() { - info!( + debug!( "Updating dtrfs sha for {} to {}", contract.uuid, update_vm_req.0.dtrfs_sha ); @@ -342,15 +278,14 @@ impl BrainData { update_vm_resp.error = "Contract not found.".to_string(); } } - if let Err(_) = update_vm_req.1.send(update_vm_resp.clone()) { - log::error!( - "CLI RX dropped before receiving UpdateVMResp {:?}.", - update_vm_resp + if let Err(e) = update_vm_req.1.send(update_vm_resp.clone()) { + log::warn!( + "CLI RX dropped before receiving UpdateVMResp {update_vm_resp:?}. Error: {e:?}" ); } } - pub async fn submit_newvmrequest( + pub async fn submit_newvm_req( &self, mut req: grpc::NewVmReq, tx: OneshotSender, @@ -359,18 +294,21 @@ impl BrainData { info!("Inserting new vm request in memory: {req:?}"); self.tmp_newvm_reqs .insert(req.uuid.clone(), (req.clone(), tx)); - if let Some(server_tx) = self.daemon_newvm_tx.get(&req.node_pubkey) { + if let Some(daemon_tx) = self.daemon_tx.get(&req.node_pubkey) { debug!( "Found daemon TX for {}. Sending newVMReq {}", req.node_pubkey, req.uuid ); - if server_tx.send(req.clone()).await.is_ok() { - return; - } else { + let msg = grpc::BrainMessage { + msg: Some(grpc::brain_message::Msg::NewVmReq(req.clone())), + }; + if let Err(e) = daemon_tx.send(msg).await { warn!( - "Daemon {} RX dropped before sending update. Cleaning memory...", + "Failed to send new VM request to {} due to error: {e:?}", req.node_pubkey ); + info!("Deleting daemon TX for {}", req.node_pubkey); + self.del_daemon_tx(&req.node_pubkey); self.submit_newvm_resp(grpc::NewVmResp { error: "Daemon is offline.".to_string(), uuid: req.uuid, @@ -405,21 +343,23 @@ impl BrainData { }; self.tmp_updatevm_reqs .insert(req.uuid.clone(), (req.clone(), tx)); - if let Some(server_tx) = self.daemon_updatevm_tx.get(&node_pubkey) { + if let Some(server_tx) = self.daemon_tx.get(&node_pubkey) { debug!( "Found daemon TX for {}. Sending updateVMReq {}", node_pubkey, req.uuid ); - match server_tx.send(req.clone()).await { + let msg = grpc::BrainMessage { + msg: Some(grpc::brain_message::Msg::UpdateVmReq(req.clone())), + }; + match server_tx.send(msg).await { Ok(_) => { debug!("Successfully sent updateVMReq to {}", node_pubkey); return; } Err(e) => { - warn!( - "Failed to send updateVMReq to {}: {}. Cleaning memory...", - node_pubkey, e - ); + warn!("Failed to send update VM request to {node_pubkey} due to error {e}"); + info!("Deleting daemon TX for {}", node_pubkey); + self.del_daemon_tx(&node_pubkey); self.submit_updatevm_resp(grpc::UpdateVmResp { uuid, error: "Daemon is offline.".to_string(), @@ -454,7 +394,7 @@ impl BrainData { nodes.iter().cloned().find(|n| n.owner_key == owner_key) } - pub fn find_nodes_by_filters(&self, filters: &crate::grpc::brain::NodeFilters) -> Vec { + pub fn find_nodes_by_filters(&self, filters: &crate::grpc::snp_proto::NodeFilters) -> Vec { let nodes = self.nodes.read().unwrap(); nodes .iter() @@ -477,7 +417,7 @@ impl BrainData { // TODO: sort by rating pub fn get_one_node_by_filters( &self, - filters: &crate::grpc::brain::NodeFilters, + filters: &crate::grpc::snp_proto::NodeFilters, ) -> Option { let nodes = self.nodes.read().unwrap(); nodes diff --git a/src/grpc.rs b/src/grpc.rs index adb280e..127652c 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -1,40 +1,20 @@ #![allow(dead_code)] -pub mod brain { - tonic::include_proto!("brain"); +pub mod snp_proto { + tonic::include_proto!("snp_proto"); } use crate::data::BrainData; -use brain::brain_cli_service_server::BrainCliService; -use brain::brain_daemon_service_server::BrainDaemonService; -use brain::*; -use log::debug; +use snp_proto::brain_cli_server::BrainCli; +use snp_proto::brain_daemon_server::BrainDaemon; +use snp_proto::*; use log::info; -use log::warn; -use serde::Deserialize; use std::pin::Pin; use std::sync::Arc; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; -#[derive(Deserialize)] -pub struct ServerLocation { - country: String, - region: String, - city: String, -} - -impl ServerLocation { - async fn try_from_ip(ip: &str) -> anyhow::Result { - let body = reqwest::get("https://ipinfo.io/".to_string() + ip) - .await? - .text() - .await?; - Ok(serde_json::de::from_str(&body)?) - } -} - pub struct BrainDaemonMock { data: Arc, } @@ -56,168 +36,26 @@ impl BrainCliMock { } #[tonic::async_trait] -impl BrainDaemonService for BrainDaemonMock { +impl BrainDaemon for BrainDaemonMock { + type RegisterNodeStream = Pin> + Send>>; async fn register_node( &self, req: Request, - ) -> Result, Status> { - let mut ip = match req.remote_addr() { - Some(ip) => ip.ip().to_string(), - None => { - return Err(Status::aborted( - "Server was not capable to obtain your IP. Please report this bug at https://detee.ltd", - )); - } - }; - if cfg!(debug_assertions) { - ip = "83.213.100.17".to_string(); - } - let location = match ServerLocation::try_from_ip(&ip).await { - Ok(location) => location, - Err(e) => { - log::error!("Failed to contact ipinfo.io: {e:?}"); - return Err(Status::cancelled( - "Server could not obtain information about your IP address.", - )); - } - }; + ) -> Result, Status> { let req = req.into_inner(); + info!("Starting registration process for {:?}", req); let node = crate::data::Node { - public_key: req.node_pubkey, + public_key: req.node_pubkey.clone(), owner_key: req.owner_pubkey, - country: location.country, - region: location.region, - city: location.city, - ip, + country: req.country, + region: req.region, + city: req.city, + ip: req.main_ip, ..Default::default() }; self.data.insert_node(node); - Ok(Response::new(Empty {})) - } - async fn send_node_resources( - &self, - req: Request>, - ) -> Result, Status> { - debug!("Some node connected to stream NodeResourceReq"); - let mut resp_stream = req.into_inner(); - // Don't do this in prod. - let mut node_pubkey = String::new(); - while let Some(new_vm_resp) = resp_stream.next().await { - match new_vm_resp { - Ok(r) => { - node_pubkey = r.node_pubkey.clone(); - info!("Received new resources from daemon: {r:?}"); - self.data.submit_node_resources(r); - } - Err(e) => { - self.data.submit_node_resources(NodeResourceReq { - node_pubkey: node_pubkey.clone(), - ..Default::default() - }); - log::warn!("Daemon disconnected from Streaming: {e:?}"); - } - } - } - Ok(Response::new(Empty {})) - } - - type GetNewVMReqsStream = Pin> + Send>>; - async fn get_new_vm_reqs( - &self, - req: Request, - ) -> Result, Status> { - let req = req.into_inner(); - info!("Daemon {} requested GetNewVMReqsStream", req.node_pubkey); - let (grpc_tx, grpc_rx) = mpsc::channel(6); - let (data_tx, mut data_rx) = mpsc::channel(6); - self.data - .add_daemon_newvm_tx(&req.node_pubkey, data_tx) - .await; - let data = self.data.clone(); - tokio::spawn(async move { - while let Some(newvmreq) = data_rx.recv().await { - let uuid = newvmreq.uuid.clone(); - debug!("Sending NewVMRequest to {}: {newvmreq:?}", req.node_pubkey); - if let Err(e) = grpc_tx.send(Ok(newvmreq)).await { - warn!("Could not send NewVMRequest to {}: {e:?}", req.node_pubkey); - data.submit_newvm_resp(NewVmResp { - error: "Daemon not connected.".to_string(), - uuid, - ..Default::default() - }) - .await; - break; - } - } - }); - let output_stream = ReceiverStream::new(grpc_rx); - Ok(Response::new( - Box::pin(output_stream) as Self::GetNewVMReqsStream - )) - } - - async fn send_new_vm_resp( - &self, - req: Request>, - ) -> Result, Status> { - debug!("Some node connected to stream NewVMResp"); - let mut resp_stream = req.into_inner(); - while let Some(new_vm_resp) = resp_stream.next().await { - match new_vm_resp { - Ok(r) => { - info!("Received NewVmResp from daemon: {r:?}"); - self.data.submit_newvm_resp(r).await; - } - Err(e) => { - log::warn!("Daemon disconnected from Streaming: {e:?}") - } - } - } - Ok(Response::new(Empty {})) - } - - type GetDeleteVMReqStream = Pin> + Send>>; - async fn get_delete_vm_req( - &self, - req: Request, - ) -> Result, Status> { - let node_pubkey = req.into_inner().node_pubkey; - info!("Daemon {node_pubkey} requested GetDeleteVMReqStream"); - let (grpc_tx, grpc_rx) = mpsc::channel(6); - let (data_tx, mut data_rx) = mpsc::channel(6); - self.data - .clone() - .add_daemon_deletevm_tx(&node_pubkey, data_tx); - tokio::spawn(async move { - while let Some(deleted_vm) = data_rx.recv().await { - match grpc_tx.send(Ok(deleted_vm.clone())).await { - Ok(_) => debug!( - "Sent delete_vm confirmation to {}: {:?}", - node_pubkey, deleted_vm - ), - Err(e) => log::error!( - "Could not send delete_vm confirmation {:?} to {} because of error: {:?}", - deleted_vm, - node_pubkey, - e - ), - } - } - }); - let output_stream = ReceiverStream::new(grpc_rx); - Ok(Response::new( - Box::pin(output_stream) as Self::GetDeleteVMReqStream - )) - } - - type ListVMContractsStream = Pin> + Send>>; - async fn list_vm_contracts( - &self, - req: Request, - ) -> Result, Status> { - let req = req.into_inner(); - info!("Node {} requested ListVMContractsStream", req.node_pubkey); + info!("Sending existing contracts to {}", req.node_pubkey); let contracts = self.data.find_contracts_by_node_pubkey(&req.node_pubkey); let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { @@ -227,62 +65,52 @@ impl BrainDaemonService for BrainDaemonMock { }); let output_stream = ReceiverStream::new(rx); Ok(Response::new( - Box::pin(output_stream) as Self::ListVMContractsStream + Box::pin(output_stream) as Self::RegisterNodeStream )) } - type GetUpdateVMReqStream = Pin> + Send>>; - async fn get_update_vm_req( + type BrainMessagesStream = Pin> + Send>>; + async fn brain_messages( &self, - req: Request, - ) -> Result, Status> { + req: Request, + ) -> Result, Status> { let req = req.into_inner(); - info!("Daemon {} requested GetUpdateVMStream", req.node_pubkey); - let (grpc_tx, grpc_rx) = mpsc::channel(6); - let (data_tx, mut data_rx) = mpsc::channel(6); - self.data.add_daemon_updatevm_tx(&req.node_pubkey, data_tx); - let data = self.data.clone(); - tokio::spawn(async move { - while let Some(updatevmreq) = data_rx.recv().await { - debug!( - "Sending UpdateVMRequest to {}: {updatevmreq:?}", - req.node_pubkey - ); - if let Err(e) = grpc_tx.send(Ok(updatevmreq.clone())).await { - warn!( - "Could not send UpdateVMRequest to {}: {e:?}", - req.node_pubkey - ); - data.submit_updatevm_resp(UpdateVmResp { - error: "Daemon not connected.".to_string(), - uuid: updatevmreq.uuid, - args: None, - }) - .await; - break; - } - } - }); - let output_stream = ReceiverStream::new(grpc_rx); + info!("Daemon {} connected to receive brain messages", req.pubkey); + let (tx, rx) = mpsc::channel(6); + self.data.add_daemon_tx(&req.pubkey, tx); + let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg)); Ok(Response::new( - Box::pin(output_stream) as Self::GetUpdateVMReqStream + Box::pin(output_stream) as Self::BrainMessagesStream )) } - async fn send_update_vm_resp( + async fn daemon_messages( &self, - req: Request>, + req: Request>, ) -> Result, Status> { - debug!("Some node connected to stream UpdateVmResp"); - let mut resp_stream = req.into_inner(); - while let Some(update_vm_resp) = resp_stream.next().await { - match update_vm_resp { - Ok(c) => { - info!("Received confirmation from daemon: {c:?}"); - self.data.submit_updatevm_resp(c).await; - } + let mut req_stream = req.into_inner(); + let mut pubkey = String::new(); + while let Some(daemon_message) = req_stream.next().await { + info!("Received a message from daemon {pubkey}: {daemon_message:?}"); + match daemon_message { + Ok(msg) => match msg.msg { + Some(daemon_message::Msg::Pubkey(p)) => { + pubkey = p.pubkey; + } + Some(daemon_message::Msg::NewVmResp(new_vm_resp)) => { + self.data.submit_newvm_resp(new_vm_resp).await; + } + Some(daemon_message::Msg::UpdateVmResp(update_vm_resp)) => { + self.data.submit_updatevm_resp(update_vm_resp).await; + } + Some(daemon_message::Msg::NodeResources(node_resources)) => { + self.data.submit_node_resources(node_resources); + } + None => {} + }, Err(e) => { - log::warn!("Daemon disconnected from Streaming: {e:?}") + log::warn!("Daemon disconnected: {e:?}"); + self.data.del_daemon_tx(&pubkey); } } } @@ -291,16 +119,13 @@ impl BrainDaemonService for BrainDaemonMock { } #[tonic::async_trait] -impl BrainCliService for BrainCliMock { - async fn create_vm_contract( - &self, - req: Request, - ) -> Result, Status> { +impl BrainCli for BrainCliMock { + async fn new_vm(&self, req: Request) -> Result, Status> { let req = req.into_inner(); info!("New VM requested via CLI: {req:?}"); let admin_pubkey = req.admin_pubkey.clone(); let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); - self.data.submit_newvmrequest(req, oneshot_tx).await; + self.data.submit_newvm_req(req, oneshot_tx).await; match oneshot_rx.await { Ok(response) => { info!("Sending VM confirmation to {admin_pubkey}: {response:?}"); @@ -334,11 +159,11 @@ impl BrainCliService for BrainCliMock { } } - type ListVMContractsStream = Pin> + Send>>; - async fn list_vm_contracts( + type ListContractsStream = Pin> + Send>>; + async fn list_contracts( &self, - req: Request, - ) -> Result, Status> { + req: Request, + ) -> Result, Status> { let req = req.into_inner(); info!("CLI {} requested ListVMContractsStream", req.admin_pubkey); let contracts = match req.uuid.is_empty() { @@ -356,7 +181,7 @@ impl BrainCliService for BrainCliMock { }); let output_stream = ReceiverStream::new(rx); Ok(Response::new( - Box::pin(output_stream) as Self::ListVMContractsStream + Box::pin(output_stream) as Self::ListContractsStream )) } diff --git a/src/main.rs b/src/main.rs index 5f3f08c..5ea9aff 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,8 +2,8 @@ mod data; mod grpc; use data::BrainData; -use grpc::brain::brain_cli_service_server::BrainCliServiceServer; -use grpc::brain::brain_daemon_service_server::BrainDaemonServiceServer; +use grpc::snp_proto::brain_cli_server::BrainCliServer; +use grpc::snp_proto::brain_daemon_server::BrainDaemonServer; use grpc::BrainCliMock; use grpc::BrainDaemonMock; use std::sync::Arc; @@ -17,8 +17,8 @@ async fn main() { let data = Arc::new(BrainData::new()); let addr = "0.0.0.0:31337".parse().unwrap(); - let daemon_server = BrainDaemonServiceServer::new(BrainDaemonMock::new(data.clone())); - let cli_server = BrainCliServiceServer::new(BrainCliMock::new(data.clone())); + let daemon_server = BrainDaemonServer::new(BrainDaemonMock::new(data.clone())); + let cli_server = BrainCliServer::new(BrainCliMock::new(data.clone())); Server::builder() .add_service(daemon_server)