added updates every 60s and mint request counter

This commit is contained in:
ghe0 2024-09-19 03:22:15 +03:00
parent cdb88f4b16
commit 216b08be57
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
6 changed files with 160 additions and 44 deletions

@ -7,10 +7,11 @@ message NodeUpdate {
string ip = 1; string ip = 1;
google.protobuf.Timestamp started_at = 2; google.protobuf.Timestamp started_at = 2;
google.protobuf.Timestamp keepalive = 3; google.protobuf.Timestamp keepalive = 3;
uint64 total_mints = 4; uint64 mint_requests = 4;
uint64 ratls_conns = 5; uint64 mints = 5;
uint64 ratls_attacks = 6; uint64 ratls_conns = 6;
bool public = 7; uint64 ratls_attacks = 7;
bool public = 8;
} }
message Keypair { message Keypair {

@ -4,20 +4,49 @@ use dashmap::DashMap;
use dashmap::DashSet; use dashmap::DashSet;
use solana_sdk::signature::keypair::Keypair; use solana_sdk::signature::keypair::Keypair;
use std::time::SystemTime; use std::time::SystemTime;
use std::time::{Duration, UNIX_EPOCH};
type IP = String; type IP = String;
pub const LOCALHOST: &str = "localhost";
#[derive(Clone, PartialEq, Debug)] #[derive(Clone, PartialEq, Debug)]
pub struct NodeInfo { pub struct NodeInfo {
pub started_at: SystemTime, pub started_at: SystemTime,
pub keepalive: SystemTime, pub keepalive: SystemTime,
pub mint_requests: u64, pub mint_requests: u64,
pub total_mints: u64, pub mints: u64,
pub ratls_conns: u64, pub ratls_conns: u64,
pub ratls_attacks: u64, pub ratls_attacks: u64,
pub public: bool, pub public: bool,
} }
impl NodeInfo {
pub fn newer_than(&self, other_node: &Self) -> bool {
if self.keepalive > other_node.keepalive
|| self.mint_requests > other_node.mint_requests
|| self.ratls_attacks > other_node.ratls_attacks
|| self.ratls_conns > other_node.ratls_conns
|| self.mints > other_node.mints
{
return true;
}
false
}
pub fn to_node_update(self, ip: &str) -> NodeUpdate {
NodeUpdate {
ip: ip.to_string(),
started_at: Some(prost_types::Timestamp::from(self.started_at)),
keepalive: Some(prost_types::Timestamp::from(self.keepalive)),
mint_requests: self.mint_requests,
mints: self.mints,
ratls_conns: self.ratls_conns,
ratls_attacks: self.ratls_attacks,
public: false,
}
}
}
/// Keypair must already be known when creating a Store /// Keypair must already be known when creating a Store
/// This means the first node of the network creates the key /// This means the first node of the network creates the key
/// Second node will grab the key from the first node /// Second node will grab the key from the first node
@ -31,56 +60,106 @@ pub struct Store {
impl Store { impl Store {
pub fn init(key: Keypair) -> Self { pub fn init(key: Keypair) -> Self {
Self { let store = Self {
key, key,
nodes: DashMap::new(), nodes: DashMap::new(),
conns: DashSet::new(), conns: DashSet::new(),
} };
store.nodes.insert(
LOCALHOST.to_string(),
NodeInfo {
started_at: SystemTime::now(),
keepalive: SystemTime::now(),
mint_requests: 0,
mints: 0,
ratls_conns: 0,
ratls_attacks: 0,
public: false,
},
);
store
} }
pub fn get_keypair_base58(&self) -> String { pub fn get_keypair_base58(&self) -> String {
self.key.to_base58_string() self.key.to_base58_string()
} }
pub async fn add_conn(&self, ip: &str) { pub fn add_conn(&self, ip: &str) {
self.conns.insert(ip.to_string()); self.conns.insert(ip.to_string());
} }
pub async fn delete_conn(&self, ip: &str) { pub fn delete_conn(&self, ip: &str) {
self.conns.remove(ip); self.conns.remove(ip);
} }
pub async fn get_localhost(&self) -> NodeUpdate { pub fn increase_mint_requests(&self) {
// TODO trigger reset_localhost_keys on error instead of expects if let Some(mut localhost_info) = self.nodes.get_mut(LOCALHOST) {
let node = self.nodes.get("localhost").expect("no localhost node"); localhost_info.mint_requests += 1;
NodeUpdate {
ip: "localhost".to_string(),
started_at: Some(prost_types::Timestamp::from(node.started_at)),
keepalive: Some(prost_types::Timestamp::from(SystemTime::now())),
total_mints: node.total_mints,
ratls_conns: node.ratls_conns,
ratls_attacks: node.ratls_attacks,
public: false,
} }
} }
pub fn increase_mints(&self) {
if let Some(mut localhost_info) = self.nodes.get_mut(LOCALHOST) {
localhost_info.mints += 1;
}
}
pub fn get_localhost(&self) -> NodeInfo {
let mut localhost = self.nodes.get_mut(LOCALHOST).expect("no localhost node");
localhost.keepalive = SystemTime::now();
localhost.clone()
}
/// This returns true if NodeInfo got modified. /// This returns true if NodeInfo got modified.
/// ///
/// On a side note, there are two types of people in this world: /// On a side note, there are two types of people in this world:
/// 1. Those that can extrapolate... WAT? /// 1. Those that can extrapolate... WAT?
pub async fn process_node_update(&self, _node: NodeUpdate) -> bool { pub async fn process_node_update(&self, node: NodeUpdate) -> bool {
// TODO: write this function let ip = node.ip;
let started_at: SystemTime = match node.started_at {
Some(ts) => {
let duration = Duration::new(ts.seconds as u64, ts.nanos as u32);
UNIX_EPOCH
.checked_add(duration)
.unwrap_or(SystemTime::now())
}
None => SystemTime::now(),
};
let keepalive: SystemTime = match node.keepalive {
Some(ts) => {
let duration = Duration::new(ts.seconds as u64, ts.nanos as u32);
UNIX_EPOCH
.checked_add(duration)
.unwrap_or(SystemTime::now())
}
None => SystemTime::now(),
};
let node_info = NodeInfo {
started_at,
keepalive,
mint_requests: node.mint_requests,
mints: node.mints,
ratls_conns: node.ratls_conns,
ratls_attacks: node.ratls_attacks,
public: node.public,
};
if let Some(old_node) = self.nodes.get(&ip) {
if !node_info.newer_than(&old_node) {
return false;
}
}
self.nodes.insert(ip, node_info);
true true
} }
pub async fn get_http_node_list(&self) -> Vec<crate::http_server::NodesResp> { pub fn get_http_node_list(&self) -> Vec<crate::http_server::NodesResp> {
self.nodes self.nodes
.iter() .iter()
.map(|node| crate::http_server::NodesResp { .map(|node| crate::http_server::NodesResp {
ip: node.key().to_string(), ip: node.key().to_string(),
joined_at: node.value().started_at, joined_at: node.value().started_at,
last_keepalive: node.value().keepalive, last_keepalive: node.value().keepalive,
mints: node.value().total_mints, mints: node.value().mints,
ratls_connections: node.value().ratls_conns, ratls_connections: node.value().ratls_conns,
ratls_attacks: node.value().ratls_attacks, ratls_attacks: node.value().ratls_attacks,
public: node.value().public, public: node.value().public,
@ -89,14 +168,15 @@ impl Store {
.collect() .collect()
} }
pub async fn get_grpc_node_list(&self) -> Vec<NodeUpdate> { pub fn get_grpc_node_list(&self) -> Vec<NodeUpdate> {
self.nodes self.nodes
.iter() .iter()
.map(|node| NodeUpdate { .map(|node| NodeUpdate {
ip: node.key().to_string(), ip: node.key().to_string(),
started_at: Some(prost_types::Timestamp::from(node.value().started_at)), started_at: Some(prost_types::Timestamp::from(node.value().started_at)),
keepalive: Some(prost_types::Timestamp::from(node.value().keepalive)), keepalive: Some(prost_types::Timestamp::from(node.value().keepalive)),
total_mints: node.value().total_mints, mint_requests: node.value().mint_requests,
mints: node.value().mints,
ratls_conns: node.value().ratls_conns, ratls_conns: node.value().ratls_conns,
ratls_attacks: node.value().ratls_attacks, ratls_attacks: node.value().ratls_attacks,
public: node.value().public, public: node.value().public,
@ -105,7 +185,7 @@ impl Store {
} }
// returns a random node that does not have an active connection // returns a random node that does not have an active connection
pub async fn get_random_node(&self) -> Option<String> { pub fn get_random_node(&self) -> Option<String> {
use rand::{rngs::OsRng, RngCore}; use rand::{rngs::OsRng, RngCore};
let len = self.nodes.len(); let len = self.nodes.len();
if len == 0 { if len == 0 {
@ -119,4 +199,18 @@ impl Store {
.skip(skip) .skip(skip)
.find(|k| !self.conns.contains(k)) .find(|k| !self.conns.contains(k))
} }
pub fn remove_inactive_nodes(&self) {
self.nodes.retain(|_, v| {
let age = SystemTime::now()
.duration_since(v.keepalive)
.unwrap_or(Duration::ZERO)
.as_secs();
if age > 600 {
false
} else {
true
}
});
}
} }

@ -9,6 +9,7 @@ use tokio::sync::broadcast::Sender;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use tokio_stream::wrappers::BroadcastStream; use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use crate::datastore::LOCALHOST;
#[derive(Clone)] #[derive(Clone)]
pub struct ConnManager { pub struct ConnManager {
@ -27,8 +28,8 @@ impl ConnManager {
pub async fn start(self) { pub async fn start(self) {
loop { loop {
if let Some(node) = self.ds.get_random_node().await { if let Some(node) = self.ds.get_random_node() {
if node != "localhost" { if node != LOCALHOST {
self.connect_wrapper(node.clone()).await; self.connect_wrapper(node.clone()).await;
} }
} }
@ -38,11 +39,11 @@ impl ConnManager {
async fn connect_wrapper(&self, node_ip: String) { async fn connect_wrapper(&self, node_ip: String) {
let ds = self.ds.clone(); let ds = self.ds.clone();
ds.add_conn(&node_ip).await; ds.add_conn(&node_ip);
if let Err(e) = self.connect(node_ip.clone()).await { if let Err(e) = self.connect(node_ip.clone()).await {
println!("Client connection for {node_ip} failed: {e:?}"); println!("Client connection for {node_ip} failed: {e:?}");
} }
ds.delete_conn(&node_ip).await; ds.delete_conn(&node_ip);
} }
async fn connect(&self, node_ip: String) -> Result<(), Box<dyn std::error::Error>> { async fn connect(&self, node_ip: String) -> Result<(), Box<dyn std::error::Error>> {
@ -54,11 +55,11 @@ impl ConnManager {
let response = client.get_updates(rx_stream).await?; let response = client.get_updates(rx_stream).await?;
let mut resp_stream = response.into_inner(); let mut resp_stream = response.into_inner();
let _ = self.tx.send(self.ds.get_localhost().await); let _ = self.tx.send(self.ds.get_localhost().to_node_update(LOCALHOST));
while let Some(mut update) = resp_stream.message().await? { while let Some(mut update) = resp_stream.message().await? {
// "localhost" IPs need to be changed to the real IP of the counterpart // "localhost" IPs need to be changed to the real IP of the counterpart
if update.ip == "localhost" { if update.ip == LOCALHOST {
update.ip = node_ip.clone(); update.ip = node_ip.clone();
// since we are connecting TO this server, we have a guarantee that this // since we are connecting TO this server, we have a guarantee that this
// server is not behind NAT, so we can set it public // server is not behind NAT, so we can set it public
@ -81,10 +82,9 @@ pub async fn key_grabber(node_ip: String) -> Result<Keypair, Box<dyn std::error:
let mut client = UpdateClient::connect(format!("http://{node_ip}:31373")).await?; let mut client = UpdateClient::connect(format!("http://{node_ip}:31373")).await?;
let response = client.get_keypair(tonic::Request::new(Empty {})).await?; let response = client.get_keypair(tonic::Request::new(Empty {})).await?;
let response = &response.into_inner().keypair; let response = &response.into_inner().keypair;
let keypair = let keypair = match std::panic::catch_unwind(|| Keypair::from_base58_string(&response)) {
match std::panic::catch_unwind(|| Keypair::from_base58_string(&response)) { Ok(k) => k,
Ok(k) => k, Err(_) => return Err("Could not parse key".into()),
Err(_) => return Err("Could not parse key".into()), };
};
Ok(keypair) Ok(keypair)
} }

@ -47,7 +47,7 @@ impl Update for MyServer {
let ds = self.ds.clone(); let ds = self.ds.clone();
let stream = async_stream::stream! { let stream = async_stream::stream! {
let full_update_list = ds.get_grpc_node_list().await; let full_update_list = ds.get_grpc_node_list();
for update in full_update_list { for update in full_update_list {
yield Ok(update); yield Ok(update);
} }

@ -10,7 +10,6 @@ use actix_web::{
HttpServer, HttpServer,
Responder, Responder,
}; };
use rand::Rng;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use std::time::SystemTime; use std::time::SystemTime;
@ -57,7 +56,7 @@ pub struct NodesResp {
#[get("/nodes")] #[get("/nodes")]
async fn get_nodes(ds: web::Data<Arc<Store>>) -> HttpResponse { async fn get_nodes(ds: web::Data<Arc<Store>>) -> HttpResponse {
HttpResponse::Ok().json(ds.get_http_node_list().await) HttpResponse::Ok().json(ds.get_http_node_list())
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -66,7 +65,8 @@ struct MintReq {
} }
#[post("/mint")] #[post("/mint")]
async fn mint(_: web::Json<MintReq>) -> impl Responder { async fn mint(ds: web::Data<Arc<Store>>, _: web::Json<MintReq>) -> impl Responder {
ds.increase_mint_requests();
HttpResponse::Ok().json({}) HttpResponse::Ok().json({})
} }

@ -1,16 +1,30 @@
mod datastore; mod datastore;
mod grpc; mod grpc;
mod http_server; mod http_server;
use crate::datastore::LOCALHOST;
use crate::grpc::challenge::NodeUpdate;
use datastore::Store;
use solana_sdk::signature::keypair::Keypair; use solana_sdk::signature::keypair::Keypair;
use solana_sdk::signer::Signer;
use std::fs::File; use std::fs::File;
use std::io::{BufRead, BufReader}; use std::io::{BufRead, BufReader};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::broadcast::Sender;
use tokio::time::{sleep, Duration};
use tokio::task::JoinSet; use tokio::task::JoinSet;
const INIT_NODES: &str = "detee_challenge_nodes"; const INIT_NODES: &str = "detee_challenge_nodes";
pub async fn localhost_cron(ds: Arc<Store>, tx: Sender<NodeUpdate>) {
loop {
sleep(Duration::from_secs(60)).await;
let _ = tx.send(ds.get_localhost().to_node_update(LOCALHOST));
ds.remove_inactive_nodes();
}
}
async fn get_keypair() -> Keypair { async fn get_keypair() -> Keypair {
let input = match File::open(INIT_NODES) { let input = match File::open(INIT_NODES) {
Ok(i) => i, Ok(i) => i,
@ -23,7 +37,13 @@ async fn get_keypair() -> Keypair {
let buffered = BufReader::new(input); let buffered = BufReader::new(input);
for line in buffered.lines() { for line in buffered.lines() {
match grpc::client::key_grabber(line.unwrap()).await { match grpc::client::key_grabber(line.unwrap()).await {
Ok(keypair) => return keypair, Ok(keypair) => {
println!(
"Got keypair from the network. Joining the network using wallet {}.",
keypair.pubkey()
);
return keypair;
}
Err(e) => { Err(e) => {
println!("Could not get keypair: {e:?}"); println!("Could not get keypair: {e:?}");
} }
@ -35,13 +55,14 @@ async fn get_keypair() -> Keypair {
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let keypair = get_keypair().await; let keypair = get_keypair().await;
let ds = Arc::new(datastore::Store::init(keypair)); let ds = Arc::new(Store::init(keypair));
let (tx, mut _rx) = broadcast::channel(500); let (tx, mut _rx) = broadcast::channel(500);
let mut long_term_tasks = JoinSet::new(); let mut long_term_tasks = JoinSet::new();
let mut init_tasks = JoinSet::new(); let mut init_tasks = JoinSet::new();
long_term_tasks.spawn(localhost_cron(ds.clone(), tx.clone()));
long_term_tasks.spawn(http_server::init(ds.clone())); long_term_tasks.spawn(http_server::init(ds.clone()));
long_term_tasks.spawn(grpc::server::MyServer::init(ds.clone(), tx.clone()).start()); long_term_tasks.spawn(grpc::server::MyServer::init(ds.clone(), tx.clone()).start());