changed startup procedure to prod specification

This commit is contained in:
ghe0 2024-08-19 21:54:14 +03:00
parent d47f71c941
commit 0a711b7917
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
8 changed files with 67 additions and 89 deletions

1
.gitignore vendored

@ -1 +1,2 @@
/target
detee_challenge_nodes

31
Cargo.lock generated

@ -528,21 +528,6 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.30"
@ -550,7 +535,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@ -559,17 +543,6 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.30"
@ -605,7 +578,6 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
@ -679,12 +651,9 @@ dependencies = [
name = "hacker-challenge"
version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"ed25519-dalek",
"futures",
"hex",
"once_cell",
"prost",
"prost-types",
"rand",

@ -4,12 +4,9 @@ version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.86"
async-stream = "0.3.5"
ed25519-dalek = { version = "2.1.1", features = ["rand_core", "serde"] }
futures = "0.3.30"
hex = "0.4.3"
once_cell = "1.19.0"
prost = "0.13.1"
prost-types = "0.13.1"
rand = "0.8.5"

@ -2,8 +2,9 @@
use super::challenge::NodeUpdate;
use crate::datastore::Store;
use crate::grpc::challenge::update_client::UpdateClient;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::sync::Arc;
use std::thread::JoinHandle;
use tokio::sync::broadcast::Sender;
use tokio::task::JoinSet;
use tokio_stream::wrappers::BroadcastStream;
@ -36,7 +37,6 @@ impl ConnManager {
let mut resp_stream = response.into_inner();
while let Some(mut update) = resp_stream.message().await.unwrap() {
// "localhost" IPs need to be changed to the real IP of the counterpart
if update.ip == "localhost" {
update.ip = node_ip.clone();
@ -55,21 +55,29 @@ impl ConnManager {
}
}
fn init_connections(ds: Arc<Store>, tx: Sender<NodeUpdate>) -> JoinHandle<()> {
std::thread::spawn(move || {
tokio::runtime::Runtime::new().unwrap().block_on(async {
// we rotate online and offline nodes, to constantly check new nodes
let mut only_online_nodes = true;
loop {
let mut set = JoinSet::new();
let nodes = ds.get_random_nodes(only_online_nodes).await;
for node in nodes {
let conn = ConnManager::init(ds.clone(), tx.clone());
set.spawn(conn.connect(node));
}
while let Some(_) = set.join_next().await {}
only_online_nodes = !only_online_nodes;
}
});
})
// this must panic on failure; app can't start without init nodes
fn load_init_nodes(path: &str) -> Vec<String> {
let input = File::open(path).unwrap();
let buffered = BufReader::new(input);
let mut ips = Vec::new();
for line in buffered.lines() {
ips.push(line.unwrap());
}
ips
}
pub async fn init_connections(ds: Arc<Store>, tx: Sender<NodeUpdate>) {
let mut nodes = load_init_nodes("detee_challenge_nodes");
// we rotate online and offline nodes, to constantly check new nodes
let mut only_online_nodes = true;
loop {
let mut set = JoinSet::new();
for node in nodes {
let conn = ConnManager::init(ds.clone(), tx.clone());
set.spawn(conn.connect(node));
}
while let Some(_) = set.join_next().await {}
nodes = ds.get_random_nodes(only_online_nodes).await;
only_online_nodes = !only_online_nodes;
}
}

@ -1,5 +1,5 @@
mod server;
mod client;
pub mod server;
pub mod client;
pub mod challenge {
tonic::include_proto!("challenge");

@ -6,7 +6,6 @@ use super::challenge::NodeUpdate;
use crate::datastore::Store;
use std::pin::Pin;
use std::sync::Arc;
use std::thread::JoinHandle;
use tokio::sync::broadcast::Sender;
use tokio_stream::{Stream, StreamExt};
use tonic::{transport::Server, Request, Response, Status, Streaming};
@ -17,23 +16,19 @@ pub struct MyServer {
}
impl MyServer {
fn init(ds: Arc<Store>, tx: Sender<NodeUpdate>) -> Self {
pub fn init(ds: Arc<Store>, tx: Sender<NodeUpdate>) -> Self {
Self { ds, tx }
}
fn start(self) -> JoinHandle<()> {
std::thread::spawn(|| {
tokio::runtime::Runtime::new().unwrap().block_on(async {
let addr = "0.0.0.0:31373".parse().unwrap();
if let Err(e) = Server::builder()
.add_service(UpdateServer::new(self))
.serve(addr)
.await
{
println!("gRPC server failed: {e:?}");
};
});
})
pub async fn start(self) {
let addr = "0.0.0.0:31373".parse().unwrap();
if let Err(e) = Server::builder()
.add_service(UpdateServer::new(self))
.serve(addr)
.await
{
println!("gRPC server failed: {e:?}");
};
}
}

@ -1,8 +1,8 @@
use crate::datastore::Store;
use std::sync::Arc;
use salvo::prelude::*;
use salvo::affix;
use salvo::prelude::*;
#[handler]
async fn homepage(depot: &mut Depot) -> String {
@ -29,16 +29,12 @@ async fn sign(req: &mut Request, depot: &mut Depot) -> String {
}
}
pub fn init(ds: Arc<Store>) -> std::thread::JoinHandle<()> {
std::thread::spawn(|| {
tokio::runtime::Runtime::new().unwrap().block_on(async {
let acceptor = TcpListener::new("0.0.0.0:31372").bind().await;
let router = Router::new()
.hoop(affix::inject(ds))
.get(homepage)
.push(Router::with_path("sign").get(sign));
println!("{:?}", router);
Server::new(acceptor).serve(router).await;
});
})
pub async fn init(ds: Arc<Store>) {
let acceptor = TcpListener::new("0.0.0.0:31372").bind().await;
let router = Router::new()
.hoop(affix::inject(ds))
.get(homepage)
.push(Router::with_path("sign").get(sign));
println!("{:?}", router);
Server::new(acceptor).serve(router).await;
}

@ -1,15 +1,27 @@
mod grpc;
mod datastore;
use tokio::task::JoinSet;
mod grpc;
mod http_server;
use crate::datastore::Store;
use std::sync::Arc;
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let ds: Arc<Store> = Arc::new(Store::init());
ds.add_mock_node("1.1.1.1".to_string()).await;
ds.add_mock_node("1.2.3.4".to_string()).await;
ds.add_mock_node("1.2.2.2".to_string()).await;
let thread_result = crate::http_server::init(ds).join();
println!("{thread_result:?}");
let (tx, mut _rx) = broadcast::channel(500);
// ds.add_mock_node("1.1.1.1".to_string()).await;
// ds.add_mock_node("1.2.3.4".to_string()).await;
// ds.add_mock_node("1.2.2.2".to_string()).await;
let mut join_set = JoinSet::new();
join_set.spawn(http_server::init(ds.clone()));
join_set.spawn(grpc::server::MyServer::init(ds.clone(), tx.clone()).start());
join_set.spawn(grpc::client::init_connections(ds.clone(), tx.clone()));
// exit no matter which task finished
join_set.join_next().await;
println!("Shutting down...");
}