From 55a9993572454b1e8031e2ed23f78d6fd77a4c02 Mon Sep 17 00:00:00 2001 From: Valentyn Faychuk Date: Wed, 21 Aug 2024 17:51:03 +0200 Subject: [PATCH] applied suggestions --- .gitignore | 2 + Cargo.lock | 21 ++++++++ Cargo.toml | 1 + src/datastore.rs | 120 ++++++++++++++++++++++----------------------- src/grpc/client.rs | 8 +-- src/grpc/server.rs | 2 +- src/http_server.rs | 2 +- src/main.rs | 5 +- 8 files changed, 92 insertions(+), 69 deletions(-) diff --git a/.gitignore b/.gitignore index 5a5243b..c3d77fe 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ /target build detee_challenge_nodes +.cargo +.idea diff --git a/Cargo.lock b/Cargo.lock index e05a48d..548118d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -316,6 +316,12 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crypto-common" version = "0.1.6" @@ -363,6 +369,20 @@ dependencies = [ "syn 2.0.74", ] +[[package]] +name = "dashmap" +version = "6.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "der" version = "0.7.9" @@ -652,6 +672,7 @@ name = "hacker-challenge" version = "0.1.0" dependencies = [ "async-stream", + "dashmap", "ed25519-dalek", "hex", "prost", diff --git a/Cargo.toml b/Cargo.toml index ea0015d..3116191 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] async-stream = "0.3.5" +dashmap = "6.0.1" ed25519-dalek = { version = "2.1.1", features = ["rand_core", "serde"] } hex = "0.4.3" prost = "0.13.1" diff --git a/src/datastore.rs b/src/datastore.rs index 2d408f3..dfbad71 100644 --- a/src/datastore.rs +++ b/src/datastore.rs @@ -1,12 +1,11 @@ use crate::grpc::challenge::NodeUpdate; -use ed25519_dalek::{Signer, SigningKey, VerifyingKey}; +use dashmap::{DashMap, DashSet}; +use ed25519_dalek::{Signer, SigningKey, VerifyingKey, PUBLIC_KEY_LENGTH}; use rand::rngs::OsRng; -use std::collections::{HashMap, HashSet}; use std::time::Duration; use std::time::SystemTime; use std::time::UNIX_EPOCH; use tabled::{Table, Tabled}; -use tokio::sync::Mutex; #[derive(Clone, PartialEq, Debug)] pub struct NodeInfo { @@ -17,9 +16,9 @@ pub struct NodeInfo { /// Needs to be surrounded in an Arc. pub struct Store { - nodes: Mutex>, - conns: Mutex>, - keys: Mutex>, + nodes: DashMap, + conns: DashSet, + keys: DashMap, } pub enum SigningError { CorruptedKey, @@ -60,20 +59,18 @@ impl Store { // app should exit if any error happens here so unwrap() is good pub fn init() -> Self { Self { - nodes: Mutex::new(HashMap::new()), - keys: Mutex::new(HashMap::new()), - conns: Mutex::new(HashSet::new()), + nodes: DashMap::new(), + keys: DashMap::new(), + conns: DashSet::new(), } } pub async fn add_conn(&self, ip: &str) { - let mut conns = self.conns.lock().await; - conns.insert(ip.to_string()); + self.conns.insert(ip.to_string()); } pub async fn delete_conn(&self, ip: &str) { - let mut conns = self.conns.lock().await; - conns.remove(ip); + self.conns.remove(ip); } pub async fn tabled_node_list(&self) -> String { @@ -85,10 +82,13 @@ impl Store { public: bool, } let mut output = vec![]; - for (ip, node_info) in self.nodes.lock().await.iter() { - let ip = ip.clone(); + for (ip, node_info) in self + .nodes + .iter() + .map(|n| (n.key().clone(), n.value().clone())) + { let pubkey = hex::encode(node_info.pubkey.as_bytes()); - let age = std::time::SystemTime::now() + let age = SystemTime::now() .duration_since(node_info.updated_at) .unwrap_or(Duration::ZERO) .as_secs(); @@ -108,40 +108,37 @@ impl Store { message: &str, pubkey: &str, ) -> Result { - let key_bytes = hex::decode(pubkey)?; - let pubkey = VerifyingKey::from_bytes(&key_bytes.as_slice().try_into()?)?; + let mut pubkey_bytes = [0u8; PUBLIC_KEY_LENGTH]; + hex::decode_to_slice(pubkey, &mut pubkey_bytes)?; + let pubkey = VerifyingKey::from_bytes(&pubkey_bytes)?; let signing_key = match self.get_privkey(&pubkey).await { Some(k) => k, None => return Err(SigningError::KeyNotFound), }; - // TODO: check if to_bytes returns the signature in a format that people can verify from bash - let signature = hex::encode(signing_key.sign(message.as_bytes()).to_bytes()); + let signature = format!("{:?}", signing_key.sign(message.as_bytes())); Ok(signature) } pub async fn add_key(&self, pubkey: VerifyingKey, privkey: SigningKey) { - let mut keys = self.keys.lock().await; - keys.insert(pubkey, privkey); + self.keys.insert(pubkey, privkey); } pub async fn remove_key(&self, pubkey: &VerifyingKey) { - let mut keys = self.keys.lock().await; - keys.remove(pubkey); + self.keys.remove(pubkey); } async fn get_privkey(&self, pubkey: &VerifyingKey) -> Option { - let keys = self.keys.lock().await; - keys.get(pubkey).cloned() + self.keys.get(pubkey).map(|k| k.value().clone()) } /// This returns true if NodeInfo got modified. /// - /// On a side note, there are two types of people in this worlds: - /// 1. Those that can extrapolate... - pub async fn process_grpc_update(&self, node: NodeUpdate) -> bool { + /// On a side note, there are two types of people in this world: + /// 1. Those that can extrapolate... WAT? + pub async fn process_node_update(&self, node: NodeUpdate) -> bool { let key_bytes = match hex::decode(node.keypair.clone()) { Ok(k) => k, Err(_) => return false, @@ -151,7 +148,14 @@ impl Store { Err(_) => return false, }); let pubkey = privkey.verifying_key(); - let updated_at_std: std::time::SystemTime = match node.updated_at { + + // TODO: check this suggestion + // let updated_at_std = node + // .updated_at + // .map(SystemTime::try_from) + // .unwrap_or(Ok(SystemTime::now())) + // .unwrap_or(SystemTime::now()); + let updated_at_std: SystemTime = match node.updated_at { Some(ts) => { let duration = Duration::new(ts.seconds as u64, ts.nanos as u32); UNIX_EPOCH @@ -185,8 +189,7 @@ impl Store { /// returns old pubkey if node got updated async fn update_node(&self, ip: String, info: NodeInfo) -> Option { - let mut nodes = self.nodes.lock().await; - nodes.insert(ip, info.clone()) + self.nodes.insert(ip, info.clone()) } // pub async fn remove_node(&self, ip: &str) { @@ -195,29 +198,25 @@ impl Store { // } pub async fn get_localhost(&self) -> NodeUpdate { - // these unwrap never fail - // you could trigger reset_localhost_keys on error, though - // TODO: do that so that code looks clean - let nodes = self.nodes.lock().await; - let keys = self.keys.lock().await; - let node_info = nodes.get("localhost").unwrap(); - let keypair = keys.get(&node_info.pubkey).unwrap(); + // TODO trigger reset_localhost_keys on error instead of expects + let node = self.nodes.get("localhost").expect("no localhost node"); + let key = self.keys.get(&node.pubkey).expect("no localhost key"); NodeUpdate { ip: "localhost".to_string(), - keypair: hex::encode(keypair.as_bytes()), - updated_at: Some(prost_types::Timestamp::from(node_info.updated_at)), + keypair: hex::encode(key.value().as_bytes()), + updated_at: Some(prost_types::Timestamp::from(node.value().updated_at)), public: false, } } - /// freshes the keys of the node and returns a protobuf for the network + /// refreshes the keys of the node and returns a protobuf for the network pub async fn reset_localhost_keys(&self) -> NodeUpdate { let mut csprng = OsRng; - let keypair_raw = ed25519_dalek::SigningKey::generate(&mut csprng); + let keypair_raw = SigningKey::generate(&mut csprng); let keypair = hex::encode(keypair_raw.as_bytes()); let pubkey = keypair_raw.verifying_key(); let ip = "localhost".to_string(); - let updated_at = std::time::SystemTime::now(); + let updated_at = SystemTime::now(); let public = false; self.add_key(pubkey, keypair_raw.clone()).await; if let Some(old_data) = self @@ -243,17 +242,17 @@ impl Store { } pub async fn get_full_node_list(&self) -> Vec { - let nodes = self.nodes.lock().await; - let keys = self.keys.lock().await; - nodes + self.nodes .iter() - .filter_map(|(ip, node_info)| { - keys.get(&node_info.pubkey).map(|signing_key| NodeUpdate { - ip: ip.to_string(), - keypair: hex::encode(signing_key.as_bytes()), - updated_at: Some(prost_types::Timestamp::from(node_info.updated_at)), - public: node_info.public, - }) + .filter_map(|node| { + self.keys + .get(&node.value().pubkey) + .map(|signing_key| NodeUpdate { + ip: node.key().to_string(), + keypair: hex::encode(signing_key.as_bytes()), + updated_at: Some(prost_types::Timestamp::from(node.value().updated_at)), + public: node.value().public, + }) }) .collect() } @@ -262,19 +261,16 @@ impl Store { pub async fn get_random_node(&self) -> Option { use rand::rngs::OsRng; use rand::RngCore; - let nodes = self.nodes.lock().await; - let conns = self.conns.lock().await; - let len = nodes.len(); + let len = self.nodes.len(); if len == 0 { return None; } let skip = OsRng.next_u64().try_into().unwrap_or(0) % len; - nodes - .keys() + self.nodes + .iter() + .map(|n| n.key().clone()) .cycle() .skip(skip) - .filter(|k| !conns.contains(*k)) - .next() - .cloned() + .find(|k| !self.conns.contains(k)) } } diff --git a/src/grpc/client.rs b/src/grpc/client.rs index a86c560..1e4bf64 100644 --- a/src/grpc/client.rs +++ b/src/grpc/client.rs @@ -3,7 +3,7 @@ use crate::datastore::Store; use crate::grpc::challenge::update_client::UpdateClient; use std::sync::Arc; use tokio::sync::broadcast::Sender; -use tokio::time::{sleep, Duration}; +use tokio::time::Duration; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::StreamExt; @@ -23,13 +23,14 @@ impl ConnManager { } pub async fn start(self) { + let mut interval = tokio::time::interval(Duration::from_secs(3)); loop { if let Some(node) = self.ds.get_random_node().await { if node != "localhost" { self.connect_wrapper(node.clone()).await; } } - sleep(Duration::from_secs(3)).await; + interval.tick().await; } } @@ -51,6 +52,7 @@ impl ConnManager { let response = client.get_updates(rx_stream).await?; let mut resp_stream = response.into_inner(); + // TODO: this is a hack to send the localhost node to the peer let _ = self.tx.send(self.ds.get_localhost().await); while let Some(mut update) = resp_stream.message().await? { @@ -63,7 +65,7 @@ impl ConnManager { } // update the entire network in case the information is new - if self.ds.process_grpc_update(update.clone()).await { + if self.ds.process_node_update(update.clone()).await { if let Err(_) = self.tx.send(update.clone()) { println!("tokio broadcast receivers had an issue consuming the channel"); } diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 7052394..37a5ddd 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -62,7 +62,7 @@ impl Update for MyServer { // note that we don't set this node online, // as it can be behind NAT } - if update.ip != "127.0.0.1" && ds.process_grpc_update(update.clone()).await { + if update.ip != "127.0.0.1" && ds.process_node_update(update.clone()).await { if let Err(_) = tx.send(update.clone()) { println!("tokio broadcast receivers had an issue consuming the channel"); } diff --git a/src/http_server.rs b/src/http_server.rs index 132e876..3968a30 100644 --- a/src/http_server.rs +++ b/src/http_server.rs @@ -24,7 +24,7 @@ impl Writer for SignError { #[handler] async fn homepage(depot: &mut Depot) -> String { let ds = depot.obtain::>().unwrap(); - ds.tabled_node_list().await + ds.tabled_node_list().await // TODO: make this paginated } #[handler] diff --git a/src/main.rs b/src/main.rs index df4fe58..7808e22 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,12 +9,13 @@ use std::fs::File; use std::io::{BufRead, BufReader}; use std::sync::Arc; use tokio::sync::broadcast; -use tokio::time::{sleep, Duration}; +use tokio::time::Duration; async fn cycle_keys(ds: Arc, tx: Sender) { + let mut interval = tokio::time::interval(Duration::from_secs(60)); loop { let _ = tx.send(ds.reset_localhost_keys().await); - sleep(Duration::from_secs(60)).await; + interval.tick().await; } }