Dear Noor, Dear Ram, Please take into consideration I am Co-CEO and I am allowed to use language like this in commits messages. You should normally follow my lead, but not in this case. My advice is to not use language like this in commit messages. Yours trully, The G
101 lines
3.6 KiB
Rust
101 lines
3.6 KiB
Rust
#![allow(dead_code)]
|
|
|
|
use super::challenge::update_server::UpdateServer;
|
|
use super::challenge::{Empty, Keys, NodeUpdate};
|
|
use crate::datastore::Store;
|
|
use crate::grpc::challenge::update_server::Update;
|
|
use std::pin::Pin;
|
|
use std::sync::Arc;
|
|
use tokio::sync::broadcast::Sender;
|
|
use tokio_stream::{Stream, StreamExt};
|
|
use tonic::{transport::Server, Request, Response, Status, Streaming};
|
|
|
|
pub struct MyServer {
|
|
ds: Arc<Store>,
|
|
tx: Sender<NodeUpdate>,
|
|
}
|
|
|
|
impl MyServer {
|
|
pub fn init(ds: Arc<Store>, tx: Sender<NodeUpdate>) -> Self {
|
|
Self { ds, tx }
|
|
}
|
|
|
|
pub async fn start(self) {
|
|
let addr = "0.0.0.0:31373".parse().unwrap();
|
|
if let Err(e) = Server::builder()
|
|
.add_service(UpdateServer::new(self))
|
|
.serve(addr)
|
|
.await
|
|
{
|
|
println!("gRPC server failed: {e:?}");
|
|
};
|
|
}
|
|
}
|
|
|
|
#[tonic::async_trait]
|
|
impl Update for MyServer {
|
|
type GetUpdatesStream = Pin<Box<dyn Stream<Item = Result<NodeUpdate, Status>> + Send>>;
|
|
|
|
async fn get_updates(
|
|
&self,
|
|
req: Request<Streaming<NodeUpdate>>,
|
|
) -> Result<Response<Self::GetUpdatesStream>, Status> {
|
|
let remote_ip = req.remote_addr().unwrap().ip().to_string();
|
|
let tx = self.tx.clone();
|
|
let mut rx = self.tx.subscribe();
|
|
let mut inbound = req.into_inner();
|
|
let ds = self.ds.clone();
|
|
|
|
let stream = async_stream::stream! {
|
|
let full_update_list: Vec<NodeUpdate> = ds.get_node_list().into_iter().map(Into::<NodeUpdate>::into).collect();
|
|
for update in full_update_list {
|
|
yield Ok(update);
|
|
}
|
|
|
|
loop {
|
|
tokio::select! {
|
|
Some(msg) = inbound.next() => {
|
|
match msg {
|
|
Ok(mut update) => {
|
|
if update.ip == "localhost" {
|
|
update.ip = remote_ip.clone();
|
|
// note that we don't set this node online,
|
|
// as it can be behind NAT
|
|
}
|
|
if update.ip != "127.0.0.1" && ds.process_node_update(update.clone().into()).await {
|
|
if let Err(_) = tx.send(update.clone()) {
|
|
println!("tokio broadcast receivers had an issue consuming the channel");
|
|
}
|
|
};
|
|
}
|
|
Err(e) => {
|
|
yield Err(Status::internal(format!("Error receiving client stream: {}", e)));
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
Ok(update) = rx.recv() => {
|
|
yield Ok(update);
|
|
// disconnect client if too many connections are active
|
|
if tx.receiver_count() > 9 {
|
|
yield Err(Status::internal("Already have too many clients. Connect to another server."));
|
|
return;
|
|
}
|
|
}
|
|
|
|
}
|
|
}
|
|
};
|
|
|
|
Ok(Response::new(Box::pin(stream) as Self::GetUpdatesStream))
|
|
}
|
|
|
|
async fn get_keys(&self, _request: Request<Empty>) -> Result<Response<Keys>, Status> {
|
|
let reply = Keys {
|
|
keypair: self.ds.get_keypair_base58(),
|
|
token_address: self.ds.get_token_address(),
|
|
};
|
|
Ok(Response::new(reply))
|
|
}
|
|
}
|