From fe3d2e70abfda5de2749dc6cd40f124e9c408c7d Mon Sep 17 00:00:00 2001 From: ghe0 Date: Mon, 19 Aug 2024 02:30:03 +0300 Subject: [PATCH] improved integrations communication between server and datstore improved - added full upgrades - propagate updated only when data is new --- src/datastore.rs | 67 +++++++++++++++++++++++++++++++-- src/grpc/client.rs | 0 src/grpc/mod.rs | 6 +++ src/{grpc.rs => grpc/server.rs} | 31 ++++++++++----- 4 files changed, 91 insertions(+), 13 deletions(-) create mode 100644 src/grpc/client.rs create mode 100644 src/grpc/mod.rs rename src/{grpc.rs => grpc/server.rs} (69%) diff --git a/src/datastore.rs b/src/datastore.rs index 500765c..5d4e7c6 100644 --- a/src/datastore.rs +++ b/src/datastore.rs @@ -1,9 +1,11 @@ #![allow(dead_code)] +use crate::grpc::challenge::NodeUpdate; use ed25519_dalek::{Signer, SigningKey, VerifyingKey}; use rand::rngs::OsRng; use std::collections::HashMap; use std::time::Duration; use std::time::SystemTime; +use std::time::UNIX_EPOCH; use tabled::{Table, Tabled}; use tokio::sync::Mutex; @@ -136,11 +138,54 @@ impl Store { /// /// On a side note, there are two types of people in this worlds: /// 1. Those that can extrapolate... - pub async fn update_node(&self, ip: String, info: NodeInfo) -> bool { + pub async fn process_grpc_update(&self, node: NodeUpdate) -> bool { + let key_bytes = match hex::decode(node.keypair.clone()) { + Ok(k) => k, + Err(_) => return false, + }; + let privkey = SigningKey::from_bytes(match &key_bytes.as_slice().try_into() { + Ok(p) => p, + Err(_) => return false, + }); + let pubkey = privkey.verifying_key(); + let updated_at_std: std::time::SystemTime = match node.updated_at { + Some(ts) => { + let duration = Duration::new(ts.seconds as u64, ts.nanos as u32); + UNIX_EPOCH + .checked_add(duration) + .unwrap_or(SystemTime::now()) + } + None => SystemTime::now(), + }; + + if let Some(old_pubkey) = self + .update_node( + node.ip, + NodeInfo { + pubkey, + updated_at: updated_at_std, + online: node.online, + }, + ) + .await + { + self.remove_key(&old_pubkey).await; + self.add_key(pubkey, privkey).await; + true + } else { + false + } + } + + // returns old pubkey if node got updated + async fn update_node(&self, ip: String, info: NodeInfo) -> Option { let mut nodes = self.nodes.lock().await; match nodes.insert(ip, info.clone()) { - Some(old_info) => old_info.ne(&info), - None => false, + Some(old_info) => match old_info.pubkey.ne(&info.pubkey) { + true => Some(old_info.pubkey), + false => None, + }, + None => None, } } @@ -169,4 +214,20 @@ impl Store { .await; self.add_key(privkey.verifying_key(), privkey).await; } + + pub async fn get_full_node_list(&self) -> Vec { + let nodes = self.nodes.lock().await; + let keys = self.keys.lock().await; + nodes + .iter() + .filter_map(|(ip, node_info)| { + keys.get(&node_info.pubkey).map(|signing_key| NodeUpdate { + ip: ip.to_string(), + keypair: hex::encode(signing_key.as_bytes()), + updated_at: Some(prost_types::Timestamp::from(node_info.updated_at)), + online: node_info.online, + }) + }) + .collect() + } } diff --git a/src/grpc/client.rs b/src/grpc/client.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/grpc/mod.rs b/src/grpc/mod.rs new file mode 100644 index 0000000..65e5cbc --- /dev/null +++ b/src/grpc/mod.rs @@ -0,0 +1,6 @@ +mod server; +mod client; + +pub mod challenge { + tonic::include_proto!("challenge"); +} diff --git a/src/grpc.rs b/src/grpc/server.rs similarity index 69% rename from src/grpc.rs rename to src/grpc/server.rs index 52d35ed..5d45618 100644 --- a/src/grpc.rs +++ b/src/grpc/server.rs @@ -1,7 +1,9 @@ #![allow(dead_code)] +use super::challenge::update_server::Update; +use super::challenge::update_server::UpdateServer; +use super::challenge::NodeUpdate; use crate::datastore::Store; -use challenge::NodeUpdate; use std::pin::Pin; use std::sync::Arc; use std::thread::JoinHandle; @@ -9,10 +11,6 @@ use tokio::sync::broadcast::Sender; use tokio_stream::{Stream, StreamExt}; use tonic::{transport::Server, Request, Response, Status, Streaming}; -pub mod challenge { - tonic::include_proto!("challenge"); -} - pub struct MyServer { ds: Arc, tx: Sender, @@ -28,7 +26,7 @@ impl MyServer { tokio::runtime::Runtime::new().unwrap().block_on(async { let addr = "0.0.0.0:31373".parse().unwrap(); if let Err(e) = Server::builder() - .add_service(challenge::update_server::UpdateServer::new(self)) + .add_service(UpdateServer::new(self)) .serve(addr) .await { @@ -40,7 +38,7 @@ impl MyServer { } #[tonic::async_trait] -impl challenge::update_server::Update for MyServer { +impl Update for MyServer { type GetUpdatesStream = Pin> + Send>>; async fn get_updates( @@ -51,16 +49,29 @@ impl challenge::update_server::Update for MyServer { let tx = self.tx.clone(); let mut rx = self.tx.subscribe(); let mut inbound = req.into_inner(); + let ds = self.ds.clone(); let stream = async_stream::stream! { + let full_update_list = ds.get_full_node_list().await; + for update in full_update_list { + yield Ok(update); + } + + if tx.receiver_count() > 10 { + yield Err(Status::internal("Already have too many clients. Connect to another server.")); + return; + } + loop { tokio::select! { Some(msg) = inbound.next() => { match msg { Ok(update) => { - if let Err(_) = tx.send(update.clone()) { - yield Err(Status::internal("Failed to broadcast update")); - } + if ds.process_grpc_update(update.clone()).await { + if let Err(_) = tx.send(update.clone()) { + println!("tokio broadcast receivers had an issue consuming the channel"); + } + }; } Err(e) => { yield Err(Status::internal(format!("Error receiving client stream: {}", e)));