102 lines
3.5 KiB
Rust
102 lines
3.5 KiB
Rust
#![allow(dead_code)]
|
|
|
|
use super::challenge::update_server::UpdateServer;
|
|
use super::challenge::{Empty, Keypair, NodeUpdate};
|
|
use crate::datastore::Store;
|
|
use crate::grpc::challenge::update_server::Update;
|
|
use std::pin::Pin;
|
|
use std::sync::Arc;
|
|
use tokio::sync::broadcast::Sender;
|
|
use tokio_stream::{Stream, StreamExt};
|
|
use tonic::{transport::Server, Request, Response, Status, Streaming};
|
|
|
|
pub struct MyServer {
|
|
ds: Arc<Store>,
|
|
tx: Sender<NodeUpdate>,
|
|
}
|
|
|
|
impl MyServer {
|
|
pub fn init(ds: Arc<Store>, tx: Sender<NodeUpdate>) -> Self {
|
|
Self { ds, tx }
|
|
}
|
|
|
|
pub async fn start(self) {
|
|
let addr = "0.0.0.0:31373".parse().unwrap();
|
|
if let Err(e) = Server::builder()
|
|
.add_service(UpdateServer::new(self))
|
|
.serve(addr)
|
|
.await
|
|
{
|
|
println!("gRPC server failed: {e:?}");
|
|
};
|
|
}
|
|
}
|
|
|
|
#[tonic::async_trait]
|
|
impl Update for MyServer {
|
|
type GetUpdatesStream = Pin<Box<dyn Stream<Item = Result<NodeUpdate, Status>> + Send>>;
|
|
|
|
async fn get_updates(
|
|
&self,
|
|
req: Request<Streaming<NodeUpdate>>,
|
|
) -> Result<Response<Self::GetUpdatesStream>, Status> {
|
|
let remote_ip = req.remote_addr().unwrap().ip().to_string();
|
|
let tx = self.tx.clone();
|
|
let mut rx = self.tx.subscribe();
|
|
let mut inbound = req.into_inner();
|
|
let ds = self.ds.clone();
|
|
|
|
let stream = async_stream::stream! {
|
|
let full_update_list = ds.get_full_node_list().await;
|
|
for update in full_update_list {
|
|
yield Ok(update);
|
|
}
|
|
|
|
loop {
|
|
tokio::select! {
|
|
Some(msg) = inbound.next() => {
|
|
match msg {
|
|
Ok(mut update) => {
|
|
if update.ip == "localhost" {
|
|
update.ip = remote_ip.clone();
|
|
// note that we don't set this node online,
|
|
// as it can be behind NAT
|
|
}
|
|
if update.ip != "127.0.0.1" && ds.process_node_update(update.clone()).await {
|
|
if let Err(_) = tx.send(update.clone()) {
|
|
println!("tokio broadcast receivers had an issue consuming the channel");
|
|
}
|
|
};
|
|
}
|
|
Err(e) => {
|
|
yield Err(Status::internal(format!("Error receiving client stream: {}", e)));
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
Ok(update) = rx.recv() => {
|
|
yield Ok(update);
|
|
// disconnect client if too many connections are active
|
|
if tx.receiver_count() > 9 {
|
|
yield Err(Status::internal("Already have too many clients. Connect to another server."));
|
|
return;
|
|
}
|
|
}
|
|
|
|
}
|
|
}
|
|
};
|
|
|
|
Ok(Response::new(Box::pin(stream) as Self::GetUpdatesStream))
|
|
}
|
|
|
|
async fn get_keypair(&self, request: Request<Empty>) -> Result<Response<Keypair>, Status> {
|
|
println!("Got a request from {:?}", request.remote_addr());
|
|
|
|
let reply = Keypair {
|
|
keypair: self.ds.get_keypair_base58(),
|
|
};
|
|
Ok(Response::new(reply))
|
|
}
|
|
}
|