modified proto based on feadback form the team

This commit is contained in:
ghe0 2025-01-17 22:43:39 +02:00
parent 0e85165240
commit c9a7ec9f68
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
5 changed files with 214 additions and 435 deletions

@ -1,6 +1,6 @@
fn main() {
tonic_build::configure()
.build_server(true)
.compile_protos(&["brain.proto"], &["proto"])
.compile_protos(&["snp.proto"], &["proto"])
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
}

@ -1,81 +1,14 @@
syntax = "proto3";
package brain;
package snp_proto;
message Empty {
}
message NodePubkey {
string node_pubkey = 1;
message Pubkey {
string pubkey = 1;
}
message RegisterNodeReq {
string node_pubkey = 1;
string owner_pubkey = 2;
}
message NodeResourceReq {
string node_pubkey = 1;
uint32 avail_ports = 2;
uint32 avail_ipv4 = 3;
uint32 avail_ipv6 = 4;
uint32 avail_vcpus = 5;
uint32 avail_memory_mb = 6;
uint32 avail_storage_gb = 7;
uint32 max_ports_per_vm = 8;
}
message MeasurementArgs {
// this will be IP:Port of the dtrfs API
// actually not a measurement arg, but needed for the injector
string dtrfs_api_endpoint = 1;
repeated uint32 exposed_ports = 2;
string ovmf_hash = 5;
// This is needed to allow the CLI to build the kernel params from known data.
// The CLI will use the kernel params to get the measurement.
repeated NewVmRespIP ips = 6;
}
message NewVMReq {
string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID
string hostname = 2;
string admin_pubkey = 3;
string node_pubkey = 4;
repeated uint32 extra_ports = 5;
bool public_ipv4 = 6;
bool public_ipv6 = 7;
uint32 disk_size_gb = 8;
uint32 vcpus = 9;
uint32 memory_mb = 10;
string kernel_url = 11;
string kernel_sha = 12;
string dtrfs_url = 13;
string dtrfs_sha = 14;
}
message NewVMResp {
string uuid = 1;
string error = 2;
MeasurementArgs args = 3;
}
message UpdateVMReq {
string uuid = 1;
uint32 disk_size_gb = 2;
uint32 vcpus = 3;
uint32 memory_mb = 4;
string kernel_url = 5;
string kernel_sha = 6;
string dtrfs_url = 7;
string dtrfs_sha = 8;
}
message UpdateVMResp {
string uuid = 1;
string error = 2;
MeasurementArgs args = 3;
}
message VMContract {
message Contract {
string uuid = 1;
string hostname = 2;
string admin_pubkey = 3;
@ -92,33 +25,115 @@ message VMContract {
string updated_at = 14;
}
message ListVMContractsReq {
string admin_pubkey = 1;
string node_pubkey = 2;
string uuid = 3;
message MeasurementArgs {
// this will be IP:Port of the dtrfs API
// actually not a measurement arg, but needed for the injector
string dtrfs_api_endpoint = 1;
repeated uint32 exposed_ports = 2;
string ovmf_hash = 5;
// This is needed to allow the CLI to build the kernel params from known data.
// The CLI will use the kernel params to get the measurement.
repeated MeasurementIP ips = 6;
}
message NewVmRespIP {
message MeasurementIP {
uint32 nic_index = 1;
string address = 2;
string mask = 3;
string gateway = 4;
}
message DeleteVMReq {
message RegisterNodeReq {
string node_pubkey = 1;
string owner_pubkey = 2;
string main_ip = 3;
string country = 7;
string region = 8;
string city = 9;
}
message NodeResources {
string node_pubkey = 1;
uint32 avail_ports = 2;
uint32 avail_ipv4 = 3;
uint32 avail_ipv6 = 4;
uint32 avail_vcpus = 5;
uint32 avail_memory_mb = 6;
uint32 avail_storage_gb = 7;
uint32 max_ports_per_vm = 8;
}
message NewVmReq {
string uuid = 1;
string hostname = 2;
string admin_pubkey = 3;
string node_pubkey = 4;
repeated uint32 extra_ports = 5;
bool public_ipv4 = 6;
bool public_ipv6 = 7;
uint32 disk_size_gb = 8;
uint32 vcpus = 9;
uint32 memory_mb = 10;
string kernel_url = 11;
string kernel_sha = 12;
string dtrfs_url = 13;
string dtrfs_sha = 14;
}
message NewVmResp {
string uuid = 1;
string error = 2;
MeasurementArgs args = 3;
}
message UpdateVmReq {
string uuid = 1;
uint32 disk_size_gb = 2;
uint32 vcpus = 3;
uint32 memory_mb = 4;
string kernel_url = 5;
string kernel_sha = 6;
string dtrfs_url = 7;
string dtrfs_sha = 8;
}
message UpdateVmResp {
string uuid = 1;
string error = 2;
MeasurementArgs args = 3;
}
message DeleteVmReq {
string uuid = 1;
}
service BrainDaemonService {
rpc RegisterNode (RegisterNodeReq) returns (Empty);
rpc SendNodeResources (stream NodeResourceReq) returns (Empty);
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMReq);
rpc SendNewVMResp (stream NewVMResp) returns (Empty);
rpc GetDeleteVMReq (NodePubkey) returns (stream DeleteVMReq);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc GetUpdateVMReq (NodePubkey) returns (stream UpdateVMReq);
rpc SendUpdateVMResp (stream UpdateVMResp) returns (Empty);
//rpc GetMeasurementArgs (ListVMContractsReq) returns (stream MeasurementArgs);
message BrainMessage {
oneof Msg {
NewVmReq new_vm_req = 1;
UpdateVmReq update_vm_req = 2;
DeleteVmReq delete_vm = 3;
}
}
message DaemonMessage {
oneof Msg {
Pubkey pubkey = 1;
NewVmResp new_vm_resp = 2;
UpdateVmResp update_vm_resp = 3;
NodeResources node_resources = 4;
}
}
service BrainDaemon {
rpc RegisterNode (RegisterNodeReq) returns (stream Contract);
rpc BrainMessages (Pubkey) returns (stream BrainMessage);
rpc DaemonMessages (stream DaemonMessage) returns (Empty);
}
message ListContractsReq {
string admin_pubkey = 1;
string node_pubkey = 2;
string uuid = 3;
}
message NodeFilters {
@ -144,12 +159,11 @@ message NodeListResp {
uint32 provider_rating = 7;
}
service BrainCliService {
rpc CreateVMContract (NewVMReq) returns (NewVMResp);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
service BrainCli {
rpc NewVm (NewVmReq) returns (NewVmResp);
rpc ListContracts (ListContractsReq) returns (stream Contract);
rpc ListNodes (NodeFilters) returns (stream NodeListResp);
rpc GetOneNode (NodeFilters) returns (NodeListResp);
rpc DeleteVM (DeleteVMReq) returns (Empty);
rpc UpdateVM (UpdateVMReq) returns (UpdateVMResp);
//rpc GetMeasurementArgs (ListVMContractsReq) returns (MeasurementArgs);
rpc DeleteVm (DeleteVmReq) returns (Empty);
rpc UpdateVm (UpdateVmReq) returns (UpdateVmResp);
}

@ -1,5 +1,5 @@
#![allow(dead_code)]
use crate::grpc::brain::{self as grpc};
use crate::grpc::snp_proto::{self as grpc};
use chrono::Utc;
use dashmap::DashMap;
use log::{debug, info, warn};
@ -57,9 +57,9 @@ pub struct Contract {
pub updated_at: String,
}
impl Into<grpc::VmContract> for Contract {
fn into(self) -> grpc::VmContract {
grpc::VmContract {
impl Into<grpc::Contract> for Contract {
fn into(self) -> grpc::Contract {
grpc::Contract {
uuid: self.uuid,
hostname: self.hostname,
admin_pubkey: self.admin_pubkey,
@ -84,9 +84,7 @@ pub struct BrainData {
contracts: RwLock<Vec<Contract>>,
tmp_newvm_reqs: DashMap<String, (grpc::NewVmReq, OneshotSender<grpc::NewVmResp>)>,
tmp_updatevm_reqs: DashMap<String, (grpc::UpdateVmReq, OneshotSender<grpc::UpdateVmResp>)>,
daemon_deletevm_tx: DashMap<String, Sender<grpc::DeleteVmReq>>,
daemon_newvm_tx: DashMap<String, Sender<grpc::NewVmReq>>,
daemon_updatevm_tx: DashMap<String, Sender<grpc::UpdateVmReq>>,
daemon_tx: DashMap<String, Sender<grpc::BrainMessage>>,
}
#[derive(Debug)]
@ -103,9 +101,7 @@ impl BrainData {
contracts: RwLock::new(Vec::new()),
tmp_newvm_reqs: DashMap::new(),
tmp_updatevm_reqs: DashMap::new(),
daemon_deletevm_tx: DashMap::new(),
daemon_newvm_tx: DashMap::new(),
daemon_updatevm_tx: DashMap::new(),
daemon_tx: DashMap::new(),
}
}
@ -123,7 +119,7 @@ impl BrainData {
nodes.push(node);
}
pub fn submit_node_resources(&self, res: grpc::NodeResourceReq) {
pub fn submit_node_resources(&self, res: grpc::NodeResources) {
let mut nodes = self.nodes.write().unwrap();
for n in nodes.iter_mut() {
if n.public_key == res.node_pubkey {
@ -148,29 +144,32 @@ impl BrainData {
debug!("Node list:\n{:?}", nodes);
}
pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::DeleteVmReq>) {
self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx);
pub fn add_daemon_tx(&self, node_pubkey: &str, tx: Sender<grpc::BrainMessage>) {
self.daemon_tx.insert(node_pubkey.to_string(), tx);
}
pub fn add_daemon_updatevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::UpdateVmReq>) {
log::debug!("Adding daemon updatevm tx for node_pubkey: {}", node_pubkey);
self.daemon_updatevm_tx.insert(node_pubkey.to_string(), tx);
info!("Added daemon TX for {}", node_pubkey);
pub fn del_daemon_tx(&self, node_pubkey: &str) {
self.daemon_tx.remove(node_pubkey);
}
pub async fn delete_vm(&self, delete_vm: grpc::DeleteVmReq) {
if let Some(contract) = self.find_contract_by_uuid(&delete_vm.uuid) {
info!("Found vm {}. Deleting...", delete_vm.uuid);
if let Some(daemon_tx) = self.daemon_deletevm_tx.get(&contract.node_pubkey) {
if let Some(daemon_tx) = self.daemon_tx.get(&contract.node_pubkey) {
debug!(
"TX for daemon {} found. Informing daemon about deletion of {}.",
contract.node_pubkey, delete_vm.uuid
);
if daemon_tx.send(delete_vm.clone()).await.is_err() {
let msg = grpc::BrainMessage {
msg: Some(grpc::brain_message::Msg::DeleteVm(delete_vm.clone())),
};
if let Err(e) = daemon_tx.send(msg).await {
warn!(
"Failed to send deletion request to {}. Triggering memory cleanup.",
"Failed to send deletion request to {} due to error: {e:?}",
contract.node_pubkey
);
info!("Deleting daemon TX for {}", contract.node_pubkey);
self.del_daemon_tx(&contract.node_pubkey);
}
}
let mut contracts = self.contracts.write().unwrap();
@ -178,13 +177,7 @@ impl BrainData {
}
}
pub async fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender<grpc::NewVmReq>) {
self.tmp_newvm_reqs
.retain(|_, req| req.0.node_pubkey != node_pubkey);
self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx);
}
pub async fn submit_newvm_resp(&self, mut new_vm_resp: grpc::NewVmResp) {
pub async fn submit_newvm_resp(&self, new_vm_resp: grpc::NewVmResp) {
let new_vm_req = match self.tmp_newvm_reqs.remove(&new_vm_resp.uuid) {
Some((_, r)) => r,
None => {
@ -192,35 +185,9 @@ impl BrainData {
"Received confirmation for ghost NewVMReq {}",
new_vm_resp.uuid
);
new_vm_resp.error = "Received confirmation for ghost NewVMReq.".to_string();
return;
}
};
match new_vm_resp.args {
Some(ref mut args) => {
if args.dtrfs_api_endpoint.starts_with(':') {
match self.find_nodes_by_pubkey(&new_vm_req.0.node_pubkey) {
Some(node) => {
args.dtrfs_api_endpoint =
format!("{}{}", node.ip, args.dtrfs_api_endpoint);
}
None => {
log::error!("Node not found for pubkey {}", new_vm_req.0.node_pubkey);
new_vm_resp.error = "Node not found.".to_string();
return;
}
}
}
}
None => {
log::error!(
"NewVmResp does not contain MeasurementArgs for {}",
new_vm_resp.uuid
);
new_vm_resp.error = "Daemon did not return measurement args.".to_string();
return;
}
}
if let Err(_) = new_vm_req.1.send(new_vm_resp.clone()) {
log::error!(
"CLI RX for {} dropped before receiving confirmation {:?}.",
@ -284,52 +251,21 @@ impl BrainData {
if update_vm_resp.error != "" {
return;
}
let mut contracts = self.contracts.write().unwrap();
match contracts.iter_mut().find(|c| c.uuid == update_vm_resp.uuid) {
Some(contract) => {
match update_vm_resp.args {
Some(ref mut args) => {
if args.dtrfs_api_endpoint.starts_with(':') {
match self.find_nodes_by_pubkey(&contract.node_pubkey) {
Some(node) => {
args.dtrfs_api_endpoint =
format!("{}{}", node.ip, args.dtrfs_api_endpoint);
}
None => {
log::error!(
"Node not found for pubkey {}",
contract.node_pubkey
);
update_vm_resp.error = "Node not found.".to_string();
return;
}
}
}
}
None => {
log::error!(
"NewVmResp does not contain MeasurementArgs for {}",
update_vm_resp.uuid
);
update_vm_resp.error =
"Daemon did not return measurement args.".to_string();
return;
}
}
contract.disk_size_gb = update_vm_req.0.disk_size_gb;
contract.vcpus = update_vm_req.0.vcpus;
contract.memory_mb = update_vm_req.0.memory_mb;
if !update_vm_req.0.kernel_sha.is_empty() {
info!(
debug!(
"Updating kernel sha for {} to {}",
contract.uuid, update_vm_req.0.kernel_sha
);
contract.kernel_sha = update_vm_req.0.kernel_sha;
}
if !update_vm_req.0.dtrfs_sha.is_empty() {
info!(
debug!(
"Updating dtrfs sha for {} to {}",
contract.uuid, update_vm_req.0.dtrfs_sha
);
@ -342,15 +278,14 @@ impl BrainData {
update_vm_resp.error = "Contract not found.".to_string();
}
}
if let Err(_) = update_vm_req.1.send(update_vm_resp.clone()) {
log::error!(
"CLI RX dropped before receiving UpdateVMResp {:?}.",
update_vm_resp
if let Err(e) = update_vm_req.1.send(update_vm_resp.clone()) {
log::warn!(
"CLI RX dropped before receiving UpdateVMResp {update_vm_resp:?}. Error: {e:?}"
);
}
}
pub async fn submit_newvmrequest(
pub async fn submit_newvm_req(
&self,
mut req: grpc::NewVmReq,
tx: OneshotSender<grpc::NewVmResp>,
@ -359,18 +294,21 @@ impl BrainData {
info!("Inserting new vm request in memory: {req:?}");
self.tmp_newvm_reqs
.insert(req.uuid.clone(), (req.clone(), tx));
if let Some(server_tx) = self.daemon_newvm_tx.get(&req.node_pubkey) {
if let Some(daemon_tx) = self.daemon_tx.get(&req.node_pubkey) {
debug!(
"Found daemon TX for {}. Sending newVMReq {}",
req.node_pubkey, req.uuid
);
if server_tx.send(req.clone()).await.is_ok() {
return;
} else {
let msg = grpc::BrainMessage {
msg: Some(grpc::brain_message::Msg::NewVmReq(req.clone())),
};
if let Err(e) = daemon_tx.send(msg).await {
warn!(
"Daemon {} RX dropped before sending update. Cleaning memory...",
"Failed to send new VM request to {} due to error: {e:?}",
req.node_pubkey
);
info!("Deleting daemon TX for {}", req.node_pubkey);
self.del_daemon_tx(&req.node_pubkey);
self.submit_newvm_resp(grpc::NewVmResp {
error: "Daemon is offline.".to_string(),
uuid: req.uuid,
@ -405,21 +343,23 @@ impl BrainData {
};
self.tmp_updatevm_reqs
.insert(req.uuid.clone(), (req.clone(), tx));
if let Some(server_tx) = self.daemon_updatevm_tx.get(&node_pubkey) {
if let Some(server_tx) = self.daemon_tx.get(&node_pubkey) {
debug!(
"Found daemon TX for {}. Sending updateVMReq {}",
node_pubkey, req.uuid
);
match server_tx.send(req.clone()).await {
let msg = grpc::BrainMessage {
msg: Some(grpc::brain_message::Msg::UpdateVmReq(req.clone())),
};
match server_tx.send(msg).await {
Ok(_) => {
debug!("Successfully sent updateVMReq to {}", node_pubkey);
return;
}
Err(e) => {
warn!(
"Failed to send updateVMReq to {}: {}. Cleaning memory...",
node_pubkey, e
);
warn!("Failed to send update VM request to {node_pubkey} due to error {e}");
info!("Deleting daemon TX for {}", node_pubkey);
self.del_daemon_tx(&node_pubkey);
self.submit_updatevm_resp(grpc::UpdateVmResp {
uuid,
error: "Daemon is offline.".to_string(),
@ -454,7 +394,7 @@ impl BrainData {
nodes.iter().cloned().find(|n| n.owner_key == owner_key)
}
pub fn find_nodes_by_filters(&self, filters: &crate::grpc::brain::NodeFilters) -> Vec<Node> {
pub fn find_nodes_by_filters(&self, filters: &crate::grpc::snp_proto::NodeFilters) -> Vec<Node> {
let nodes = self.nodes.read().unwrap();
nodes
.iter()
@ -477,7 +417,7 @@ impl BrainData {
// TODO: sort by rating
pub fn get_one_node_by_filters(
&self,
filters: &crate::grpc::brain::NodeFilters,
filters: &crate::grpc::snp_proto::NodeFilters,
) -> Option<Node> {
let nodes = self.nodes.read().unwrap();
nodes

@ -1,40 +1,20 @@
#![allow(dead_code)]
pub mod brain {
tonic::include_proto!("brain");
pub mod snp_proto {
tonic::include_proto!("snp_proto");
}
use crate::data::BrainData;
use brain::brain_cli_service_server::BrainCliService;
use brain::brain_daemon_service_server::BrainDaemonService;
use brain::*;
use log::debug;
use snp_proto::brain_cli_server::BrainCli;
use snp_proto::brain_daemon_server::BrainDaemon;
use snp_proto::*;
use log::info;
use log::warn;
use serde::Deserialize;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};
#[derive(Deserialize)]
pub struct ServerLocation {
country: String,
region: String,
city: String,
}
impl ServerLocation {
async fn try_from_ip(ip: &str) -> anyhow::Result<Self> {
let body = reqwest::get("https://ipinfo.io/".to_string() + ip)
.await?
.text()
.await?;
Ok(serde_json::de::from_str(&body)?)
}
}
pub struct BrainDaemonMock {
data: Arc<BrainData>,
}
@ -56,168 +36,26 @@ impl BrainCliMock {
}
#[tonic::async_trait]
impl BrainDaemonService for BrainDaemonMock {
impl BrainDaemon for BrainDaemonMock {
type RegisterNodeStream = Pin<Box<dyn Stream<Item = Result<Contract, Status>> + Send>>;
async fn register_node(
&self,
req: Request<RegisterNodeReq>,
) -> Result<Response<Empty>, Status> {
let mut ip = match req.remote_addr() {
Some(ip) => ip.ip().to_string(),
None => {
return Err(Status::aborted(
"Server was not capable to obtain your IP. Please report this bug at https://detee.ltd",
));
}
};
if cfg!(debug_assertions) {
ip = "83.213.100.17".to_string();
}
let location = match ServerLocation::try_from_ip(&ip).await {
Ok(location) => location,
Err(e) => {
log::error!("Failed to contact ipinfo.io: {e:?}");
return Err(Status::cancelled(
"Server could not obtain information about your IP address.",
));
}
};
) -> Result<Response<Self::RegisterNodeStream>, Status> {
let req = req.into_inner();
info!("Starting registration process for {:?}", req);
let node = crate::data::Node {
public_key: req.node_pubkey,
public_key: req.node_pubkey.clone(),
owner_key: req.owner_pubkey,
country: location.country,
region: location.region,
city: location.city,
ip,
country: req.country,
region: req.region,
city: req.city,
ip: req.main_ip,
..Default::default()
};
self.data.insert_node(node);
Ok(Response::new(Empty {}))
}
async fn send_node_resources(
&self,
req: Request<Streaming<NodeResourceReq>>,
) -> Result<Response<Empty>, Status> {
debug!("Some node connected to stream NodeResourceReq");
let mut resp_stream = req.into_inner();
// Don't do this in prod.
let mut node_pubkey = String::new();
while let Some(new_vm_resp) = resp_stream.next().await {
match new_vm_resp {
Ok(r) => {
node_pubkey = r.node_pubkey.clone();
info!("Received new resources from daemon: {r:?}");
self.data.submit_node_resources(r);
}
Err(e) => {
self.data.submit_node_resources(NodeResourceReq {
node_pubkey: node_pubkey.clone(),
..Default::default()
});
log::warn!("Daemon disconnected from Streaming<NodeResourceReq>: {e:?}");
}
}
}
Ok(Response::new(Empty {}))
}
type GetNewVMReqsStream = Pin<Box<dyn Stream<Item = Result<NewVmReq, Status>> + Send>>;
async fn get_new_vm_reqs(
&self,
req: Request<NodePubkey>,
) -> Result<Response<Self::GetNewVMReqsStream>, Status> {
let req = req.into_inner();
info!("Daemon {} requested GetNewVMReqsStream", req.node_pubkey);
let (grpc_tx, grpc_rx) = mpsc::channel(6);
let (data_tx, mut data_rx) = mpsc::channel(6);
self.data
.add_daemon_newvm_tx(&req.node_pubkey, data_tx)
.await;
let data = self.data.clone();
tokio::spawn(async move {
while let Some(newvmreq) = data_rx.recv().await {
let uuid = newvmreq.uuid.clone();
debug!("Sending NewVMRequest to {}: {newvmreq:?}", req.node_pubkey);
if let Err(e) = grpc_tx.send(Ok(newvmreq)).await {
warn!("Could not send NewVMRequest to {}: {e:?}", req.node_pubkey);
data.submit_newvm_resp(NewVmResp {
error: "Daemon not connected.".to_string(),
uuid,
..Default::default()
})
.await;
break;
}
}
});
let output_stream = ReceiverStream::new(grpc_rx);
Ok(Response::new(
Box::pin(output_stream) as Self::GetNewVMReqsStream
))
}
async fn send_new_vm_resp(
&self,
req: Request<Streaming<NewVmResp>>,
) -> Result<Response<Empty>, Status> {
debug!("Some node connected to stream NewVMResp");
let mut resp_stream = req.into_inner();
while let Some(new_vm_resp) = resp_stream.next().await {
match new_vm_resp {
Ok(r) => {
info!("Received NewVmResp from daemon: {r:?}");
self.data.submit_newvm_resp(r).await;
}
Err(e) => {
log::warn!("Daemon disconnected from Streaming<NewVMResp>: {e:?}")
}
}
}
Ok(Response::new(Empty {}))
}
type GetDeleteVMReqStream = Pin<Box<dyn Stream<Item = Result<DeleteVmReq, Status>> + Send>>;
async fn get_delete_vm_req(
&self,
req: Request<NodePubkey>,
) -> Result<Response<Self::GetDeleteVMReqStream>, Status> {
let node_pubkey = req.into_inner().node_pubkey;
info!("Daemon {node_pubkey} requested GetDeleteVMReqStream");
let (grpc_tx, grpc_rx) = mpsc::channel(6);
let (data_tx, mut data_rx) = mpsc::channel(6);
self.data
.clone()
.add_daemon_deletevm_tx(&node_pubkey, data_tx);
tokio::spawn(async move {
while let Some(deleted_vm) = data_rx.recv().await {
match grpc_tx.send(Ok(deleted_vm.clone())).await {
Ok(_) => debug!(
"Sent delete_vm confirmation to {}: {:?}",
node_pubkey, deleted_vm
),
Err(e) => log::error!(
"Could not send delete_vm confirmation {:?} to {} because of error: {:?}",
deleted_vm,
node_pubkey,
e
),
}
}
});
let output_stream = ReceiverStream::new(grpc_rx);
Ok(Response::new(
Box::pin(output_stream) as Self::GetDeleteVMReqStream
))
}
type ListVMContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
async fn list_vm_contracts(
&self,
req: Request<ListVmContractsReq>,
) -> Result<Response<Self::ListVMContractsStream>, Status> {
let req = req.into_inner();
info!("Node {} requested ListVMContractsStream", req.node_pubkey);
info!("Sending existing contracts to {}", req.node_pubkey);
let contracts = self.data.find_contracts_by_node_pubkey(&req.node_pubkey);
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
@ -227,62 +65,52 @@ impl BrainDaemonService for BrainDaemonMock {
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(output_stream) as Self::ListVMContractsStream
Box::pin(output_stream) as Self::RegisterNodeStream
))
}
type GetUpdateVMReqStream = Pin<Box<dyn Stream<Item = Result<UpdateVmReq, Status>> + Send>>;
async fn get_update_vm_req(
type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainMessage, Status>> + Send>>;
async fn brain_messages(
&self,
req: Request<NodePubkey>,
) -> Result<Response<Self::GetUpdateVMReqStream>, Status> {
req: Request<Pubkey>,
) -> Result<Response<Self::BrainMessagesStream>, Status> {
let req = req.into_inner();
info!("Daemon {} requested GetUpdateVMStream", req.node_pubkey);
let (grpc_tx, grpc_rx) = mpsc::channel(6);
let (data_tx, mut data_rx) = mpsc::channel(6);
self.data.add_daemon_updatevm_tx(&req.node_pubkey, data_tx);
let data = self.data.clone();
tokio::spawn(async move {
while let Some(updatevmreq) = data_rx.recv().await {
debug!(
"Sending UpdateVMRequest to {}: {updatevmreq:?}",
req.node_pubkey
);
if let Err(e) = grpc_tx.send(Ok(updatevmreq.clone())).await {
warn!(
"Could not send UpdateVMRequest to {}: {e:?}",
req.node_pubkey
);
data.submit_updatevm_resp(UpdateVmResp {
error: "Daemon not connected.".to_string(),
uuid: updatevmreq.uuid,
args: None,
})
.await;
break;
}
}
});
let output_stream = ReceiverStream::new(grpc_rx);
info!("Daemon {} connected to receive brain messages", req.pubkey);
let (tx, rx) = mpsc::channel(6);
self.data.add_daemon_tx(&req.pubkey, tx);
let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg));
Ok(Response::new(
Box::pin(output_stream) as Self::GetUpdateVMReqStream
Box::pin(output_stream) as Self::BrainMessagesStream
))
}
async fn send_update_vm_resp(
async fn daemon_messages(
&self,
req: Request<Streaming<UpdateVmResp>>,
req: Request<Streaming<DaemonMessage>>,
) -> Result<Response<Empty>, Status> {
debug!("Some node connected to stream UpdateVmResp");
let mut resp_stream = req.into_inner();
while let Some(update_vm_resp) = resp_stream.next().await {
match update_vm_resp {
Ok(c) => {
info!("Received confirmation from daemon: {c:?}");
self.data.submit_updatevm_resp(c).await;
let mut req_stream = req.into_inner();
let mut pubkey = String::new();
while let Some(daemon_message) = req_stream.next().await {
info!("Received a message from daemon {pubkey}: {daemon_message:?}");
match daemon_message {
Ok(msg) => match msg.msg {
Some(daemon_message::Msg::Pubkey(p)) => {
pubkey = p.pubkey;
}
Some(daemon_message::Msg::NewVmResp(new_vm_resp)) => {
self.data.submit_newvm_resp(new_vm_resp).await;
}
Some(daemon_message::Msg::UpdateVmResp(update_vm_resp)) => {
self.data.submit_updatevm_resp(update_vm_resp).await;
}
Some(daemon_message::Msg::NodeResources(node_resources)) => {
self.data.submit_node_resources(node_resources);
}
None => {}
},
Err(e) => {
log::warn!("Daemon disconnected from Streaming<NewVMResp>: {e:?}")
log::warn!("Daemon disconnected: {e:?}");
self.data.del_daemon_tx(&pubkey);
}
}
}
@ -291,16 +119,13 @@ impl BrainDaemonService for BrainDaemonMock {
}
#[tonic::async_trait]
impl BrainCliService for BrainCliMock {
async fn create_vm_contract(
&self,
req: Request<NewVmReq>,
) -> Result<Response<NewVmResp>, Status> {
impl BrainCli for BrainCliMock {
async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> {
let req = req.into_inner();
info!("New VM requested via CLI: {req:?}");
let admin_pubkey = req.admin_pubkey.clone();
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
self.data.submit_newvmrequest(req, oneshot_tx).await;
self.data.submit_newvm_req(req, oneshot_tx).await;
match oneshot_rx.await {
Ok(response) => {
info!("Sending VM confirmation to {admin_pubkey}: {response:?}");
@ -334,11 +159,11 @@ impl BrainCliService for BrainCliMock {
}
}
type ListVMContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
async fn list_vm_contracts(
type ListContractsStream = Pin<Box<dyn Stream<Item = Result<Contract, Status>> + Send>>;
async fn list_contracts(
&self,
req: Request<ListVmContractsReq>,
) -> Result<Response<Self::ListVMContractsStream>, Status> {
req: Request<ListContractsReq>,
) -> Result<Response<Self::ListContractsStream>, Status> {
let req = req.into_inner();
info!("CLI {} requested ListVMContractsStream", req.admin_pubkey);
let contracts = match req.uuid.is_empty() {
@ -356,7 +181,7 @@ impl BrainCliService for BrainCliMock {
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(output_stream) as Self::ListVMContractsStream
Box::pin(output_stream) as Self::ListContractsStream
))
}

@ -2,8 +2,8 @@ mod data;
mod grpc;
use data::BrainData;
use grpc::brain::brain_cli_service_server::BrainCliServiceServer;
use grpc::brain::brain_daemon_service_server::BrainDaemonServiceServer;
use grpc::snp_proto::brain_cli_server::BrainCliServer;
use grpc::snp_proto::brain_daemon_server::BrainDaemonServer;
use grpc::BrainCliMock;
use grpc::BrainDaemonMock;
use std::sync::Arc;
@ -17,8 +17,8 @@ async fn main() {
let data = Arc::new(BrainData::new());
let addr = "0.0.0.0:31337".parse().unwrap();
let daemon_server = BrainDaemonServiceServer::new(BrainDaemonMock::new(data.clone()));
let cli_server = BrainCliServiceServer::new(BrainCliMock::new(data.clone()));
let daemon_server = BrainDaemonServer::new(BrainDaemonMock::new(data.clone()));
let cli_server = BrainCliServer::new(BrainCliMock::new(data.clone()));
Server::builder()
.add_service(daemon_server)