diff --git a/rewrite/src/datastore.rs b/rewrite/src/datastore.rs index 2f62e7d..273c87f 100644 --- a/rewrite/src/datastore.rs +++ b/rewrite/src/datastore.rs @@ -1,9 +1,7 @@ #![allow(dead_code)] use crate::solana::Client as SolClient; -use dashmap::DashMap; -use dashmap::DashSet; -use std::time::Duration; -use std::time::SystemTime; +use dashmap::{DashMap, DashSet}; +use std::time::{Duration, SystemTime}; type IP = String; pub const LOCALHOST: &str = "localhost"; @@ -21,8 +19,12 @@ pub struct NodeInfo { impl NodeInfo { pub fn newer_than(&self, other_node: &Self) -> bool { - if self.keepalive > other_node.keepalive - || self.mint_requests > other_node.mint_requests + if let Ok(duration) = self.keepalive.duration_since(other_node.keepalive) { + if duration > Duration::from_secs(30) { + return true; + } + } + if self.mint_requests > other_node.mint_requests || self.ratls_attacks > other_node.ratls_attacks || self.ratls_conns > other_node.ratls_conns || self.mints > other_node.mints @@ -45,11 +47,7 @@ pub struct Store { impl Store { pub fn init(sol_client: SolClient) -> Self { - let store = Self { - sol_client, - nodes: DashMap::new(), - conns: DashSet::new(), - }; + let store = Self { sol_client, nodes: DashMap::new(), conns: DashSet::new() }; store.nodes.insert( LOCALHOST.to_string(), NodeInfo { @@ -124,6 +122,7 @@ impl Store { } node_info.public = node_info.public || old_node.public; } + println!("Inserting: {}, {:?}", ip, node_info); self.nodes.insert(ip, node_info); true } @@ -150,10 +149,8 @@ impl Store { pub fn remove_inactive_nodes(&self) { self.nodes.retain(|_, v| { - let age = SystemTime::now() - .duration_since(v.keepalive) - .unwrap_or(Duration::ZERO) - .as_secs(); + let age = + SystemTime::now().duration_since(v.keepalive).unwrap_or(Duration::ZERO).as_secs(); if age > 600 { false } else { diff --git a/rewrite/src/grpc/client.rs b/rewrite/src/grpc/client.rs index 57dca41..6fd8a13 100644 --- a/rewrite/src/grpc/client.rs +++ b/rewrite/src/grpc/client.rs @@ -1,17 +1,16 @@ #![allow(dead_code)] use super::challenge::NodeUpdate; -use crate::datastore::Store; -use crate::datastore::LOCALHOST; -use crate::grpc::challenge::update_client::UpdateClient; -use crate::grpc::challenge::Empty; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::keypair::Keypair; -use std::str::FromStr; -use std::sync::Arc; -use tokio::sync::broadcast::Sender; -use tokio::time::{sleep, Duration}; -use tokio_stream::wrappers::BroadcastStream; -use tokio_stream::StreamExt; +use crate::{ + datastore::{Store, LOCALHOST}, + grpc::challenge::{update_client::UpdateClient, Empty}, +}; +use solana_sdk::{pubkey::Pubkey, signature::keypair::Keypair}; +use std::{str::FromStr, sync::Arc}; +use tokio::{ + sync::broadcast::Sender, + time::{sleep, Duration}, +}; +use tokio_stream::{wrappers::BroadcastStream, StreamExt}; #[derive(Clone)] pub struct ConnManager { @@ -57,9 +56,7 @@ impl ConnManager { let response = client.get_updates(rx_stream).await?; let mut resp_stream = response.into_inner(); - let _ = self - .tx - .send((LOCALHOST.to_string(), self.ds.get_localhost()).into()); + let _ = self.tx.send((LOCALHOST.to_string(), self.ds.get_localhost()).into()); while let Some(mut update) = resp_stream.message().await? { // "localhost" IPs need to be changed to the real IP of the counterpart diff --git a/rewrite/src/grpc/server.rs b/rewrite/src/grpc/server.rs index 5dcb7be..0e52ba4 100644 --- a/rewrite/src/grpc/server.rs +++ b/rewrite/src/grpc/server.rs @@ -1,11 +1,8 @@ #![allow(dead_code)] -use super::challenge::update_server::UpdateServer; -use super::challenge::{Empty, Keys, NodeUpdate}; -use crate::datastore::Store; -use crate::grpc::challenge::update_server::Update; -use std::pin::Pin; -use std::sync::Arc; +use super::challenge::{update_server::UpdateServer, Empty, Keys, NodeUpdate}; +use crate::{datastore::Store, grpc::challenge::update_server::Update}; +use std::{pin::Pin, sync::Arc}; use tokio::sync::broadcast::Sender; use tokio_stream::{Stream, StreamExt}; use tonic::{transport::Server, Request, Response, Status, Streaming}; @@ -22,11 +19,7 @@ impl MyServer { pub async fn start(self) { let addr = "0.0.0.0:31373".parse().unwrap(); - if let Err(e) = Server::builder() - .add_service(UpdateServer::new(self)) - .serve(addr) - .await - { + if let Err(e) = Server::builder().add_service(UpdateServer::new(self)).serve(addr).await { println!("gRPC server failed: {e:?}"); }; }