From cdb88f4b1619ac1723772d645a3d8b2dcc4c3256 Mon Sep 17 00:00:00 2001 From: ghe0 Date: Wed, 18 Sep 2024 04:41:03 +0300 Subject: [PATCH] add code to start servers --- rewrite/src/datastore.rs | 19 +++++++++- rewrite/src/grpc/server.rs | 2 +- rewrite/src/http_server.rs | 62 +++++++++----------------------- rewrite/src/main.rs | 73 +++++++++++++++++++++++++++++++++----- 4 files changed, 100 insertions(+), 56 deletions(-) diff --git a/rewrite/src/datastore.rs b/rewrite/src/datastore.rs index 89eeea5..785a096 100644 --- a/rewrite/src/datastore.rs +++ b/rewrite/src/datastore.rs @@ -11,6 +11,7 @@ type IP = String; pub struct NodeInfo { pub started_at: SystemTime, pub keepalive: SystemTime, + pub mint_requests: u64, pub total_mints: u64, pub ratls_conns: u64, pub ratls_attacks: u64, @@ -72,7 +73,23 @@ impl Store { true } - pub async fn get_full_node_list(&self) -> Vec { + pub async fn get_http_node_list(&self) -> Vec { + self.nodes + .iter() + .map(|node| crate::http_server::NodesResp { + ip: node.key().to_string(), + joined_at: node.value().started_at, + last_keepalive: node.value().keepalive, + mints: node.value().total_mints, + ratls_connections: node.value().ratls_conns, + ratls_attacks: node.value().ratls_attacks, + public: node.value().public, + mint_requests: node.value().mint_requests, + }) + .collect() + } + + pub async fn get_grpc_node_list(&self) -> Vec { self.nodes .iter() .map(|node| NodeUpdate { diff --git a/rewrite/src/grpc/server.rs b/rewrite/src/grpc/server.rs index 3762c60..2234537 100644 --- a/rewrite/src/grpc/server.rs +++ b/rewrite/src/grpc/server.rs @@ -47,7 +47,7 @@ impl Update for MyServer { let ds = self.ds.clone(); let stream = async_stream::stream! { - let full_update_list = ds.get_full_node_list().await; + let full_update_list = ds.get_grpc_node_list().await; for update in full_update_list { yield Ok(update); } diff --git a/rewrite/src/http_server.rs b/rewrite/src/http_server.rs index 5d65490..4bb8656 100644 --- a/rewrite/src/http_server.rs +++ b/rewrite/src/http_server.rs @@ -1,4 +1,5 @@ #![allow(dead_code)] +use crate::datastore::Store; use actix_web::{ // http::StatusCode, error::ResponseError, Result, get, @@ -11,6 +12,8 @@ use actix_web::{ }; use rand::Rng; use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use std::time::SystemTime; // use std::{fmt, sync::Arc}; const HOMEPAGE: &str = r#"Welcome, beloved hacker! @@ -41,20 +44,20 @@ async fn homepage() -> impl Responder { } #[derive(Serialize)] -struct NodesResp { - ip: String, - last_keepalive: u64, - online: bool, - public: bool, - mints: u64, - mint_requests: u64, - ratls_attacks: u64, +pub struct NodesResp { + pub ip: String, + pub joined_at: SystemTime, + pub last_keepalive: SystemTime, + pub mint_requests: u64, + pub ratls_attacks: u64, + pub ratls_connections: u64, + pub public: bool, + pub mints: u64, } #[get("/nodes")] -async fn get_nodes() -> HttpResponse { - let mock_nodes = generate_mock_data(); - HttpResponse::Ok().json(mock_nodes) +async fn get_nodes(ds: web::Data>) -> HttpResponse { + HttpResponse::Ok().json(ds.get_http_node_list().await) } #[derive(Deserialize)] @@ -67,11 +70,10 @@ async fn mint(_: web::Json) -> impl Responder { HttpResponse::Ok().json({}) } -// TODO: init(ds: Arc) -pub async fn init() { +pub async fn init(ds: Arc) { HttpServer::new(move || { App::new() - // .app_data(web::Data::new(ds.clone())) + .app_data(web::Data::new(ds.clone())) .service(homepage) .service(get_nodes) .service(mint) @@ -83,35 +85,3 @@ pub async fn init() { .unwrap(); } -fn generate_mock_data() -> Vec { - let mut rng = rand::thread_rng(); - let mut nodes = Vec::new(); - - for _ in 0..15 { - let ip = format!( - "{}.{}.{}.{}", - rng.gen_range(1..=255), - rng.gen_range(1..=255), - rng.gen_range(1..=255), - rng.gen_range(1..=255) - ); - let last_keepalive = rng.gen_range(1_000_000..10_000_000); - let online = rng.gen_bool(0.5); - let public = rng.gen_bool(0.5); - let mints = rng.gen_range(0..500); - let mint_requests = rng.gen_range(0..1000); - let ratls_attacks = rng.gen_range(0..10); - - nodes.push(NodesResp { - ip, - last_keepalive, - online, - public, - mints, - mint_requests, - ratls_attacks, - }); - } - - nodes -} diff --git a/rewrite/src/main.rs b/rewrite/src/main.rs index 6f18b16..e958b10 100644 --- a/rewrite/src/main.rs +++ b/rewrite/src/main.rs @@ -1,16 +1,73 @@ -mod grpc; mod datastore; +mod grpc; mod http_server; +use solana_sdk::signature::keypair::Keypair; +use std::fs::File; +use std::io::{BufRead, BufReader}; +use std::sync::Arc; +use tokio::sync::broadcast; use tokio::task::JoinSet; -#[tokio::main] -async fn main() { - let mut long_term_tasks = JoinSet::new(); +const INIT_NODES: &str = "detee_challenge_nodes"; - long_term_tasks.spawn(http_server::init()); - - long_term_tasks.join_next().await; - println!("Shutting down..."); +async fn get_keypair() -> Keypair { + let input = match File::open(INIT_NODES) { + Ok(i) => i, + Err(_) => { + println!("Could not find remote nodes in the file {INIT_NODES}"); + println!("Starting a new network with a new key..."); + return Keypair::new(); + } + }; + let buffered = BufReader::new(input); + for line in buffered.lines() { + match grpc::client::key_grabber(line.unwrap()).await { + Ok(keypair) => return keypair, + Err(e) => { + println!("Could not get keypair: {e:?}"); + } + }; + } + panic!("could not get keypair."); } +#[tokio::main] +async fn main() { + let keypair = get_keypair().await; + let ds = Arc::new(datastore::Store::init(keypair)); + + let (tx, mut _rx) = broadcast::channel(500); + + let mut long_term_tasks = JoinSet::new(); + let mut init_tasks = JoinSet::new(); + + long_term_tasks.spawn(http_server::init(ds.clone())); + long_term_tasks.spawn(grpc::server::MyServer::init(ds.clone(), tx.clone()).start()); + + let input = File::open(INIT_NODES).unwrap(); + let buffered = BufReader::new(input); + for line in buffered.lines() { + init_tasks.spawn( + grpc::client::ConnManager::init(ds.clone(), tx.clone()).start_with_node(line.unwrap()), + ); + } + + let mut connection_count = 0; + while init_tasks.join_next().await.is_some() { + if connection_count < 3 { + long_term_tasks.spawn(grpc::client::ConnManager::init(ds.clone(), tx.clone()).start()); + connection_count += 1; + } + } + + while connection_count < 3 { + long_term_tasks.spawn(grpc::client::ConnManager::init(ds.clone(), tx.clone()).start()); + connection_count += 1; + } + + // exit no matter which task finished + long_term_tasks.join_next().await; + + println!("Shutting down..."); +}