use crate::{grpc::challenge::NodeUpdate, persistence::FileManager}; use dashmap::{DashMap, DashSet}; use solana_sdk::{ pubkey::{ParsePubkeyError, Pubkey}, signature::{keypair::Keypair, Signer}, }; use std::{ str::FromStr, time::{Duration, SystemTime, UNIX_EPOCH}, }; use tabled::{Table, Tabled}; #[derive(Clone, PartialEq, Debug)] pub struct NodeInfo { pub pubkey: Pubkey, pub updated_at: SystemTime, pub public: bool, } /// Needs to be surrounded in an Arc. pub struct Store { nodes: DashMap, conns: DashSet, keys: DashMap, persistence: FileManager, } #[derive(Debug)] pub enum SigningError { CorruptedKey, KeyNotFound, } impl From for SigningError { fn from(_: hex::FromHexError) -> Self { Self::CorruptedKey } } impl From for SigningError { fn from(_: ParsePubkeyError) -> Self { Self::CorruptedKey } } impl From for SigningError { fn from(_: std::array::TryFromSliceError) -> Self { Self::CorruptedKey } } impl From for SigningError { fn from(_: std::io::Error) -> Self { Self::CorruptedKey } } type IP = String; impl std::fmt::Display for SigningError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let error_message = match self { SigningError::CorruptedKey => "The public key is corrupted", SigningError::KeyNotFound => "Did not find the private key", }; write!(f, "{}", error_message) } } impl Store { // app should exit if any error happens here so unwrap() is good pub async fn init(path: &str) -> Self { Self { nodes: DashMap::new(), keys: DashMap::new(), conns: DashSet::new(), persistence: FileManager::init(path).await.unwrap(), } } pub async fn add_conn(&self, ip: &str) { self.conns.insert(ip.to_string()); } pub async fn delete_conn(&self, ip: &str) { self.conns.remove(ip); } pub async fn tabled_memory_list(&self) -> String { #[derive(Tabled)] struct OutputRow { ip: String, pubkey: String, age: u64, public: bool, } let mut output = vec![]; for (ip, node_info) in self.nodes.iter().map(|n| (n.key().clone(), n.value().clone())) { let pubkey = node_info.pubkey.to_string(); let age = SystemTime::now() .duration_since(node_info.updated_at) .unwrap_or(Duration::ZERO) .as_secs(); let public = node_info.public; output.push(OutputRow { ip, pubkey, age, public }); } Table::new(output).to_string() } pub async fn tabled_disk_list(&self, page: u64) -> String { let mut offset = page.wrapping_mul(20); #[derive(Tabled)] struct OutputRow { id: u64, ip: String, pubkey: String, timestamp: String, } let mut output = vec![]; for (ip, keypair, timestamp) in self.persistence.get_page_of_20(offset).await.unwrap().iter().map(|n| { ( n.ip.to_string(), Keypair::from_bytes(&n.keypair.to_bytes()).unwrap(), n.joined_at.duration_since(UNIX_EPOCH).unwrap().as_secs().to_string(), ) }) { let id = offset; let pubkey = keypair.pubkey().to_string(); output.push(OutputRow { id, ip, pubkey, timestamp }); offset += 1; } Table::new(output).to_string() } pub async fn disk_sign_message_with_key( &self, message: &str, key_id: u64, ) -> Result { let crate::persistence::Node { keypair, .. } = self.persistence.get_node_by_id(key_id).await?; let signature = keypair.sign_message(message.as_bytes()); Ok(signature.to_string()) } pub async fn sign_message_with_key( &self, message: &str, pubkey: &str, ) -> Result { let pubkey = Pubkey::from_str(&pubkey)?; let keypair = match self.get_keypair(&pubkey).await { Some(k) => k, None => return Err(SigningError::KeyNotFound), }; let signature = keypair.sign_message(message.as_bytes()); Ok(signature.to_string()) } pub async fn add_key(&self, pubkey: Pubkey, keypair: Keypair) { self.keys.insert(pubkey, keypair); } pub async fn remove_key(&self, pubkey: &Pubkey) { self.keys.remove(pubkey); } async fn get_keypair(&self, pubkey: &Pubkey) -> Option { self.keys.get(pubkey).map(|k| Keypair::from_bytes(&k.to_bytes()).unwrap()) } /// This returns true if NodeInfo got modified. /// /// On a side note, there are two types of people in this world: /// 1. Those that can extrapolate... WAT? pub async fn process_node_update(&self, node: NodeUpdate) -> bool { // solana-sdk is great; it panics if the base58 string is corrupted // we wrap this in catch_unwind() to make sure it doesn't crash the app let keypair = match std::panic::catch_unwind(|| Keypair::from_base58_string(&node.keypair.clone())) { Ok(k) => k, Err(_) => return false, }; let pubkey = keypair.pubkey(); // TODO: check this suggestion // let updated_at_std = node // .updated_at // .map(SystemTime::try_from) // .unwrap_or(Ok(SystemTime::now())) // .unwrap_or(SystemTime::now()); let updated_at_std: SystemTime = match node.updated_at { Some(ts) => { let duration = Duration::new(ts.seconds as u64, ts.nanos as u32); UNIX_EPOCH.checked_add(duration).unwrap_or(SystemTime::now()) } None => SystemTime::now(), }; self.add_key(pubkey, Keypair::from_bytes(&keypair.to_bytes()).unwrap()).await; let node_info = NodeInfo { pubkey, updated_at: updated_at_std, public: node.public }; if let Some(mut old_node_info) = self.update_node(node.ip.clone(), node_info.clone()).await { if !node_info.public { old_node_info.public = node_info.public; } match old_node_info.ne(&node_info) { true => { self.remove_key(&old_node_info.pubkey).await; true } false => false, } } else { if let Ok(persistence_node) = (node.ip.as_str(), keypair, updated_at_std).try_into() { if let Err(e) = self.persistence.append_node(persistence_node).await { println!("Could not save data to disk: {e}."); } } true } } /// returns old pubkey if node got updated async fn update_node(&self, ip: String, info: NodeInfo) -> Option { if let Some(old_node) = self.nodes.get(&ip) { if old_node.updated_at >= info.updated_at { return Some(info); } } self.nodes.insert(ip, info.clone()) } pub async fn remove_inactive_nodes(&self) { let mut dangling_pubkeys = Vec::new(); self.nodes.retain(|_, v| { let age = SystemTime::now().duration_since(v.updated_at).unwrap_or(Duration::ZERO).as_secs(); if age > 120 { dangling_pubkeys.push(v.pubkey.clone()); false } else { true } }); for pubkey in dangling_pubkeys.iter() { self.keys.remove(pubkey); } } pub async fn get_localhost(&self) -> NodeUpdate { // TODO trigger reset_localhost_keys on error instead of expects let node = self.nodes.get("localhost").expect("no localhost node"); let keypair = self.keys.get(&node.pubkey).expect("no localhost key"); NodeUpdate { ip: "localhost".to_string(), keypair: keypair.to_base58_string(), updated_at: Some(prost_types::Timestamp::from(node.value().updated_at)), public: false, } } /// refreshes the keys of the node and returns a protobuf for the network pub async fn reset_localhost_keys(&self) -> NodeUpdate { let keypair_raw = Keypair::new(); let keypair = keypair_raw.to_base58_string(); let pubkey = keypair_raw.pubkey(); let ip = "localhost".to_string(); let updated_at = SystemTime::now(); let public = false; self.add_key(pubkey, keypair_raw).await; if let Some(old_data) = self.update_node(ip.clone(), NodeInfo { pubkey, updated_at, public }).await { self.remove_key(&old_data.pubkey).await; }; let updated_at = Some(prost_types::Timestamp::from(updated_at)); NodeUpdate { ip, keypair, updated_at, public } } pub async fn get_full_node_list(&self) -> Vec { self.nodes .iter() .filter_map(|node| { self.keys.get(&node.value().pubkey).map(|keypair| NodeUpdate { ip: node.key().to_string(), keypair: keypair.to_base58_string(), updated_at: Some(prost_types::Timestamp::from(node.value().updated_at)), public: node.value().public, }) }) .collect() } // returns a random node that does not have an active connection pub async fn get_random_node(&self) -> Option { use rand::{rngs::OsRng, RngCore}; let len = self.nodes.len(); if len == 0 { return None; } let skip = OsRng.next_u64().try_into().unwrap_or(0) % len; self.nodes .iter() .map(|n| n.key().clone()) .cycle() .skip(skip) .find(|k| !self.conns.contains(k)) } } #[cfg(test)] mod tests { use std::time::{SystemTime, UNIX_EPOCH}; use dashmap::DashMap; use tokio::{fs::File, io::AsyncWriteExt}; use super::*; async fn setup_file_manager(function: &str) -> std::io::Result { let _ = tokio::fs::create_dir_all(".tmp").await; let path = ".tmp/ds_".to_string() + function; let mut file = File::create(path.clone()).await?; file.flush().await?; drop(file); FileManager::init(&path).await } #[test] fn node_info_creation() { let keypair = Keypair::new(); let node_info = NodeInfo { pubkey: keypair.pubkey(), updated_at: SystemTime::now(), public: true }; assert_eq!(node_info.pubkey, keypair.pubkey()); assert!(node_info.updated_at >= UNIX_EPOCH); assert!(node_info.public); } #[tokio::test] async fn store_creation() { let store = Store { nodes: DashMap::new(), conns: DashSet::new(), keys: DashMap::new(), persistence: setup_file_manager("store_creation").await.unwrap(), }; assert!(store.nodes.is_empty()); assert!(store.conns.is_empty()); assert!(store.keys.is_empty()); } #[test] fn signing_error_from_hex_error() { let hex_error: Result<(), hex::FromHexError> = Err(hex::FromHexError::InvalidHexCharacter { c: 'a', index: 0 }); let signing_error: SigningError = hex_error.unwrap_err().into(); match signing_error { SigningError::CorruptedKey => assert!(true), _ => assert!(false, "Expected SigningError::CorruptedKey"), } } #[tokio::test] async fn sign_message_with_key() { let keypair = Keypair::new(); let pubkey_string = keypair.pubkey().to_string(); let store = Store { nodes: DashMap::new(), conns: DashSet::new(), keys: DashMap::new(), persistence: setup_file_manager("sign_message_with_key").await.unwrap(), }; store.keys.insert(keypair.pubkey(), keypair); let message = "Test message"; let result = store.sign_message_with_key(message, &pubkey_string).await; assert!(result.is_ok()); if let Ok(signature) = result { assert!(!signature.is_empty()); } } #[tokio::test] async fn process_node_update() { let keypair = Keypair::new(); let node_update = NodeUpdate { ip: "127.0.0.1".to_string(), keypair: keypair.to_base58_string(), updated_at: Some(prost_types::Timestamp { seconds: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, nanos: 0, }), public: true, }; let store = Store { nodes: DashMap::new(), conns: DashSet::new(), keys: DashMap::new(), persistence: setup_file_manager("process_node_update").await.unwrap(), }; let result = store.process_node_update(node_update).await; assert!(result); assert!(store.nodes.contains_key("127.0.0.1")); } #[tokio::test] async fn get_full_node_list() { let keypair = Keypair::new(); let node_info = NodeInfo { pubkey: keypair.pubkey(), updated_at: SystemTime::now(), public: true }; let store = Store { nodes: DashMap::new(), conns: DashSet::new(), keys: DashMap::new(), persistence: setup_file_manager("get_full_node_list").await.unwrap(), }; store.nodes.insert("127.0.0.1".to_string(), node_info); store.keys.insert(keypair.pubkey(), Keypair::from_bytes(&keypair.to_bytes()).unwrap()); let node_list = store.get_full_node_list().await; assert_eq!(node_list.len(), 1); assert_eq!(node_list[0].ip, "127.0.0.1"); assert_eq!(node_list[0].keypair, keypair.to_base58_string()); assert!(node_list[0].public); } }