add code to start servers

This commit is contained in:
ghe0 2024-09-18 04:41:03 +03:00
parent e2c77841de
commit cdb88f4b16
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
4 changed files with 100 additions and 56 deletions

@ -11,6 +11,7 @@ type IP = String;
pub struct NodeInfo {
pub started_at: SystemTime,
pub keepalive: SystemTime,
pub mint_requests: u64,
pub total_mints: u64,
pub ratls_conns: u64,
pub ratls_attacks: u64,
@ -72,7 +73,23 @@ impl Store {
true
}
pub async fn get_full_node_list(&self) -> Vec<NodeUpdate> {
pub async fn get_http_node_list(&self) -> Vec<crate::http_server::NodesResp> {
self.nodes
.iter()
.map(|node| crate::http_server::NodesResp {
ip: node.key().to_string(),
joined_at: node.value().started_at,
last_keepalive: node.value().keepalive,
mints: node.value().total_mints,
ratls_connections: node.value().ratls_conns,
ratls_attacks: node.value().ratls_attacks,
public: node.value().public,
mint_requests: node.value().mint_requests,
})
.collect()
}
pub async fn get_grpc_node_list(&self) -> Vec<NodeUpdate> {
self.nodes
.iter()
.map(|node| NodeUpdate {

@ -47,7 +47,7 @@ impl Update for MyServer {
let ds = self.ds.clone();
let stream = async_stream::stream! {
let full_update_list = ds.get_full_node_list().await;
let full_update_list = ds.get_grpc_node_list().await;
for update in full_update_list {
yield Ok(update);
}

@ -1,4 +1,5 @@
#![allow(dead_code)]
use crate::datastore::Store;
use actix_web::{
// http::StatusCode, error::ResponseError, Result,
get,
@ -11,6 +12,8 @@ use actix_web::{
};
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::SystemTime;
// use std::{fmt, sync::Arc};
const HOMEPAGE: &str = r#"Welcome, beloved hacker!
@ -41,20 +44,20 @@ async fn homepage() -> impl Responder {
}
#[derive(Serialize)]
struct NodesResp {
ip: String,
last_keepalive: u64,
online: bool,
public: bool,
mints: u64,
mint_requests: u64,
ratls_attacks: u64,
pub struct NodesResp {
pub ip: String,
pub joined_at: SystemTime,
pub last_keepalive: SystemTime,
pub mint_requests: u64,
pub ratls_attacks: u64,
pub ratls_connections: u64,
pub public: bool,
pub mints: u64,
}
#[get("/nodes")]
async fn get_nodes() -> HttpResponse {
let mock_nodes = generate_mock_data();
HttpResponse::Ok().json(mock_nodes)
async fn get_nodes(ds: web::Data<Arc<Store>>) -> HttpResponse {
HttpResponse::Ok().json(ds.get_http_node_list().await)
}
#[derive(Deserialize)]
@ -67,11 +70,10 @@ async fn mint(_: web::Json<MintReq>) -> impl Responder {
HttpResponse::Ok().json({})
}
// TODO: init(ds: Arc<Store>)
pub async fn init() {
pub async fn init(ds: Arc<Store>) {
HttpServer::new(move || {
App::new()
// .app_data(web::Data::new(ds.clone()))
.app_data(web::Data::new(ds.clone()))
.service(homepage)
.service(get_nodes)
.service(mint)
@ -83,35 +85,3 @@ pub async fn init() {
.unwrap();
}
fn generate_mock_data() -> Vec<NodesResp> {
let mut rng = rand::thread_rng();
let mut nodes = Vec::new();
for _ in 0..15 {
let ip = format!(
"{}.{}.{}.{}",
rng.gen_range(1..=255),
rng.gen_range(1..=255),
rng.gen_range(1..=255),
rng.gen_range(1..=255)
);
let last_keepalive = rng.gen_range(1_000_000..10_000_000);
let online = rng.gen_bool(0.5);
let public = rng.gen_bool(0.5);
let mints = rng.gen_range(0..500);
let mint_requests = rng.gen_range(0..1000);
let ratls_attacks = rng.gen_range(0..10);
nodes.push(NodesResp {
ip,
last_keepalive,
online,
public,
mints,
mint_requests,
ratls_attacks,
});
}
nodes
}

@ -1,16 +1,73 @@
mod grpc;
mod datastore;
mod grpc;
mod http_server;
use solana_sdk::signature::keypair::Keypair;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::task::JoinSet;
#[tokio::main]
async fn main() {
let mut long_term_tasks = JoinSet::new();
const INIT_NODES: &str = "detee_challenge_nodes";
long_term_tasks.spawn(http_server::init());
long_term_tasks.join_next().await;
println!("Shutting down...");
async fn get_keypair() -> Keypair {
let input = match File::open(INIT_NODES) {
Ok(i) => i,
Err(_) => {
println!("Could not find remote nodes in the file {INIT_NODES}");
println!("Starting a new network with a new key...");
return Keypair::new();
}
};
let buffered = BufReader::new(input);
for line in buffered.lines() {
match grpc::client::key_grabber(line.unwrap()).await {
Ok(keypair) => return keypair,
Err(e) => {
println!("Could not get keypair: {e:?}");
}
};
}
panic!("could not get keypair.");
}
#[tokio::main]
async fn main() {
let keypair = get_keypair().await;
let ds = Arc::new(datastore::Store::init(keypair));
let (tx, mut _rx) = broadcast::channel(500);
let mut long_term_tasks = JoinSet::new();
let mut init_tasks = JoinSet::new();
long_term_tasks.spawn(http_server::init(ds.clone()));
long_term_tasks.spawn(grpc::server::MyServer::init(ds.clone(), tx.clone()).start());
let input = File::open(INIT_NODES).unwrap();
let buffered = BufReader::new(input);
for line in buffered.lines() {
init_tasks.spawn(
grpc::client::ConnManager::init(ds.clone(), tx.clone()).start_with_node(line.unwrap()),
);
}
let mut connection_count = 0;
while init_tasks.join_next().await.is_some() {
if connection_count < 3 {
long_term_tasks.spawn(grpc::client::ConnManager::init(ds.clone(), tx.clone()).start());
connection_count += 1;
}
}
while connection_count < 3 {
long_term_tasks.spawn(grpc::client::ConnManager::init(ds.clone(), tx.clone()).start());
connection_count += 1;
}
// exit no matter which task finished
long_term_tasks.join_next().await;
println!("Shutting down...");
}