multiple streams on one connection

This commit is contained in:
ghe0 2024-12-22 04:40:36 +02:00
parent eae2aa210f
commit e48b9b0fef
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
5 changed files with 1426 additions and 2 deletions

1223
daemon-mock/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

@ -4,3 +4,14 @@ version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.94"
env_logger = "0.11.6"
log = "0.4.22"
prost = "0.13.4"
prost-types = "0.13.4"
tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1.17"
tonic = "0.12"
[build-dependencies]
tonic-build = "0.12"

109
daemon-mock/brain.proto Normal file

@ -0,0 +1,109 @@
syntax = "proto3";
package brain;
message Empty {
}
message NodePubkey {
string node_pubkey = 1;
}
message RegisterNodeRequest {
string node_pubkey = 1;
string owner_pubkey = 2;
string ip = 3;
string country = 4;
string city = 5;
uint32 avail_ports = 6;
uint32 avail_ipv4 = 7;
uint32 avail_ipv6 = 8;
uint32 avail_vcpus = 9;
uint32 avail_memory_mb = 10;
uint32 avail_storage_gb = 11;
uint32 max_ports_per_vm = 12;
}
message NewVMRequest {
string uuid = 1; // UUID is empty when CLI sends request; brain sets UUID
string hostname = 2;
string admin_pubkey = 3;
string node_pubkey = 4;
repeated uint32 extra_ports = 5;
bool public_ipv4 = 6;
bool public_ipv6 = 7;
uint32 disk_size_gb = 8;
uint32 vcpus = 9;
uint32 memory_mb = 10;
string kernel_url = 11;
string kernel_sha = 12;
string dtrfs_url = 13;
string dtrfs_sha = 14;
}
message VMContract {
string uuid = 1;
string hostname = 2;
string admin_pubkey = 3;
string node_pubkey = 4;
repeated uint32 exposed_ports = 5;
string public_ipv4 = 6;
string public_ipv6 = 7;
uint32 disk_size_gb = 8;
uint32 vcpus = 9;
uint32 memory_mb = 10;
string kernel_sha = 11;
string dtrfs_sha = 12;
string created_at = 13;
}
message ListVMContractsReq {
string admin_pubkey = 1;
string node_pubkey = 2;
}
message NewVMConfirmation {
string uuid = 1;
repeated uint32 exposed_ports = 2;
string public_ipv4 = 3;
string public_ipv6 = 4;
string error = 5;
}
message DeletedVMUpdate {
string uuid = 1;
}
service BrainDaemonService {
rpc RegisterNode (RegisterNodeRequest) returns (Empty);
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest);
rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty);
rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
}
message NodeFilters {
uint32 free_ports = 1;
bool offers_ipv4 = 2;
bool offers_ipv6 = 3;
uint32 vcpus = 4;
uint32 memory_mb = 5;
uint32 storage_gb = 6;
string country = 7;
}
message NodeListResp {
string node_pubkey = 1;
string country = 2;
string city = 3;
string ip = 4; // required for latency test
uint32 server_rating = 5;
uint32 provider_rating = 6;
}
service BrainCliService {
rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc ListNodes (NodeFilters) returns (stream NodeListResp);
rpc DeleteVM (DeletedVMUpdate) returns (Empty);
}

6
daemon-mock/build.rs Normal file

@ -0,0 +1,6 @@
fn main() {
tonic_build::configure()
.build_server(true)
.compile_protos(&["brain.proto"], &["proto"])
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
}

@ -1,3 +1,78 @@
fn main() {
println!("Hello, world!");
#![allow(dead_code)]
pub mod brain {
tonic::include_proto!("brain");
}
use anyhow::Result;
use brain::{
brain_daemon_service_client::BrainDaemonServiceClient, NewVmConfirmation, NewVmRequest,
NodePubkey,
};
use log::{error, debug, warn, info};
use tokio::time::{sleep, Duration};
use tokio::{sync::mpsc::Receiver, sync::mpsc::Sender, task::JoinSet};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
async fn listen_for_new_vm_reqs(
mut client: BrainDaemonServiceClient<Channel>,
tx: Sender<NewVmRequest>,
) -> Result<()> {
let node_pubkey = "somePublicKey".to_string();
let mut grpc_stream = client
.get_new_vm_reqs(NodePubkey { node_pubkey })
.await?
.into_inner();
while let Some(newvmreq) = grpc_stream.next().await {
info!("Received new vm request: {newvmreq:?}");
let _ = tx.send(newvmreq?).await;
}
debug!("listen_for_new_vm_reqs is about to exit");
Ok(())
}
async fn send_confirmations(
mut client: BrainDaemonServiceClient<Channel>,
rx: Receiver<NewVmConfirmation>,
) -> Result<()> {
let rx_stream = ReceiverStream::new(rx);
client.send_vm_confirmations(rx_stream).await?;
debug!("send_confirmations is about to exit");
Ok(())
}
async fn connect_and_run() -> Result<()> {
let client = BrainDaemonServiceClient::connect("http://[::1]:31337").await?;
let mut streaming_tasks = JoinSet::new();
let newvm_client = client.clone();
let (tx, _newvm_rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(listen_for_new_vm_reqs(newvm_client, tx));
let confirm_client = client.clone();
let (_confirm_tx, rx) = tokio::sync::mpsc::channel(6);
streaming_tasks.spawn(send_confirmations(confirm_client, rx));
let task_output = streaming_tasks.join_next().await;
warn!("One stream exited: {task_output:?}");
Ok(())
}
async fn connection_wrapper() -> ! {
loop {
info!("Connecting to brain...");
if let Err(e) = connect_and_run().await {
error!("The connection broke: {e}");
}
sleep(Duration::from_secs(3)).await;
}
}
#[tokio::main]
async fn main() {
env_logger::builder()
.filter_level(log::LevelFilter::Debug)
.init();
connection_wrapper().await;
}