applied suggestions
This commit is contained in:
		
							parent
							
								
									b57675d273
								
							
						
					
					
						commit
						55a9993572
					
				
							
								
								
									
										2
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										2
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @ -1,3 +1,5 @@ | ||||
| /target | ||||
| build | ||||
| detee_challenge_nodes | ||||
| .cargo | ||||
| .idea | ||||
|  | ||||
							
								
								
									
										21
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										21
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @ -316,6 +316,12 @@ dependencies = [ | ||||
|  "libc", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "crossbeam-utils" | ||||
| version = "0.8.20" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" | ||||
| 
 | ||||
| [[package]] | ||||
| name = "crypto-common" | ||||
| version = "0.1.6" | ||||
| @ -363,6 +369,20 @@ dependencies = [ | ||||
|  "syn 2.0.74", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "dashmap" | ||||
| version = "6.0.1" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" | ||||
| dependencies = [ | ||||
|  "cfg-if", | ||||
|  "crossbeam-utils", | ||||
|  "hashbrown 0.14.5", | ||||
|  "lock_api", | ||||
|  "once_cell", | ||||
|  "parking_lot_core", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "der" | ||||
| version = "0.7.9" | ||||
| @ -652,6 +672,7 @@ name = "hacker-challenge" | ||||
| version = "0.1.0" | ||||
| dependencies = [ | ||||
|  "async-stream", | ||||
|  "dashmap", | ||||
|  "ed25519-dalek", | ||||
|  "hex", | ||||
|  "prost", | ||||
|  | ||||
| @ -5,6 +5,7 @@ edition = "2021" | ||||
| 
 | ||||
| [dependencies] | ||||
| async-stream = "0.3.5" | ||||
| dashmap = "6.0.1" | ||||
| ed25519-dalek = { version = "2.1.1", features = ["rand_core", "serde"] } | ||||
| hex = "0.4.3" | ||||
| prost = "0.13.1" | ||||
|  | ||||
							
								
								
									
										116
									
								
								src/datastore.rs
									
									
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										116
									
								
								src/datastore.rs
									
									
									
									
									
								
							| @ -1,12 +1,11 @@ | ||||
| use crate::grpc::challenge::NodeUpdate; | ||||
| use ed25519_dalek::{Signer, SigningKey, VerifyingKey}; | ||||
| use dashmap::{DashMap, DashSet}; | ||||
| use ed25519_dalek::{Signer, SigningKey, VerifyingKey, PUBLIC_KEY_LENGTH}; | ||||
| use rand::rngs::OsRng; | ||||
| use std::collections::{HashMap, HashSet}; | ||||
| use std::time::Duration; | ||||
| use std::time::SystemTime; | ||||
| use std::time::UNIX_EPOCH; | ||||
| use tabled::{Table, Tabled}; | ||||
| use tokio::sync::Mutex; | ||||
| 
 | ||||
| #[derive(Clone, PartialEq, Debug)] | ||||
| pub struct NodeInfo { | ||||
| @ -17,9 +16,9 @@ pub struct NodeInfo { | ||||
| 
 | ||||
| /// Needs to be surrounded in an Arc.
 | ||||
| pub struct Store { | ||||
|     nodes: Mutex<HashMap<IP, NodeInfo>>, | ||||
|     conns: Mutex<HashSet<IP>>, | ||||
|     keys: Mutex<HashMap<VerifyingKey, SigningKey>>, | ||||
|     nodes: DashMap<IP, NodeInfo>, | ||||
|     conns: DashSet<IP>, | ||||
|     keys: DashMap<VerifyingKey, SigningKey>, | ||||
| } | ||||
| pub enum SigningError { | ||||
|     CorruptedKey, | ||||
| @ -60,20 +59,18 @@ impl Store { | ||||
|     // app should exit if any error happens here so unwrap() is good
 | ||||
|     pub fn init() -> Self { | ||||
|         Self { | ||||
|             nodes: Mutex::new(HashMap::new()), | ||||
|             keys: Mutex::new(HashMap::new()), | ||||
|             conns: Mutex::new(HashSet::new()), | ||||
|             nodes: DashMap::new(), | ||||
|             keys: DashMap::new(), | ||||
|             conns: DashSet::new(), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     pub async fn add_conn(&self, ip: &str) { | ||||
|         let mut conns = self.conns.lock().await; | ||||
|         conns.insert(ip.to_string()); | ||||
|         self.conns.insert(ip.to_string()); | ||||
|     } | ||||
| 
 | ||||
|     pub async fn delete_conn(&self, ip: &str) { | ||||
|         let mut conns = self.conns.lock().await; | ||||
|         conns.remove(ip); | ||||
|         self.conns.remove(ip); | ||||
|     } | ||||
| 
 | ||||
|     pub async fn tabled_node_list(&self) -> String { | ||||
| @ -85,10 +82,13 @@ impl Store { | ||||
|             public: bool, | ||||
|         } | ||||
|         let mut output = vec![]; | ||||
|         for (ip, node_info) in self.nodes.lock().await.iter() { | ||||
|             let ip = ip.clone(); | ||||
|         for (ip, node_info) in self | ||||
|             .nodes | ||||
|             .iter() | ||||
|             .map(|n| (n.key().clone(), n.value().clone())) | ||||
|         { | ||||
|             let pubkey = hex::encode(node_info.pubkey.as_bytes()); | ||||
|             let age = std::time::SystemTime::now() | ||||
|             let age = SystemTime::now() | ||||
|                 .duration_since(node_info.updated_at) | ||||
|                 .unwrap_or(Duration::ZERO) | ||||
|                 .as_secs(); | ||||
| @ -108,40 +108,37 @@ impl Store { | ||||
|         message: &str, | ||||
|         pubkey: &str, | ||||
|     ) -> Result<String, SigningError> { | ||||
|         let key_bytes = hex::decode(pubkey)?; | ||||
|         let pubkey = VerifyingKey::from_bytes(&key_bytes.as_slice().try_into()?)?; | ||||
|         let mut pubkey_bytes = [0u8; PUBLIC_KEY_LENGTH]; | ||||
|         hex::decode_to_slice(pubkey, &mut pubkey_bytes)?; | ||||
|         let pubkey = VerifyingKey::from_bytes(&pubkey_bytes)?; | ||||
| 
 | ||||
|         let signing_key = match self.get_privkey(&pubkey).await { | ||||
|             Some(k) => k, | ||||
|             None => return Err(SigningError::KeyNotFound), | ||||
|         }; | ||||
| 
 | ||||
|         // TODO: check if to_bytes returns the signature in a format that people can verify from bash
 | ||||
|         let signature = hex::encode(signing_key.sign(message.as_bytes()).to_bytes()); | ||||
|         let signature = format!("{:?}", signing_key.sign(message.as_bytes())); | ||||
| 
 | ||||
|         Ok(signature) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn add_key(&self, pubkey: VerifyingKey, privkey: SigningKey) { | ||||
|         let mut keys = self.keys.lock().await; | ||||
|         keys.insert(pubkey, privkey); | ||||
|         self.keys.insert(pubkey, privkey); | ||||
|     } | ||||
| 
 | ||||
|     pub async fn remove_key(&self, pubkey: &VerifyingKey) { | ||||
|         let mut keys = self.keys.lock().await; | ||||
|         keys.remove(pubkey); | ||||
|         self.keys.remove(pubkey); | ||||
|     } | ||||
| 
 | ||||
|     async fn get_privkey(&self, pubkey: &VerifyingKey) -> Option<SigningKey> { | ||||
|         let keys = self.keys.lock().await; | ||||
|         keys.get(pubkey).cloned() | ||||
|         self.keys.get(pubkey).map(|k| k.value().clone()) | ||||
|     } | ||||
| 
 | ||||
|     /// This returns true if NodeInfo got modified.
 | ||||
|     ///
 | ||||
|     /// On a side note, there are two types of people in this worlds:
 | ||||
|     ///     1. Those that can extrapolate...
 | ||||
|     pub async fn process_grpc_update(&self, node: NodeUpdate) -> bool { | ||||
|     /// On a side note, there are two types of people in this world:
 | ||||
|     ///     1. Those that can extrapolate... WAT?
 | ||||
|     pub async fn process_node_update(&self, node: NodeUpdate) -> bool { | ||||
|         let key_bytes = match hex::decode(node.keypair.clone()) { | ||||
|             Ok(k) => k, | ||||
|             Err(_) => return false, | ||||
| @ -151,7 +148,14 @@ impl Store { | ||||
|             Err(_) => return false, | ||||
|         }); | ||||
|         let pubkey = privkey.verifying_key(); | ||||
|         let updated_at_std: std::time::SystemTime = match node.updated_at { | ||||
| 
 | ||||
|         // TODO: check this suggestion
 | ||||
|         // let updated_at_std = node
 | ||||
|         //     .updated_at
 | ||||
|         //     .map(SystemTime::try_from)
 | ||||
|         //     .unwrap_or(Ok(SystemTime::now()))
 | ||||
|         //     .unwrap_or(SystemTime::now());
 | ||||
|         let updated_at_std: SystemTime = match node.updated_at { | ||||
|             Some(ts) => { | ||||
|                 let duration = Duration::new(ts.seconds as u64, ts.nanos as u32); | ||||
|                 UNIX_EPOCH | ||||
| @ -185,8 +189,7 @@ impl Store { | ||||
| 
 | ||||
|     /// returns old pubkey if node got updated
 | ||||
|     async fn update_node(&self, ip: String, info: NodeInfo) -> Option<NodeInfo> { | ||||
|         let mut nodes = self.nodes.lock().await; | ||||
|         nodes.insert(ip, info.clone()) | ||||
|         self.nodes.insert(ip, info.clone()) | ||||
|     } | ||||
| 
 | ||||
|     //    pub async fn remove_node(&self, ip: &str) {
 | ||||
| @ -195,29 +198,25 @@ impl Store { | ||||
|     //    }
 | ||||
| 
 | ||||
|     pub async fn get_localhost(&self) -> NodeUpdate { | ||||
|         // these unwrap never fail
 | ||||
|         // you could trigger reset_localhost_keys on error, though
 | ||||
|         // TODO: do that so that code looks clean
 | ||||
|         let nodes = self.nodes.lock().await; | ||||
|         let keys = self.keys.lock().await; | ||||
|         let node_info = nodes.get("localhost").unwrap(); | ||||
|         let keypair = keys.get(&node_info.pubkey).unwrap(); | ||||
|         // TODO trigger reset_localhost_keys on error instead of expects
 | ||||
|         let node = self.nodes.get("localhost").expect("no localhost node"); | ||||
|         let key = self.keys.get(&node.pubkey).expect("no localhost key"); | ||||
|         NodeUpdate { | ||||
|             ip: "localhost".to_string(), | ||||
|             keypair: hex::encode(keypair.as_bytes()), | ||||
|             updated_at: Some(prost_types::Timestamp::from(node_info.updated_at)), | ||||
|             keypair: hex::encode(key.value().as_bytes()), | ||||
|             updated_at: Some(prost_types::Timestamp::from(node.value().updated_at)), | ||||
|             public: false, | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /// freshes the keys of the node and returns a protobuf for the network
 | ||||
|     /// refreshes the keys of the node and returns a protobuf for the network
 | ||||
|     pub async fn reset_localhost_keys(&self) -> NodeUpdate { | ||||
|         let mut csprng = OsRng; | ||||
|         let keypair_raw = ed25519_dalek::SigningKey::generate(&mut csprng); | ||||
|         let keypair_raw = SigningKey::generate(&mut csprng); | ||||
|         let keypair = hex::encode(keypair_raw.as_bytes()); | ||||
|         let pubkey = keypair_raw.verifying_key(); | ||||
|         let ip = "localhost".to_string(); | ||||
|         let updated_at = std::time::SystemTime::now(); | ||||
|         let updated_at = SystemTime::now(); | ||||
|         let public = false; | ||||
|         self.add_key(pubkey, keypair_raw.clone()).await; | ||||
|         if let Some(old_data) = self | ||||
| @ -243,16 +242,16 @@ impl Store { | ||||
|     } | ||||
| 
 | ||||
|     pub async fn get_full_node_list(&self) -> Vec<NodeUpdate> { | ||||
|         let nodes = self.nodes.lock().await; | ||||
|         let keys = self.keys.lock().await; | ||||
|         nodes | ||||
|         self.nodes | ||||
|             .iter() | ||||
|             .filter_map(|(ip, node_info)| { | ||||
|                 keys.get(&node_info.pubkey).map(|signing_key| NodeUpdate { | ||||
|                     ip: ip.to_string(), | ||||
|             .filter_map(|node| { | ||||
|                 self.keys | ||||
|                     .get(&node.value().pubkey) | ||||
|                     .map(|signing_key| NodeUpdate { | ||||
|                         ip: node.key().to_string(), | ||||
|                         keypair: hex::encode(signing_key.as_bytes()), | ||||
|                     updated_at: Some(prost_types::Timestamp::from(node_info.updated_at)), | ||||
|                     public: node_info.public, | ||||
|                         updated_at: Some(prost_types::Timestamp::from(node.value().updated_at)), | ||||
|                         public: node.value().public, | ||||
|                     }) | ||||
|             }) | ||||
|             .collect() | ||||
| @ -262,19 +261,16 @@ impl Store { | ||||
|     pub async fn get_random_node(&self) -> Option<String> { | ||||
|         use rand::rngs::OsRng; | ||||
|         use rand::RngCore; | ||||
|         let nodes = self.nodes.lock().await; | ||||
|         let conns = self.conns.lock().await; | ||||
|         let len = nodes.len(); | ||||
|         let len = self.nodes.len(); | ||||
|         if len == 0 { | ||||
|             return None; | ||||
|         } | ||||
|         let skip = OsRng.next_u64().try_into().unwrap_or(0) % len; | ||||
|         nodes | ||||
|             .keys() | ||||
|         self.nodes | ||||
|             .iter() | ||||
|             .map(|n| n.key().clone()) | ||||
|             .cycle() | ||||
|             .skip(skip) | ||||
|             .filter(|k| !conns.contains(*k)) | ||||
|             .next() | ||||
|             .cloned() | ||||
|             .find(|k| !self.conns.contains(k)) | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -3,7 +3,7 @@ use crate::datastore::Store; | ||||
| use crate::grpc::challenge::update_client::UpdateClient; | ||||
| use std::sync::Arc; | ||||
| use tokio::sync::broadcast::Sender; | ||||
| use tokio::time::{sleep, Duration}; | ||||
| use tokio::time::Duration; | ||||
| use tokio_stream::wrappers::BroadcastStream; | ||||
| use tokio_stream::StreamExt; | ||||
| 
 | ||||
| @ -23,13 +23,14 @@ impl ConnManager { | ||||
|     } | ||||
| 
 | ||||
|     pub async fn start(self) { | ||||
|         let mut interval = tokio::time::interval(Duration::from_secs(3)); | ||||
|         loop { | ||||
|             if let Some(node) = self.ds.get_random_node().await { | ||||
|                 if node != "localhost" { | ||||
|                     self.connect_wrapper(node.clone()).await; | ||||
|                 } | ||||
|             } | ||||
|             sleep(Duration::from_secs(3)).await; | ||||
|             interval.tick().await; | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
| @ -51,6 +52,7 @@ impl ConnManager { | ||||
|         let response = client.get_updates(rx_stream).await?; | ||||
|         let mut resp_stream = response.into_inner(); | ||||
| 
 | ||||
|         // TODO: this is a hack to send the localhost node to the peer
 | ||||
|         let _ = self.tx.send(self.ds.get_localhost().await); | ||||
| 
 | ||||
|         while let Some(mut update) = resp_stream.message().await? { | ||||
| @ -63,7 +65,7 @@ impl ConnManager { | ||||
|             } | ||||
| 
 | ||||
|             // update the entire network in case the information is new
 | ||||
|             if self.ds.process_grpc_update(update.clone()).await { | ||||
|             if self.ds.process_node_update(update.clone()).await { | ||||
|                 if let Err(_) = self.tx.send(update.clone()) { | ||||
|                     println!("tokio broadcast receivers had an issue consuming the channel"); | ||||
|                 } | ||||
|  | ||||
| @ -62,7 +62,7 @@ impl Update for MyServer { | ||||
|                                     // note that we don't set this node online,
 | ||||
|                                     // as it can be behind NAT
 | ||||
|                                 } | ||||
|                                 if update.ip != "127.0.0.1" && ds.process_grpc_update(update.clone()).await { | ||||
|                                 if update.ip != "127.0.0.1" && ds.process_node_update(update.clone()).await { | ||||
|                                     if let Err(_) = tx.send(update.clone()) { | ||||
|                                         println!("tokio broadcast receivers had an issue consuming the channel"); | ||||
|                                     } | ||||
|  | ||||
| @ -24,7 +24,7 @@ impl Writer for SignError { | ||||
| #[handler] | ||||
| async fn homepage(depot: &mut Depot) -> String { | ||||
|     let ds = depot.obtain::<Arc<Store>>().unwrap(); | ||||
|     ds.tabled_node_list().await | ||||
|     ds.tabled_node_list().await // TODO: make this paginated
 | ||||
| } | ||||
| 
 | ||||
| #[handler] | ||||
|  | ||||
| @ -9,12 +9,13 @@ use std::fs::File; | ||||
| use std::io::{BufRead, BufReader}; | ||||
| use std::sync::Arc; | ||||
| use tokio::sync::broadcast; | ||||
| use tokio::time::{sleep, Duration}; | ||||
| use tokio::time::Duration; | ||||
| 
 | ||||
| async fn cycle_keys(ds: Arc<Store>, tx: Sender<NodeUpdate>) { | ||||
|     let mut interval = tokio::time::interval(Duration::from_secs(60)); | ||||
|     loop { | ||||
|         let _ = tx.send(ds.reset_localhost_keys().await); | ||||
|         sleep(Duration::from_secs(60)).await; | ||||
|         interval.tick().await; | ||||
|     } | ||||
| } | ||||
| 
 | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user