From b21a9e0a102dbe4901a2ecc7f918130db3fe5920 Mon Sep 17 00:00:00 2001 From: ghe0 Date: Mon, 19 Aug 2024 00:17:20 +0300 Subject: [PATCH] IT COMPILES! GOD EXISTS! IT COMPILES! --- Cargo.lock | 2 + Cargo.toml | 2 + proto/challenge.proto | 4 +- src/datastore.rs | 2 +- src/grpc.rs | 145 ++++++++++++++++++++---------------------- src/main.rs | 1 + 6 files changed, 77 insertions(+), 79 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a18d3a6..194b57a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -680,6 +680,7 @@ name = "hacker-challenge" version = "0.1.0" dependencies = [ "anyhow", + "async-stream", "ed25519-dalek", "futures", "hex", @@ -690,6 +691,7 @@ dependencies = [ "salvo", "tabled", "tokio", + "tokio-stream", "tonic", "tonic-build", ] diff --git a/Cargo.toml b/Cargo.toml index 2d8f468..53bdb21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] anyhow = "1.0.86" +async-stream = "0.3.5" ed25519-dalek = { version = "2.1.1", features = ["rand_core", "serde"] } futures = "0.3.30" hex = "0.4.3" @@ -15,6 +16,7 @@ rand = "0.8.5" salvo = { version = "0.70.0", features = ["affix"] } tabled = "0.16.0" tokio = { version = "1.39.2", features = ["macros"] } +tokio-stream = { version = "0.1.15" } tonic = "0.12.1" [build-dependencies] diff --git a/proto/challenge.proto b/proto/challenge.proto index e2dd74f..326eac3 100644 --- a/proto/challenge.proto +++ b/proto/challenge.proto @@ -10,6 +10,6 @@ message NodeUpdate { bool online = 4; } -service KeyDistribution { - rpc UpdateStreaming (stream NodeUpdate) returns (stream NodeUpdate); +service Update { + rpc GetUpdates (stream NodeUpdate) returns (stream NodeUpdate); } diff --git a/src/datastore.rs b/src/datastore.rs index 5ce1144..500765c 100644 --- a/src/datastore.rs +++ b/src/datastore.rs @@ -62,7 +62,7 @@ impl Store { } } - pub async fn add_mock_node(&self, ip: String) { + pub async fn add_mock_node(&self, ip: IP) { let mut csprng = OsRng; let privkey = ed25519_dalek::SigningKey::generate(&mut csprng); self.update_node( diff --git a/src/grpc.rs b/src/grpc.rs index 857b750..52d35ed 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -1,90 +1,83 @@ -use crate::database; -use crate::database::NodeInfo; -use challenge::key_distribution_server::{KeyDistribution, KeyDistributionServer}; -use challenge::{RemoveNodeReq, UpdateKeyReq, UpdateNodeReq}; -use ed25519_dalek::SigningKey; -use prost_types::Timestamp; -use rand::rngs::OsRng; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use tonic::{transport::Server, Request, Response, Status}; +#![allow(dead_code)] + +use crate::datastore::Store; +use challenge::NodeUpdate; +use std::pin::Pin; +use std::sync::Arc; +use std::thread::JoinHandle; +use tokio::sync::broadcast::Sender; +use tokio_stream::{Stream, StreamExt}; +use tonic::{transport::Server, Request, Response, Status, Streaming}; pub mod challenge { tonic::include_proto!("challenge"); } -#[derive(Debug, Default)] -pub struct MyKeyDistribution {} +pub struct MyServer { + ds: Arc, + tx: Sender, +} -fn update_db(ip: String, privkey: String, updated_at: Option) { - let key_bytes = hex::decode(privkey).unwrap(); - let privkey = SigningKey::from_bytes(&key_bytes.as_slice().try_into().unwrap()); - let pubkey = privkey.verifying_key(); - let updated_at: std::time::SystemTime = match updated_at { - Some(ts) => { - let duration = Duration::new(ts.seconds as u64, ts.nanos as u32); - UNIX_EPOCH - .checked_add(duration) - .unwrap_or(SystemTime::now()) - } - None => SystemTime::now(), - }; - database::add_node(ip.to_string(), NodeInfo { pubkey, updated_at }); +impl MyServer { + fn init(ds: Arc, tx: Sender) -> Self { + Self { ds, tx } + } - database::add_key(pubkey, privkey); + fn start(self) -> JoinHandle<()> { + std::thread::spawn(|| { + tokio::runtime::Runtime::new().unwrap().block_on(async { + let addr = "0.0.0.0:31373".parse().unwrap(); + if let Err(e) = Server::builder() + .add_service(challenge::update_server::UpdateServer::new(self)) + .serve(addr) + .await + { + println!("gRPC server failed: {e:?}"); + }; + }); + }) + } } #[tonic::async_trait] -impl KeyDistribution for MyKeyDistribution { - async fn update_key(&self, request: Request) -> Result, Status> { - let ip = request.remote_addr().unwrap().ip(); - let req = request.into_inner(); - update_db(ip.to_string(), req.keypair, req.updated_at); - Ok(Response::new(())) - } +impl challenge::update_server::Update for MyServer { + type GetUpdatesStream = Pin> + Send>>; - async fn update_node(&self, request: Request) -> Result, Status> { - let req = request.into_inner(); - update_db(req.ip, req.keypair, req.updated_at); - Ok(Response::new(())) - } + async fn get_updates( + &self, + req: Request>, + ) -> Result, Status> { + let remote_ip = req.remote_addr().unwrap().ip().to_string(); + let tx = self.tx.clone(); + let mut rx = self.tx.subscribe(); + let mut inbound = req.into_inner(); - async fn remove_node(&self, _request: Request) -> Result, Status> { - // Handle RemoveNode request - Ok(Response::new(())) + let stream = async_stream::stream! { + loop { + tokio::select! { + Some(msg) = inbound.next() => { + match msg { + Ok(update) => { + if let Err(_) = tx.send(update.clone()) { + yield Err(Status::internal("Failed to broadcast update")); + } + } + Err(e) => { + yield Err(Status::internal(format!("Error receiving client stream: {}", e))); + break; + } + } + } + Ok(update) = rx.recv() => { + if update.ip != remote_ip { + yield Ok(update); + } + } + + } + } + }; + + Ok(Response::new(Box::pin(stream) as Self::GetUpdatesStream)) } } - -async fn start_client() { - loop { - tokio::time::sleep(Duration::from_secs(10)).await; - } -} - -async fn start_server() { - let addr = "0.0.0.0:31373".parse().unwrap(); - let key_distribution = MyKeyDistribution::default(); - Server::builder() - .add_service(KeyDistributionServer::new(key_distribution)) - .serve(addr) - .await - .unwrap(); -} - -pub async fn start() { - let start_client = tokio::task::spawn(start_server()); - let start_server = tokio::task::spawn(start_server()); - futures::future::select(start_client, start_server).await; -} - -pub fn add_node(ip: String) { - let mut csprng = OsRng; - let privkey = ed25519_dalek::SigningKey::generate(&mut csprng); - database::add_node( - ip, - NodeInfo { - pubkey: privkey.verifying_key(), - updated_at: std::time::SystemTime::now(), - }, - ); - database::add_key(privkey.verifying_key(), privkey); -} diff --git a/src/main.rs b/src/main.rs index ace520d..15de3b4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +mod grpc; mod datastore; mod http_server; use crate::datastore::Store;