From 3b5b55e44ea75ae96221034bcbd38c4cbd53ef70 Mon Sep 17 00:00:00 2001 From: ghe0 Date: Fri, 20 Sep 2024 04:56:44 +0300 Subject: [PATCH] dependency inversion principle --- rewrite/src/datastore.rs | 85 ++------------------------------------ rewrite/src/grpc/client.rs | 4 +- rewrite/src/grpc/mod.rs | 56 ++++++++++++++++++++++++- rewrite/src/grpc/server.rs | 4 +- rewrite/src/http_server.rs | 29 ++++++++++++- rewrite/src/main.rs | 2 +- 6 files changed, 91 insertions(+), 89 deletions(-) diff --git a/rewrite/src/datastore.rs b/rewrite/src/datastore.rs index 9adb4c5..274061e 100644 --- a/rewrite/src/datastore.rs +++ b/rewrite/src/datastore.rs @@ -1,11 +1,9 @@ #![allow(dead_code)] -use crate::grpc::challenge::NodeUpdate; -use chrono::{DateTime, Utc}; use dashmap::DashMap; use dashmap::DashSet; use solana_sdk::signature::keypair::Keypair; +use std::time::Duration; use std::time::SystemTime; -use std::time::{Duration, UNIX_EPOCH}; type IP = String; pub const LOCALHOST: &str = "localhost"; @@ -34,19 +32,6 @@ impl NodeInfo { } false } - - pub fn to_node_update(self, ip: &str) -> NodeUpdate { - NodeUpdate { - ip: ip.to_string(), - started_at: Some(prost_types::Timestamp::from(self.started_at)), - keepalive: Some(prost_types::Timestamp::from(self.keepalive)), - mint_requests: self.mint_requests, - mints: self.mints, - ratls_conns: self.ratls_conns, - ratls_attacks: self.ratls_attacks, - public: false, - } - } } /// Keypair must already be known when creating a Store @@ -116,35 +101,7 @@ impl Store { /// /// On a side note, there are two types of people in this world: /// 1. Those that can extrapolate... WAT? - pub async fn process_node_update(&self, node: NodeUpdate) -> bool { - let ip = node.ip; - let started_at: SystemTime = match node.started_at { - Some(ts) => { - let duration = Duration::new(ts.seconds as u64, ts.nanos as u32); - UNIX_EPOCH - .checked_add(duration) - .unwrap_or(SystemTime::now()) - } - None => SystemTime::now(), - }; - let keepalive: SystemTime = match node.keepalive { - Some(ts) => { - let duration = Duration::new(ts.seconds as u64, ts.nanos as u32); - UNIX_EPOCH - .checked_add(duration) - .unwrap_or(SystemTime::now()) - } - None => SystemTime::now(), - }; - let mut node_info = NodeInfo { - started_at, - keepalive, - mint_requests: node.mint_requests, - mints: node.mints, - ratls_conns: node.ratls_conns, - ratls_attacks: node.ratls_attacks, - public: node.public, - }; + pub async fn process_node_update(&self, (ip, mut node_info): (String, NodeInfo)) -> bool { if let Some(old_node) = self.nodes.get(&ip) { if !node_info.newer_than(&old_node) { return false; @@ -155,42 +112,8 @@ impl Store { true } - pub fn get_http_node_list(&self) -> Vec { - self.nodes - .iter() - .map(|node| { - let joined_at: DateTime = node.value().started_at.into(); - let last_keepalive: DateTime = node.value().keepalive.into(); - let joined_at = joined_at.format("%Y-%m-%d %H:%M:%S").to_string(); - let last_keepalive = last_keepalive.format("%Y-%m-%d %H:%M:%S").to_string(); - crate::http_server::NodesResp { - ip: node.key().to_string(), - joined_at, - last_keepalive, - mints: node.value().mints, - ratls_connections: node.value().ratls_conns, - ratls_attacks: node.value().ratls_attacks, - public: node.value().public, - mint_requests: node.value().mint_requests, - } - }) - .collect() - } - - pub fn get_grpc_node_list(&self) -> Vec { - self.nodes - .iter() - .map(|node| NodeUpdate { - ip: node.key().to_string(), - started_at: Some(prost_types::Timestamp::from(node.value().started_at)), - keepalive: Some(prost_types::Timestamp::from(node.value().keepalive)), - mint_requests: node.value().mint_requests, - mints: node.value().mints, - ratls_conns: node.value().ratls_conns, - ratls_attacks: node.value().ratls_attacks, - public: node.value().public, - }) - .collect() + pub fn get_node_list(&self) -> Vec<(String, NodeInfo)> { + self.nodes.clone().into_iter().collect() } // returns a random node that does not have an active connection diff --git a/rewrite/src/grpc/client.rs b/rewrite/src/grpc/client.rs index 48c9209..4c78d33 100644 --- a/rewrite/src/grpc/client.rs +++ b/rewrite/src/grpc/client.rs @@ -55,7 +55,7 @@ impl ConnManager { let response = client.get_updates(rx_stream).await?; let mut resp_stream = response.into_inner(); - let _ = self.tx.send(self.ds.get_localhost().to_node_update(LOCALHOST)); + let _ = self.tx.send((LOCALHOST.to_string(), self.ds.get_localhost()).into()); while let Some(mut update) = resp_stream.message().await? { // "localhost" IPs need to be changed to the real IP of the counterpart @@ -67,7 +67,7 @@ impl ConnManager { } // update the entire network in case the information is new - if self.ds.process_node_update(update.clone()).await { + if self.ds.process_node_update(update.clone().into()).await { if let Err(_) = self.tx.send(update.clone()) { println!("tokio broadcast receivers had an issue consuming the channel"); } diff --git a/rewrite/src/grpc/mod.rs b/rewrite/src/grpc/mod.rs index 15cc0bb..c695121 100644 --- a/rewrite/src/grpc/mod.rs +++ b/rewrite/src/grpc/mod.rs @@ -1,6 +1,60 @@ -pub mod server; pub mod client; +pub mod server; +use crate::datastore; +use crate::datastore::NodeInfo; +use crate::NodeUpdate; +use std::time::SystemTime; +use std::time::{Duration, UNIX_EPOCH}; pub mod challenge { tonic::include_proto!("challenge"); } + +impl From<(String, datastore::NodeInfo)> for NodeUpdate { + fn from((ip, info): (String, datastore::NodeInfo)) -> Self { + NodeUpdate { + ip: ip.to_string(), + started_at: Some(prost_types::Timestamp::from(info.started_at)), + keepalive: Some(prost_types::Timestamp::from(info.keepalive)), + mint_requests: info.mint_requests, + mints: info.mints, + ratls_conns: info.ratls_conns, + ratls_attacks: info.ratls_attacks, + public: info.public, + } + } +} + +impl Into<(String, datastore::NodeInfo)> for NodeUpdate { + fn into(self) -> (String, datastore::NodeInfo) { + let ip = self.ip; + let started_at: SystemTime = match self.started_at { + Some(ts) => { + let duration = Duration::new(ts.seconds as u64, ts.nanos as u32); + UNIX_EPOCH + .checked_add(duration) + .unwrap_or(SystemTime::now()) + } + None => SystemTime::now(), + }; + let keepalive: SystemTime = match self.keepalive { + Some(ts) => { + let duration = Duration::new(ts.seconds as u64, ts.nanos as u32); + UNIX_EPOCH + .checked_add(duration) + .unwrap_or(SystemTime::now()) + } + None => SystemTime::now(), + }; + let self_info = NodeInfo { + started_at, + keepalive, + mint_requests: self.mint_requests, + mints: self.mints, + ratls_conns: self.ratls_conns, + ratls_attacks: self.ratls_attacks, + public: self.public, + }; + (ip, self_info) + } +} diff --git a/rewrite/src/grpc/server.rs b/rewrite/src/grpc/server.rs index 1b8d036..14aff8c 100644 --- a/rewrite/src/grpc/server.rs +++ b/rewrite/src/grpc/server.rs @@ -47,7 +47,7 @@ impl Update for MyServer { let ds = self.ds.clone(); let stream = async_stream::stream! { - let full_update_list = ds.get_grpc_node_list(); + let full_update_list: Vec = ds.get_node_list().into_iter().map(Into::::into).collect(); for update in full_update_list { yield Ok(update); } @@ -62,7 +62,7 @@ impl Update for MyServer { // note that we don't set this node online, // as it can be behind NAT } - if update.ip != "127.0.0.1" && ds.process_node_update(update.clone()).await { + if update.ip != "127.0.0.1" && ds.process_node_update(update.clone().into()).await { if let Err(_) = tx.send(update.clone()) { println!("tokio broadcast receivers had an issue consuming the channel"); } diff --git a/rewrite/src/http_server.rs b/rewrite/src/http_server.rs index 2828ce0..dcc9de3 100644 --- a/rewrite/src/http_server.rs +++ b/rewrite/src/http_server.rs @@ -1,4 +1,5 @@ #![allow(dead_code)] +use crate::datastore; use crate::datastore::Store; use actix_web::{ // http::StatusCode, error::ResponseError, Result, @@ -47,15 +48,39 @@ pub struct NodesResp { pub joined_at: String, pub last_keepalive: String, pub mint_requests: u64, + pub mints: u64, pub ratls_attacks: u64, pub ratls_connections: u64, pub public: bool, - pub mints: u64, +} + +impl From<(String, datastore::NodeInfo)> for NodesResp { + fn from((ip, node_info): (String, datastore::NodeInfo)) -> Self { + let joined_at: DateTime = node_info.started_at.into(); + let last_keepalive: DateTime = node_info.keepalive.into(); + let joined_at = joined_at.format("%Y-%m-%d %H:%M:%S").to_string(); + let last_keepalive = last_keepalive.format("%Y-%m-%d %H:%M:%S").to_string(); + crate::http_server::NodesResp { + ip, + joined_at, + last_keepalive, + mints: node_info.mints, + ratls_connections: node_info.ratls_conns, + ratls_attacks: node_info.ratls_attacks, + public: node_info.public, + mint_requests: node_info.mint_requests, + } + } } #[get("/nodes")] async fn get_nodes(ds: web::Data>) -> HttpResponse { - HttpResponse::Ok().json(ds.get_http_node_list()) + HttpResponse::Ok().json( + ds.get_node_list() + .into_iter() + .map(Into::::into) + .collect::>(), + ) } #[derive(Deserialize)] diff --git a/rewrite/src/main.rs b/rewrite/src/main.rs index d6160a5..e9978a0 100644 --- a/rewrite/src/main.rs +++ b/rewrite/src/main.rs @@ -20,7 +20,7 @@ const INIT_NODES: &str = "detee_challenge_nodes"; pub async fn localhost_cron(ds: Arc, tx: Sender) { loop { sleep(Duration::from_secs(60)).await; - let _ = tx.send(ds.get_localhost().to_node_update(LOCALHOST)); + let _ = tx.send((LOCALHOST.to_string(), ds.get_localhost()).into()); ds.remove_inactive_nodes(); } }