From 0a711b7917f66ff7d7fcdb170b6ec31ddb3c855c Mon Sep 17 00:00:00 2001 From: ghe0 Date: Mon, 19 Aug 2024 21:54:14 +0300 Subject: [PATCH] changed startup procedure to prod specification --- .gitignore | 1 + Cargo.lock | 31 ------------------------------- Cargo.toml | 3 --- src/grpc/client.rs | 46 +++++++++++++++++++++++++++------------------- src/grpc/mod.rs | 4 ++-- src/grpc/server.rs | 25 ++++++++++--------------- src/http_server.rs | 22 +++++++++------------- src/main.rs | 24 ++++++++++++++++++------ 8 files changed, 67 insertions(+), 89 deletions(-) diff --git a/.gitignore b/.gitignore index ea8c4bf..34022ef 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /target +detee_challenge_nodes diff --git a/Cargo.lock b/Cargo.lock index 0f4be4c..e05a48d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -528,21 +528,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "futures" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - [[package]] name = "futures-channel" version = "0.3.30" @@ -550,7 +535,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", - "futures-sink", ] [[package]] @@ -559,17 +543,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" -[[package]] -name = "futures-executor" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - [[package]] name = "futures-io" version = "0.3.30" @@ -605,7 +578,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ - "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -679,12 +651,9 @@ dependencies = [ name = "hacker-challenge" version = "0.1.0" dependencies = [ - "anyhow", "async-stream", "ed25519-dalek", - "futures", "hex", - "once_cell", "prost", "prost-types", "rand", diff --git a/Cargo.toml b/Cargo.toml index f202878..ea0015d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,12 +4,9 @@ version = "0.1.0" edition = "2021" [dependencies] -anyhow = "1.0.86" async-stream = "0.3.5" ed25519-dalek = { version = "2.1.1", features = ["rand_core", "serde"] } -futures = "0.3.30" hex = "0.4.3" -once_cell = "1.19.0" prost = "0.13.1" prost-types = "0.13.1" rand = "0.8.5" diff --git a/src/grpc/client.rs b/src/grpc/client.rs index ea1796e..205c661 100644 --- a/src/grpc/client.rs +++ b/src/grpc/client.rs @@ -2,8 +2,9 @@ use super::challenge::NodeUpdate; use crate::datastore::Store; use crate::grpc::challenge::update_client::UpdateClient; +use std::fs::File; +use std::io::{BufRead, BufReader}; use std::sync::Arc; -use std::thread::JoinHandle; use tokio::sync::broadcast::Sender; use tokio::task::JoinSet; use tokio_stream::wrappers::BroadcastStream; @@ -36,7 +37,6 @@ impl ConnManager { let mut resp_stream = response.into_inner(); while let Some(mut update) = resp_stream.message().await.unwrap() { - // "localhost" IPs need to be changed to the real IP of the counterpart if update.ip == "localhost" { update.ip = node_ip.clone(); @@ -55,21 +55,29 @@ impl ConnManager { } } -fn init_connections(ds: Arc, tx: Sender) -> JoinHandle<()> { - std::thread::spawn(move || { - tokio::runtime::Runtime::new().unwrap().block_on(async { - // we rotate online and offline nodes, to constantly check new nodes - let mut only_online_nodes = true; - loop { - let mut set = JoinSet::new(); - let nodes = ds.get_random_nodes(only_online_nodes).await; - for node in nodes { - let conn = ConnManager::init(ds.clone(), tx.clone()); - set.spawn(conn.connect(node)); - } - while let Some(_) = set.join_next().await {} - only_online_nodes = !only_online_nodes; - } - }); - }) +// this must panic on failure; app can't start without init nodes +fn load_init_nodes(path: &str) -> Vec { + let input = File::open(path).unwrap(); + let buffered = BufReader::new(input); + let mut ips = Vec::new(); + for line in buffered.lines() { + ips.push(line.unwrap()); + } + ips +} + +pub async fn init_connections(ds: Arc, tx: Sender) { + let mut nodes = load_init_nodes("detee_challenge_nodes"); + // we rotate online and offline nodes, to constantly check new nodes + let mut only_online_nodes = true; + loop { + let mut set = JoinSet::new(); + for node in nodes { + let conn = ConnManager::init(ds.clone(), tx.clone()); + set.spawn(conn.connect(node)); + } + while let Some(_) = set.join_next().await {} + nodes = ds.get_random_nodes(only_online_nodes).await; + only_online_nodes = !only_online_nodes; + } } diff --git a/src/grpc/mod.rs b/src/grpc/mod.rs index 65e5cbc..15cc0bb 100644 --- a/src/grpc/mod.rs +++ b/src/grpc/mod.rs @@ -1,5 +1,5 @@ -mod server; -mod client; +pub mod server; +pub mod client; pub mod challenge { tonic::include_proto!("challenge"); diff --git a/src/grpc/server.rs b/src/grpc/server.rs index f9dcdff..8d0c486 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -6,7 +6,6 @@ use super::challenge::NodeUpdate; use crate::datastore::Store; use std::pin::Pin; use std::sync::Arc; -use std::thread::JoinHandle; use tokio::sync::broadcast::Sender; use tokio_stream::{Stream, StreamExt}; use tonic::{transport::Server, Request, Response, Status, Streaming}; @@ -17,23 +16,19 @@ pub struct MyServer { } impl MyServer { - fn init(ds: Arc, tx: Sender) -> Self { + pub fn init(ds: Arc, tx: Sender) -> Self { Self { ds, tx } } - fn start(self) -> JoinHandle<()> { - std::thread::spawn(|| { - tokio::runtime::Runtime::new().unwrap().block_on(async { - let addr = "0.0.0.0:31373".parse().unwrap(); - if let Err(e) = Server::builder() - .add_service(UpdateServer::new(self)) - .serve(addr) - .await - { - println!("gRPC server failed: {e:?}"); - }; - }); - }) + pub async fn start(self) { + let addr = "0.0.0.0:31373".parse().unwrap(); + if let Err(e) = Server::builder() + .add_service(UpdateServer::new(self)) + .serve(addr) + .await + { + println!("gRPC server failed: {e:?}"); + }; } } diff --git a/src/http_server.rs b/src/http_server.rs index 0172683..36eece5 100644 --- a/src/http_server.rs +++ b/src/http_server.rs @@ -1,8 +1,8 @@ use crate::datastore::Store; use std::sync::Arc; -use salvo::prelude::*; use salvo::affix; +use salvo::prelude::*; #[handler] async fn homepage(depot: &mut Depot) -> String { @@ -29,16 +29,12 @@ async fn sign(req: &mut Request, depot: &mut Depot) -> String { } } -pub fn init(ds: Arc) -> std::thread::JoinHandle<()> { - std::thread::spawn(|| { - tokio::runtime::Runtime::new().unwrap().block_on(async { - let acceptor = TcpListener::new("0.0.0.0:31372").bind().await; - let router = Router::new() - .hoop(affix::inject(ds)) - .get(homepage) - .push(Router::with_path("sign").get(sign)); - println!("{:?}", router); - Server::new(acceptor).serve(router).await; - }); - }) +pub async fn init(ds: Arc) { + let acceptor = TcpListener::new("0.0.0.0:31372").bind().await; + let router = Router::new() + .hoop(affix::inject(ds)) + .get(homepage) + .push(Router::with_path("sign").get(sign)); + println!("{:?}", router); + Server::new(acceptor).serve(router).await; } diff --git a/src/main.rs b/src/main.rs index 15de3b4..e8527db 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,27 @@ -mod grpc; mod datastore; +use tokio::task::JoinSet; +mod grpc; mod http_server; use crate::datastore::Store; use std::sync::Arc; +use tokio::sync::broadcast; #[tokio::main] async fn main() { let ds: Arc = Arc::new(Store::init()); - ds.add_mock_node("1.1.1.1".to_string()).await; - ds.add_mock_node("1.2.3.4".to_string()).await; - ds.add_mock_node("1.2.2.2".to_string()).await; - let thread_result = crate::http_server::init(ds).join(); - println!("{thread_result:?}"); + let (tx, mut _rx) = broadcast::channel(500); + + // ds.add_mock_node("1.1.1.1".to_string()).await; + // ds.add_mock_node("1.2.3.4".to_string()).await; + // ds.add_mock_node("1.2.2.2".to_string()).await; + + let mut join_set = JoinSet::new(); + + join_set.spawn(http_server::init(ds.clone())); + join_set.spawn(grpc::server::MyServer::init(ds.clone(), tx.clone()).start()); + join_set.spawn(grpc::client::init_connections(ds.clone(), tx.clone())); + + // exit no matter which task finished + join_set.join_next().await; + println!("Shutting down..."); }