From 3b960ed596185bc3ed5de3ab4478fe2508137839 Mon Sep 17 00:00:00 2001 From: Valentyn Faychuk Date: Tue, 24 Dec 2024 02:01:42 +0200 Subject: [PATCH] remove logging, switch to clasics --- Cargo.toml | 2 - src/datastore.rs | 210 ++++++++++++++++++++++++--------------------- src/persistence.rs | 14 --- 3 files changed, 112 insertions(+), 114 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5311fa6..8ac8cfd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,12 +5,10 @@ version = "0.1.0" edition = "2021" [dependencies] -once_cell = "1.20" env_logger = "0.11" actix-web = "4.9" async-stream = "0.3" chrono = "0.4" -dashmap = "6.1" tonic = "0.12" prost = "0.13" prost-types = "0.13" diff --git a/src/datastore.rs b/src/datastore.rs index 7cb0ba9..c7e728a 100644 --- a/src/datastore.rs +++ b/src/datastore.rs @@ -1,11 +1,11 @@ -use crate::persistence::{Logfile, SealError, SealedFile}; -use dashmap::{DashMap, DashSet}; +use crate::persistence::{SealError, SealedFile}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, TimestampSeconds}; +use std::collections::{HashMap, HashSet}; +use std::sync::RwLock; use std::time::{Duration, SystemTime, UNIX_EPOCH}; type IP = String; -const LOG_PATH: &str = "/host/main/logs"; const LOCAL_INFO_FILE: &str = "/host/main/node_info"; #[serde_as] @@ -25,6 +25,20 @@ pub struct NodeInfo { } impl NodeInfo { + pub fn new_empty() -> Self { + NodeInfo { + started_at: SystemTime::now(), + keepalive: SystemTime::now(), + mint_requests: 0, + mints: 0, + mratls_conns: 0, + net_attacks: 0, + public: false, + restarts: 0, + disk_attacks: 0, + } + } + pub fn is_newer_than(&self, older_self: &Self) -> bool { self.keepalive > older_self.keepalive } @@ -50,13 +64,6 @@ impl NodeInfo { res } - pub fn log(&self, ip: &IP) { - let json = format!("{{\"ip\":\"{}\",", ip) + &self.to_json()[1..]; - if let Err(e) = Logfile::append(LOG_PATH, &format!("{}\n", &json)) { - println!("Could not log node info: {:?}", e); - } - } - pub fn load() -> Self { match Self::read(LOCAL_INFO_FILE) { Ok(mut info) => { @@ -66,29 +73,11 @@ impl NodeInfo { } Err(SealError::Attack(e)) => { println!("The local node file is corrupted: {}", e); - NodeInfo { - started_at: SystemTime::now(), - keepalive: SystemTime::now(), - mint_requests: 0, - mints: 0, - mratls_conns: 0, - net_attacks: 0, - public: false, - restarts: 0, - disk_attacks: 1, // add very first disk attack - } + let mut info = Self::new_empty(); + info.disk_attacks += 1; // add very first disk attack + info } - Err(_) => NodeInfo { - started_at: SystemTime::now(), - keepalive: SystemTime::now(), - mint_requests: 0, - mints: 0, - mratls_conns: 0, - net_attacks: 0, - public: false, - restarts: 0, - disk_attacks: 0, - }, + Err(_) => Self::new_empty(), } } @@ -103,88 +92,103 @@ impl NodeInfo { /// shared everywhere in the code pub struct State { my_ip: String, - nodes: DashMap, - conns: DashSet, + nodes: RwLock>, + conns: RwLock>, } impl State { pub fn new(my_ip: String) -> Self { - let nodes = DashMap::new(); + let mut nodes = HashMap::new(); let my_info = NodeInfo::load(); nodes.insert(my_ip.clone(), my_info); - Self { my_ip, nodes, conns: DashSet::new() } + Self { my_ip, nodes: RwLock::new(nodes), conns: RwLock::new(HashSet::new()) } } pub fn add_conn(&self, ip: &str) { - self.conns.insert(ip.to_string()); self.increase_mratls_conns(); + self.add_mratls_conn(ip); } pub fn delete_conn(&self, ip: &str) { - self.conns.remove(ip); self.decrease_mratls_conns(); + self.delete_mratls_conn(ip); + } + + fn add_mratls_conn(&self, ip: &str) { + if let Ok(mut conns) = self.conns.write() { + conns.insert(ip.to_string()); + } + } + + fn delete_mratls_conn(&self, ip: &str) { + if let Ok(mut conns) = self.conns.write() { + conns.insert(ip.to_string()); + } } pub fn increase_mint_requests(&self) { - if let Some(mut my_info) = self.nodes.get_mut(&self.my_ip) { - my_info.mint_requests += 1; - my_info.log(my_info.key()); - my_info.save(); + if let Ok(mut nodes) = self.nodes.write() { + if let Some(my_info) = nodes.get_mut(&self.my_ip) { + my_info.mint_requests += 1; + } } } pub fn increase_mints(&self) { - if let Some(mut my_info) = self.nodes.get_mut(&self.my_ip) { - my_info.mints += 1; - my_info.log(my_info.key()); - my_info.save(); + if let Ok(mut nodes) = self.nodes.write() { + if let Some(my_info) = nodes.get_mut(&self.my_ip) { + my_info.mints += 1; + } } } pub fn increase_mratls_conns(&self) { - if let Some(mut my_info) = self.nodes.get_mut(&self.my_ip) { - my_info.mratls_conns += 1; - my_info.log(my_info.key()); - my_info.save(); + if let Ok(mut nodes) = self.nodes.write() { + if let Some(my_info) = nodes.get_mut(&self.my_ip) { + my_info.mratls_conns += 1; + } } } pub fn decrease_mratls_conns(&self) { - if let Some(mut my_info) = self.nodes.get_mut(&self.my_ip) { - if my_info.mratls_conns > 0 { - my_info.mratls_conns -= 1; - my_info.log(my_info.key()); - my_info.save(); + if let Ok(mut nodes) = self.nodes.write() { + if let Some(my_info) = nodes.get_mut(&self.my_ip) { + if my_info.mratls_conns > 0 { + my_info.mratls_conns -= 1; + } } } } pub fn increase_disk_attacks(&self) { - if let Some(mut my_info) = self.nodes.get_mut(&self.my_ip) { - my_info.disk_attacks += 1; - my_info.log(my_info.key()); - my_info.save(); + if let Ok(mut nodes) = self.nodes.write() { + if let Some(my_info) = nodes.get_mut(&self.my_ip) { + my_info.disk_attacks += 1; + } } } pub fn increase_net_attacks(&self) { - if let Some(mut my_info) = self.nodes.get_mut(&self.my_ip) { - my_info.net_attacks += 1; - my_info.log(my_info.key()); - my_info.save(); + if let Ok(mut nodes) = self.nodes.write() { + if let Some(my_info) = nodes.get_mut(&self.my_ip) { + my_info.net_attacks += 1; + } } } pub fn declare_myself_public(&self) { - if let Some(mut my_info) = self.nodes.get_mut(&self.my_ip) { - my_info.public = true; - my_info.log(my_info.key()); - my_info.save(); + if let Ok(mut nodes) = self.nodes.write() { + if let Some(my_info) = nodes.get_mut(&self.my_ip) { + my_info.public = true; + } } } pub fn get_nodes(&self) -> Vec<(String, NodeInfo)> { - self.nodes.clone().into_iter().collect() + if let Ok(nodes) = self.nodes.read() { + return nodes.iter().map(|(k, v)| (k.clone(), v.clone())).collect(); + } + Vec::new() } pub fn get_my_ip(&self) -> String { @@ -192,57 +196,67 @@ impl State { } pub fn get_my_info(&self) -> NodeInfo { - let mut my_info = self.nodes.get_mut(&self.my_ip).expect("no info for this node"); - my_info.keepalive = SystemTime::now(); - my_info.clone() + if let Ok(nodes) = self.nodes.read() { + if let Some(found_info) = nodes.get(&self.my_ip) { + return found_info.clone(); + } + } + NodeInfo::new_empty() } pub fn get_connected_ips(&self) -> Vec { - self.conns.iter().map(|n| n.key().clone()).collect() + if let Ok(conns) = self.conns.read() { + return conns.iter().map(|n| n.clone()).collect(); + } + Vec::new() } // returns a random node that does not have an active connection pub fn get_random_disconnected_ip(&self) -> Option { use rand::{rngs::OsRng, RngCore}; - let len = self.nodes.len(); - if len == 0 { - return None; - } + let conn_ips = self.get_connected_ips(); - let skip = OsRng.next_u64().try_into().unwrap_or(0) % len; - self.nodes - .iter() - .map(|n| n.key().clone()) - .cycle() - .skip(skip) - .find(|ip| ip != &self.my_ip && !conn_ips.contains(ip)) + if let Ok(nodes) = self.nodes.read() { + let skip = OsRng.next_u64().try_into().unwrap_or(0) % nodes.len(); + return nodes + .keys() + .map(|ip| ip.to_string()) + .filter(|ip| ip != &self.my_ip && !conn_ips.contains(ip)) + .cycle() + .skip(skip) + .next(); + } + None } /// This returns true if the update should be further forwarded /// For example, we never forward our own updates that came back pub fn process_node_update(&self, (node_ip, node_info): (String, NodeInfo)) -> bool { let is_update_mine = node_ip.eq(&self.my_ip); - let is_update_new = self - .nodes - .get(&node_ip) - .map(|curr_info| node_info.is_newer_than(&curr_info)) - .unwrap_or(true); + let mut is_update_new = false; + if let Ok(mut nodes) = self.nodes.write() { + is_update_new = nodes + .get(&node_ip) + .map(|curr_info| node_info.is_newer_than(&curr_info)) + .unwrap_or(true); + if is_update_new { + nodes.insert(node_ip.clone(), node_info.clone()); + } + } if is_update_new { println!("Inserting: {}, {}", node_ip, node_info.to_json()); - node_info.log(&node_ip); - self.nodes.insert(node_ip, node_info); } is_update_new && !is_update_mine } pub fn remove_inactive_nodes(&self, max_age: u64) { - self.nodes.retain(|_, node_info| { - // HACK: Check if it is possible to abuse network by corrupting system time - let age = SystemTime::now() - .duration_since(node_info.keepalive) - .unwrap_or(Duration::ZERO) - .as_secs(); - age <= max_age - }); + if let Ok(mut nodes) = self.nodes.write() { + // TODO: Check if it is possible to corrupt SGX system time + let now = SystemTime::now(); + nodes.retain(|_, n| { + let age = now.duration_since(n.keepalive).unwrap_or(Duration::ZERO).as_secs(); + age <= max_age + }); + } } } diff --git a/src/persistence.rs b/src/persistence.rs index 5ee7fc9..355b2cb 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -1,21 +1,7 @@ use crate::{datastore::NodeInfo, grpc::challenge::Keys}; use detee_sgx::SgxError; -use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use serde_with::{base64::Base64, serde_as}; -use std::{io::Write, sync::Mutex}; -pub struct Logfile {} - -static LOG_MUTEX: Lazy> = Lazy::new(|| Mutex::new(())); - -impl Logfile { - pub fn append(path: &str, msg: &str) -> Result<(), Box> { - let _lock = LOG_MUTEX.lock(); - let mut file = std::fs::OpenOptions::new().create(true).append(true).open(path)?; - file.write_all(msg.as_bytes())?; - Ok(()) - } -} impl SealedFile for KeysFile {} impl SealedFile for NodeInfo {}