248 lines
7.9 KiB
Rust
248 lines
7.9 KiB
Rust
use crate::persistence::{Logfile, SealError, SealedFile};
|
|
use dashmap::{DashMap, DashSet};
|
|
use serde::{Deserialize, Serialize};
|
|
use serde_with::{serde_as, TimestampSeconds};
|
|
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
|
|
|
/// Textual node address; used as the key of the node map and of the
/// connection set in `State`.
type IP = String;

/// Path passed to `Logfile::append` every time a node info record is logged.
const LOG_PATH: &str = "/host/main/logs";

/// Path the local node's own info is read from by `NodeInfo::load` and
/// written to by `NodeInfo::save`.
const LOCAL_INFO_FILE: &str = "/host/main/node_info";
|
|
|
|
#[serde_as]
|
|
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
|
|
pub struct NodeInfo {
|
|
#[serde_as(as = "TimestampSeconds")]
|
|
pub started_at: SystemTime,
|
|
#[serde_as(as = "TimestampSeconds")]
|
|
pub keepalive: SystemTime,
|
|
pub mint_requests: u64,
|
|
pub mints: u64,
|
|
pub mratls_conns: u64,
|
|
pub net_attacks: u64,
|
|
pub public: bool,
|
|
pub restarts: u64,
|
|
pub disk_attacks: u64,
|
|
}
|
|
|
|
impl NodeInfo {
|
|
pub fn is_newer_than(&self, older_self: &Self) -> bool {
|
|
self.keepalive > older_self.keepalive
|
|
}
|
|
|
|
pub fn to_json(&self) -> String {
|
|
serde_json::to_string(self).unwrap() // can fail only if time goes backwards :D
|
|
}
|
|
|
|
pub fn to_metrics(&self, ip: &str) -> String {
|
|
let started_at = self.started_at.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO);
|
|
let keepalive = self.keepalive.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO);
|
|
let labels = format!("{{ip=\"{}\", public=\"{}\"}}", ip, self.public);
|
|
|
|
let mut res = String::new();
|
|
res.push_str(&format!("started_at{} {}\n", labels, started_at.as_secs()));
|
|
res.push_str(&format!("keepalive{} {}\n", labels, keepalive.as_secs()));
|
|
res.push_str(&format!("mint_requests{} {}\n", labels, self.mint_requests));
|
|
res.push_str(&format!("mints{} {}\n", labels, self.mints));
|
|
res.push_str(&format!("mratls_conns{} {}\n", labels, self.mratls_conns));
|
|
res.push_str(&format!("net_attacks{} {}\n", labels, self.net_attacks));
|
|
res.push_str(&format!("restarts{} {}\n", labels, self.restarts));
|
|
res.push_str(&format!("disk_attacks{} {}\n", labels, self.disk_attacks));
|
|
res
|
|
}
|
|
|
|
pub fn log(&self, ip: &IP) {
|
|
let json = format!("{{\"ip\":\"{}\",", ip) + &self.to_json()[1..];
|
|
if let Err(e) = Logfile::append(LOG_PATH, &format!("{}\n", &json)) {
|
|
println!("Could not log node info: {:?}", e);
|
|
}
|
|
}
|
|
|
|
pub fn load() -> Self {
|
|
match Self::read(LOCAL_INFO_FILE) {
|
|
Ok(mut info) => {
|
|
info.mratls_conns = 0;
|
|
info.restarts += 1;
|
|
info
|
|
}
|
|
Err(SealError::Attack(e)) => {
|
|
println!("The local node file is corrupted: {}", e);
|
|
NodeInfo {
|
|
started_at: SystemTime::now(),
|
|
keepalive: SystemTime::now(),
|
|
mint_requests: 0,
|
|
mints: 0,
|
|
mratls_conns: 0,
|
|
net_attacks: 0,
|
|
public: false,
|
|
restarts: 0,
|
|
disk_attacks: 1, // add very first disk attack
|
|
}
|
|
}
|
|
Err(_) => NodeInfo {
|
|
started_at: SystemTime::now(),
|
|
keepalive: SystemTime::now(),
|
|
mint_requests: 0,
|
|
mints: 0,
|
|
mratls_conns: 0,
|
|
net_attacks: 0,
|
|
public: false,
|
|
restarts: 0,
|
|
disk_attacks: 0,
|
|
},
|
|
}
|
|
}
|
|
|
|
pub fn save(&self) {
|
|
if let Err(e) = self.write(LOCAL_INFO_FILE) {
|
|
println!("Could not save node info: {}", e);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Multithreaded state, designed to be
|
|
/// shared everywhere in the code
|
|
pub struct State {
|
|
my_ip: String,
|
|
nodes: DashMap<IP, NodeInfo>,
|
|
conns: DashSet<IP>,
|
|
}
|
|
|
|
impl State {
|
|
pub fn new(my_ip: String) -> Self {
|
|
let nodes = DashMap::new();
|
|
let my_info = NodeInfo::load();
|
|
nodes.insert(my_ip.clone(), my_info);
|
|
Self { my_ip, nodes, conns: DashSet::new() }
|
|
}
|
|
|
|
pub fn add_conn(&self, ip: &str) {
|
|
self.conns.insert(ip.to_string());
|
|
self.increase_mratls_conns();
|
|
}
|
|
|
|
pub fn delete_conn(&self, ip: &str) {
|
|
self.conns.remove(ip);
|
|
self.decrease_mratls_conns();
|
|
}
|
|
|
|
pub fn increase_mint_requests(&self) {
|
|
if let Some(mut my_info) = self.nodes.get_mut(&self.my_ip) {
|
|
my_info.mint_requests += 1;
|
|
my_info.log(my_info.key());
|
|
my_info.save();
|
|
}
|
|
}
|
|
|
|
pub fn increase_mints(&self) {
|
|
if let Some(mut my_info) = self.nodes.get_mut(&self.my_ip) {
|
|
my_info.mints += 1;
|
|
my_info.log(my_info.key());
|
|
my_info.save();
|
|
}
|
|
}
|
|
|
|
pub fn increase_mratls_conns(&self) {
|
|
if let Some(mut my_info) = self.nodes.get_mut(&self.my_ip) {
|
|
my_info.mratls_conns += 1;
|
|
my_info.log(my_info.key());
|
|
my_info.save();
|
|
}
|
|
}
|
|
|
|
pub fn decrease_mratls_conns(&self) {
|
|
if let Some(mut my_info) = self.nodes.get_mut(&self.my_ip) {
|
|
if my_info.mratls_conns > 0 {
|
|
my_info.mratls_conns -= 1;
|
|
my_info.log(my_info.key());
|
|
my_info.save();
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn increase_disk_attacks(&self) {
|
|
if let Some(mut my_info) = self.nodes.get_mut(&self.my_ip) {
|
|
my_info.disk_attacks += 1;
|
|
my_info.log(my_info.key());
|
|
my_info.save();
|
|
}
|
|
}
|
|
|
|
pub fn increase_net_attacks(&self) {
|
|
if let Some(mut my_info) = self.nodes.get_mut(&self.my_ip) {
|
|
my_info.net_attacks += 1;
|
|
my_info.log(my_info.key());
|
|
my_info.save();
|
|
}
|
|
}
|
|
|
|
pub fn declare_myself_public(&self) {
|
|
if let Some(mut my_info) = self.nodes.get_mut(&self.my_ip) {
|
|
my_info.public = true;
|
|
my_info.log(my_info.key());
|
|
my_info.save();
|
|
}
|
|
}
|
|
|
|
pub fn get_nodes(&self) -> Vec<(String, NodeInfo)> {
|
|
self.nodes.clone().into_iter().collect()
|
|
}
|
|
|
|
pub fn get_my_ip(&self) -> String {
|
|
self.my_ip.clone()
|
|
}
|
|
|
|
pub fn get_my_info(&self) -> NodeInfo {
|
|
let mut my_info = self.nodes.get_mut(&self.my_ip).expect("no info for this node");
|
|
my_info.keepalive = SystemTime::now();
|
|
my_info.clone()
|
|
}
|
|
|
|
pub fn get_connected_ips(&self) -> Vec<String> {
|
|
self.conns.iter().map(|n| n.key().clone()).collect()
|
|
}
|
|
|
|
// returns a random node that does not have an active connection
|
|
pub fn get_random_disconnected_ip(&self) -> Option<String> {
|
|
use rand::{rngs::OsRng, RngCore};
|
|
let len = self.nodes.len();
|
|
if len == 0 {
|
|
return None;
|
|
}
|
|
let skip = OsRng.next_u64().try_into().unwrap_or(0) % len;
|
|
self.nodes
|
|
.iter()
|
|
.map(|n| n.key().clone())
|
|
.cycle()
|
|
.skip(skip)
|
|
.find(|ip| ip != &self.my_ip && !self.conns.contains(ip))
|
|
}
|
|
|
|
/// This returns true if the update should be further forwarded
|
|
/// For example, we never forward our own updates that came back
|
|
pub fn process_node_update(&self, (node_ip, node_info): (String, NodeInfo)) -> bool {
|
|
let is_update_mine = node_ip.eq(&self.my_ip);
|
|
let is_update_new = self
|
|
.nodes
|
|
.get(&node_ip)
|
|
.map(|curr_info| node_info.is_newer_than(&curr_info))
|
|
.unwrap_or(true);
|
|
if is_update_new {
|
|
println!("Inserting: {}, {}", node_ip, node_info.to_json());
|
|
node_info.log(&node_ip);
|
|
self.nodes.insert(node_ip, node_info);
|
|
}
|
|
is_update_new && !is_update_mine
|
|
}
|
|
|
|
pub fn remove_inactive_nodes(&self, max_age: u64) {
|
|
self.nodes.retain(|_, node_info| {
|
|
// HACK: Check if it is possible to abuse network by corrupting system time
|
|
let age = SystemTime::now()
|
|
.duration_since(node_info.keepalive)
|
|
.unwrap_or(Duration::ZERO)
|
|
.as_secs();
|
|
age <= max_age
|
|
});
|
|
}
|
|
}
|