IT COMPILES! GOD EXISTS! IT COMPILES!

This commit is contained in:
ghe0 2024-08-19 00:17:20 +03:00
parent 21f7a9f971
commit b21a9e0a10
6 changed files with 77 additions and 79 deletions

2
Cargo.lock generated

@ -680,6 +680,7 @@ name = "hacker-challenge"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream",
"ed25519-dalek", "ed25519-dalek",
"futures", "futures",
"hex", "hex",
@ -690,6 +691,7 @@ dependencies = [
"salvo", "salvo",
"tabled", "tabled",
"tokio", "tokio",
"tokio-stream",
"tonic", "tonic",
"tonic-build", "tonic-build",
] ]

@ -5,6 +5,7 @@ edition = "2021"
[dependencies] [dependencies]
anyhow = "1.0.86" anyhow = "1.0.86"
async-stream = "0.3.5"
ed25519-dalek = { version = "2.1.1", features = ["rand_core", "serde"] } ed25519-dalek = { version = "2.1.1", features = ["rand_core", "serde"] }
futures = "0.3.30" futures = "0.3.30"
hex = "0.4.3" hex = "0.4.3"
@ -15,6 +16,7 @@ rand = "0.8.5"
salvo = { version = "0.70.0", features = ["affix"] } salvo = { version = "0.70.0", features = ["affix"] }
tabled = "0.16.0" tabled = "0.16.0"
tokio = { version = "1.39.2", features = ["macros"] } tokio = { version = "1.39.2", features = ["macros"] }
tokio-stream = { version = "0.1.15" }
tonic = "0.12.1" tonic = "0.12.1"
[build-dependencies] [build-dependencies]

@ -10,6 +10,6 @@ message NodeUpdate {
bool online = 4; bool online = 4;
} }
service KeyDistribution { service Update {
rpc UpdateStreaming (stream NodeUpdate) returns (stream NodeUpdate); rpc GetUpdates (stream NodeUpdate) returns (stream NodeUpdate);
} }

@ -62,7 +62,7 @@ impl Store {
} }
} }
pub async fn add_mock_node(&self, ip: String) { pub async fn add_mock_node(&self, ip: IP) {
let mut csprng = OsRng; let mut csprng = OsRng;
let privkey = ed25519_dalek::SigningKey::generate(&mut csprng); let privkey = ed25519_dalek::SigningKey::generate(&mut csprng);
self.update_node( self.update_node(

@ -1,90 +1,83 @@
use crate::database; #![allow(dead_code)]
use crate::database::NodeInfo;
use challenge::key_distribution_server::{KeyDistribution, KeyDistributionServer}; use crate::datastore::Store;
use challenge::{RemoveNodeReq, UpdateKeyReq, UpdateNodeReq}; use challenge::NodeUpdate;
use ed25519_dalek::SigningKey; use std::pin::Pin;
use prost_types::Timestamp; use std::sync::Arc;
use rand::rngs::OsRng; use std::thread::JoinHandle;
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::broadcast::Sender;
use tonic::{transport::Server, Request, Response, Status}; use tokio_stream::{Stream, StreamExt};
use tonic::{transport::Server, Request, Response, Status, Streaming};
pub mod challenge { pub mod challenge {
tonic::include_proto!("challenge"); tonic::include_proto!("challenge");
} }
#[derive(Debug, Default)] pub struct MyServer {
pub struct MyKeyDistribution {} ds: Arc<Store>,
tx: Sender<NodeUpdate>,
}
fn update_db(ip: String, privkey: String, updated_at: Option<Timestamp>) { impl MyServer {
let key_bytes = hex::decode(privkey).unwrap(); fn init(ds: Arc<Store>, tx: Sender<NodeUpdate>) -> Self {
let privkey = SigningKey::from_bytes(&key_bytes.as_slice().try_into().unwrap()); Self { ds, tx }
let pubkey = privkey.verifying_key(); }
let updated_at: std::time::SystemTime = match updated_at {
Some(ts) => {
let duration = Duration::new(ts.seconds as u64, ts.nanos as u32);
UNIX_EPOCH
.checked_add(duration)
.unwrap_or(SystemTime::now())
}
None => SystemTime::now(),
};
database::add_node(ip.to_string(), NodeInfo { pubkey, updated_at });
database::add_key(pubkey, privkey); fn start(self) -> JoinHandle<()> {
std::thread::spawn(|| {
tokio::runtime::Runtime::new().unwrap().block_on(async {
let addr = "0.0.0.0:31373".parse().unwrap();
if let Err(e) = Server::builder()
.add_service(challenge::update_server::UpdateServer::new(self))
.serve(addr)
.await
{
println!("gRPC server failed: {e:?}");
};
});
})
}
} }
#[tonic::async_trait] #[tonic::async_trait]
impl KeyDistribution for MyKeyDistribution { impl challenge::update_server::Update for MyServer {
async fn update_key(&self, request: Request<UpdateKeyReq>) -> Result<Response<()>, Status> { type GetUpdatesStream = Pin<Box<dyn Stream<Item = Result<NodeUpdate, Status>> + Send>>;
let ip = request.remote_addr().unwrap().ip();
let req = request.into_inner();
update_db(ip.to_string(), req.keypair, req.updated_at);
Ok(Response::new(()))
}
async fn update_node(&self, request: Request<UpdateNodeReq>) -> Result<Response<()>, Status> { async fn get_updates(
let req = request.into_inner(); &self,
update_db(req.ip, req.keypair, req.updated_at); req: Request<Streaming<NodeUpdate>>,
Ok(Response::new(())) ) -> Result<Response<Self::GetUpdatesStream>, Status> {
} let remote_ip = req.remote_addr().unwrap().ip().to_string();
let tx = self.tx.clone();
let mut rx = self.tx.subscribe();
let mut inbound = req.into_inner();
async fn remove_node(&self, _request: Request<RemoveNodeReq>) -> Result<Response<()>, Status> { let stream = async_stream::stream! {
// Handle RemoveNode request loop {
Ok(Response::new(())) tokio::select! {
Some(msg) = inbound.next() => {
match msg {
Ok(update) => {
if let Err(_) = tx.send(update.clone()) {
yield Err(Status::internal("Failed to broadcast update"));
}
}
Err(e) => {
yield Err(Status::internal(format!("Error receiving client stream: {}", e)));
break;
}
}
}
Ok(update) = rx.recv() => {
if update.ip != remote_ip {
yield Ok(update);
}
}
}
}
};
Ok(Response::new(Box::pin(stream) as Self::GetUpdatesStream))
} }
} }
async fn start_client() {
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
async fn start_server() {
let addr = "0.0.0.0:31373".parse().unwrap();
let key_distribution = MyKeyDistribution::default();
Server::builder()
.add_service(KeyDistributionServer::new(key_distribution))
.serve(addr)
.await
.unwrap();
}
pub async fn start() {
let start_client = tokio::task::spawn(start_server());
let start_server = tokio::task::spawn(start_server());
futures::future::select(start_client, start_server).await;
}
pub fn add_node(ip: String) {
let mut csprng = OsRng;
let privkey = ed25519_dalek::SigningKey::generate(&mut csprng);
database::add_node(
ip,
NodeInfo {
pubkey: privkey.verifying_key(),
updated_at: std::time::SystemTime::now(),
},
);
database::add_key(privkey.verifying_key(), privkey);
}

@ -1,3 +1,4 @@
mod grpc;
mod datastore; mod datastore;
mod http_server; mod http_server;
use crate::datastore::Store; use crate::datastore::Store;