diff --git a/rewrite/proto/challenge.proto b/rewrite/proto/challenge.proto index 3b0c125..8b3802a 100644 --- a/rewrite/proto/challenge.proto +++ b/rewrite/proto/challenge.proto @@ -7,10 +7,11 @@ message NodeUpdate { string ip = 1; google.protobuf.Timestamp started_at = 2; google.protobuf.Timestamp keepalive = 3; - uint64 total_mints = 4; - uint64 ratls_conns = 5; - uint64 ratls_attacks = 6; - bool public = 7; + uint64 mint_requests = 4; + uint64 mints = 5; + uint64 ratls_conns = 6; + uint64 ratls_attacks = 7; + bool public = 8; } message Keypair { diff --git a/rewrite/src/datastore.rs b/rewrite/src/datastore.rs index 785a096..f32f09e 100644 --- a/rewrite/src/datastore.rs +++ b/rewrite/src/datastore.rs @@ -4,20 +4,49 @@ use dashmap::DashMap; use dashmap::DashSet; use solana_sdk::signature::keypair::Keypair; use std::time::SystemTime; +use std::time::{Duration, UNIX_EPOCH}; type IP = String; +pub const LOCALHOST: &str = "localhost"; #[derive(Clone, PartialEq, Debug)] pub struct NodeInfo { pub started_at: SystemTime, pub keepalive: SystemTime, pub mint_requests: u64, - pub total_mints: u64, + pub mints: u64, pub ratls_conns: u64, pub ratls_attacks: u64, pub public: bool, } +impl NodeInfo { + pub fn newer_than(&self, other_node: &Self) -> bool { + if self.keepalive > other_node.keepalive + || self.mint_requests > other_node.mint_requests + || self.ratls_attacks > other_node.ratls_attacks + || self.ratls_conns > other_node.ratls_conns + || self.mints > other_node.mints + { + return true; + } + false + } + + pub fn to_node_update(self, ip: &str) -> NodeUpdate { + NodeUpdate { + ip: ip.to_string(), + started_at: Some(prost_types::Timestamp::from(self.started_at)), + keepalive: Some(prost_types::Timestamp::from(self.keepalive)), + mint_requests: self.mint_requests, + mints: self.mints, + ratls_conns: self.ratls_conns, + ratls_attacks: self.ratls_attacks, + public: false, + } + } +} + /// Keypair must already be known when creating a Store /// This means the first node of the network creates the key /// Second node will grab the key from the first node @@ -31,56 +60,106 @@ pub struct Store { impl Store { pub fn init(key: Keypair) -> Self { - Self { + let store = Self { key, nodes: DashMap::new(), conns: DashSet::new(), - } + }; + store.nodes.insert( + LOCALHOST.to_string(), + NodeInfo { + started_at: SystemTime::now(), + keepalive: SystemTime::now(), + mint_requests: 0, + mints: 0, + ratls_conns: 0, + ratls_attacks: 0, + public: false, + }, + ); + store } pub fn get_keypair_base58(&self) -> String { self.key.to_base58_string() } - pub async fn add_conn(&self, ip: &str) { + pub fn add_conn(&self, ip: &str) { self.conns.insert(ip.to_string()); } - pub async fn delete_conn(&self, ip: &str) { + pub fn delete_conn(&self, ip: &str) { self.conns.remove(ip); } - pub async fn get_localhost(&self) -> NodeUpdate { - // TODO trigger reset_localhost_keys on error instead of expects - let node = self.nodes.get("localhost").expect("no localhost node"); - NodeUpdate { - ip: "localhost".to_string(), - started_at: Some(prost_types::Timestamp::from(node.started_at)), - keepalive: Some(prost_types::Timestamp::from(SystemTime::now())), - total_mints: node.total_mints, - ratls_conns: node.ratls_conns, - ratls_attacks: node.ratls_attacks, - public: false, + pub fn increase_mint_requests(&self) { + if let Some(mut localhost_info) = self.nodes.get_mut(LOCALHOST) { + localhost_info.mint_requests += 1; } } + pub fn increase_mints(&self) { + if let Some(mut localhost_info) = self.nodes.get_mut(LOCALHOST) { + localhost_info.mints += 1; + } + } + + pub fn get_localhost(&self) -> NodeInfo { + let mut localhost = self.nodes.get_mut(LOCALHOST).expect("no localhost node"); + localhost.keepalive = SystemTime::now(); + localhost.clone() + } + /// This returns true if NodeInfo got modified. /// /// On a side note, there are two types of people in this world: /// 1. Those that can extrapolate... WAT? - pub async fn process_node_update(&self, _node: NodeUpdate) -> bool { - // TODO: write this function + pub async fn process_node_update(&self, node: NodeUpdate) -> bool { + let ip = node.ip; + let started_at: SystemTime = match node.started_at { + Some(ts) => { + let duration = Duration::new(ts.seconds as u64, ts.nanos as u32); + UNIX_EPOCH + .checked_add(duration) + .unwrap_or(SystemTime::now()) + } + None => SystemTime::now(), + }; + let keepalive: SystemTime = match node.keepalive { + Some(ts) => { + let duration = Duration::new(ts.seconds as u64, ts.nanos as u32); + UNIX_EPOCH + .checked_add(duration) + .unwrap_or(SystemTime::now()) + } + None => SystemTime::now(), + }; + let node_info = NodeInfo { + started_at, + keepalive, + mint_requests: node.mint_requests, + mints: node.mints, + ratls_conns: node.ratls_conns, + ratls_attacks: node.ratls_attacks, + public: node.public, + }; + if let Some(old_node) = self.nodes.get(&ip) { + if !node_info.newer_than(&old_node) { + return false; + } + } + self.nodes.insert(ip, node_info); true } - pub async fn get_http_node_list(&self) -> Vec { + pub fn get_http_node_list(&self) -> Vec { self.nodes .iter() .map(|node| crate::http_server::NodesResp { ip: node.key().to_string(), joined_at: node.value().started_at, last_keepalive: node.value().keepalive, - mints: node.value().total_mints, + mints: node.value().mints, ratls_connections: node.value().ratls_conns, ratls_attacks: node.value().ratls_attacks, public: node.value().public, @@ -89,14 +168,15 @@ impl Store { .collect() } - pub async fn get_grpc_node_list(&self) -> Vec { + pub fn get_grpc_node_list(&self) -> Vec { self.nodes .iter() .map(|node| NodeUpdate { ip: node.key().to_string(), started_at: Some(prost_types::Timestamp::from(node.value().started_at)), keepalive: Some(prost_types::Timestamp::from(node.value().keepalive)), - total_mints: node.value().total_mints, + mint_requests: node.value().mint_requests, + mints: node.value().mints, ratls_conns: node.value().ratls_conns, ratls_attacks: node.value().ratls_attacks, public: node.value().public, @@ -105,7 +185,7 @@ impl Store { } // returns a random node that does not have an active connection - pub async fn get_random_node(&self) -> Option { + pub fn get_random_node(&self) -> Option { use rand::{rngs::OsRng, RngCore}; let len = self.nodes.len(); if len == 0 { @@ -119,4 +199,18 @@ impl Store { .skip(skip) .find(|k| !self.conns.contains(k)) } + + pub fn remove_inactive_nodes(&self) { + self.nodes.retain(|_, v| { + let age = SystemTime::now() + .duration_since(v.keepalive) + .unwrap_or(Duration::ZERO) + .as_secs(); + if age > 600 { + false + } else { + true + } + }); + } } diff --git a/rewrite/src/grpc/client.rs b/rewrite/src/grpc/client.rs index 62dc6d2..48c9209 100644 --- a/rewrite/src/grpc/client.rs +++ b/rewrite/src/grpc/client.rs @@ -9,6 +9,7 @@ use tokio::sync::broadcast::Sender; use tokio::time::{sleep, Duration}; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::StreamExt; +use crate::datastore::LOCALHOST; #[derive(Clone)] pub struct ConnManager { @@ -27,8 +28,8 @@ impl ConnManager { pub async fn start(self) { loop { - if let Some(node) = self.ds.get_random_node().await { - if node != "localhost" { + if let Some(node) = self.ds.get_random_node() { + if node != LOCALHOST { self.connect_wrapper(node.clone()).await; } } @@ -38,11 +39,11 @@ impl ConnManager { async fn connect_wrapper(&self, node_ip: String) { let ds = self.ds.clone(); - ds.add_conn(&node_ip).await; + ds.add_conn(&node_ip); if let Err(e) = self.connect(node_ip.clone()).await { println!("Client connection for {node_ip} failed: {e:?}"); } - ds.delete_conn(&node_ip).await; + ds.delete_conn(&node_ip); } async fn connect(&self, node_ip: String) -> Result<(), Box> { @@ -54,11 +55,11 @@ impl ConnManager { let response = client.get_updates(rx_stream).await?; let mut resp_stream = response.into_inner(); - let _ = self.tx.send(self.ds.get_localhost().await); + let _ = self.tx.send(self.ds.get_localhost().to_node_update(LOCALHOST)); while let Some(mut update) = resp_stream.message().await? { // "localhost" IPs need to be changed to the real IP of the counterpart - if update.ip == "localhost" { + if update.ip == LOCALHOST { update.ip = node_ip.clone(); // since we are connecting TO this server, we have a guarantee that this // server is not behind NAT, so we can set it public @@ -81,10 +82,9 @@ pub async fn key_grabber(node_ip: String) -> Result k, - Err(_) => return Err("Could not parse key".into()), - }; + let keypair = match std::panic::catch_unwind(|| Keypair::from_base58_string(&response)) { + Ok(k) => k, + Err(_) => return Err("Could not parse key".into()), + }; Ok(keypair) } diff --git a/rewrite/src/grpc/server.rs b/rewrite/src/grpc/server.rs index 2234537..1b8d036 100644 --- a/rewrite/src/grpc/server.rs +++ b/rewrite/src/grpc/server.rs @@ -47,7 +47,7 @@ impl Update for MyServer { let ds = self.ds.clone(); let stream = async_stream::stream! { - let full_update_list = ds.get_grpc_node_list().await; + let full_update_list = ds.get_grpc_node_list(); for update in full_update_list { yield Ok(update); } diff --git a/rewrite/src/http_server.rs b/rewrite/src/http_server.rs index 4bb8656..f6ed5c4 100644 --- a/rewrite/src/http_server.rs +++ b/rewrite/src/http_server.rs @@ -10,7 +10,6 @@ use actix_web::{ HttpServer, Responder, }; -use rand::Rng; use serde::{Deserialize, Serialize}; use std::sync::Arc; use std::time::SystemTime; @@ -57,7 +56,7 @@ pub struct NodesResp { #[get("/nodes")] async fn get_nodes(ds: web::Data>) -> HttpResponse { - HttpResponse::Ok().json(ds.get_http_node_list().await) + HttpResponse::Ok().json(ds.get_http_node_list()) } #[derive(Deserialize)] @@ -66,7 +65,8 @@ struct MintReq { } #[post("/mint")] -async fn mint(_: web::Json) -> impl Responder { +async fn mint(ds: web::Data>, _: web::Json) -> impl Responder { + ds.increase_mint_requests(); HttpResponse::Ok().json({}) } diff --git a/rewrite/src/main.rs b/rewrite/src/main.rs index e958b10..f90f8d9 100644 --- a/rewrite/src/main.rs +++ b/rewrite/src/main.rs @@ -1,16 +1,30 @@ mod datastore; mod grpc; mod http_server; +use crate::datastore::LOCALHOST; +use crate::grpc::challenge::NodeUpdate; +use datastore::Store; use solana_sdk::signature::keypair::Keypair; +use solana_sdk::signer::Signer; use std::fs::File; use std::io::{BufRead, BufReader}; use std::sync::Arc; use tokio::sync::broadcast; +use tokio::sync::broadcast::Sender; +use tokio::time::{sleep, Duration}; use tokio::task::JoinSet; const INIT_NODES: &str = "detee_challenge_nodes"; +pub async fn localhost_cron(ds: Arc, tx: Sender) { + loop { + sleep(Duration::from_secs(60)).await; + let _ = tx.send(ds.get_localhost().to_node_update(LOCALHOST)); + ds.remove_inactive_nodes(); + } +} + async fn get_keypair() -> Keypair { let input = match File::open(INIT_NODES) { Ok(i) => i, @@ -23,7 +37,13 @@ async fn get_keypair() -> Keypair { let buffered = BufReader::new(input); for line in buffered.lines() { match grpc::client::key_grabber(line.unwrap()).await { - Ok(keypair) => return keypair, + Ok(keypair) => { + println!( + "Got keypair from the network. Joining the network using wallet {}.", + keypair.pubkey() + ); + return keypair; + } Err(e) => { println!("Could not get keypair: {e:?}"); } @@ -35,13 +55,14 @@ async fn get_keypair() -> Keypair { #[tokio::main] async fn main() { let keypair = get_keypair().await; - let ds = Arc::new(datastore::Store::init(keypair)); + let ds = Arc::new(Store::init(keypair)); let (tx, mut _rx) = broadcast::channel(500); let mut long_term_tasks = JoinSet::new(); let mut init_tasks = JoinSet::new(); + long_term_tasks.spawn(localhost_cron(ds.clone(), tx.clone())); long_term_tasks.spawn(http_server::init(ds.clone())); long_term_tasks.spawn(grpc::server::MyServer::init(ds.clone(), tx.clone()).start());