saving node info and restarts metric
Signed-off-by: Valentyn Faychuk <valy@detee.ltd>
This commit is contained in:
parent
91fe7d810d
commit
334b600a2d
34
Cargo.lock
generated
34
Cargo.lock
generated
@ -1546,7 +1546,7 @@ dependencies = [
|
||||
"tonic",
|
||||
"tonic-build",
|
||||
"tower 0.5.1",
|
||||
"tower-http",
|
||||
"tower-http 0.5.2",
|
||||
"x509-parser 0.16.0",
|
||||
]
|
||||
|
||||
@ -2049,6 +2049,7 @@ dependencies = [
|
||||
"tonic",
|
||||
"tonic-build",
|
||||
"tower 0.5.1",
|
||||
"tower-http 0.6.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -5708,6 +5709,37 @@ dependencies = [
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower-http"
|
||||
version = "0.6.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697"
|
||||
dependencies = [
|
||||
"async-compression",
|
||||
"base64 0.22.1",
|
||||
"bitflags 2.6.0",
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.1",
|
||||
"http-body-util",
|
||||
"http-range-header",
|
||||
"httpdate",
|
||||
"iri-string",
|
||||
"mime",
|
||||
"mime_guess",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tower 0.5.1",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower-layer"
|
||||
version = "0.3.3"
|
||||
|
@ -26,6 +26,7 @@ tonic = "0.12"
|
||||
rustls = "0.23"
|
||||
tokio-rustls = "0.26"
|
||||
tower = { version = "0.5", features = ["full"] }
|
||||
tower-http = { version = "0.6", features = ["full"] }
|
||||
hyper = "1.4.1"
|
||||
hyper-util = "0.1.7"
|
||||
hyper-rustls = { version = "0.27", features = ["http2"] }
|
||||
|
104
src/datastore.rs
104
src/datastore.rs
@ -1,13 +1,20 @@
|
||||
use crate::persistence::Logfile;
|
||||
use crate::persistence::{Logfile, SealError, SealedFile};
|
||||
use dashmap::{DashMap, DashSet};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, TimestampSeconds};
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
type IP = String;
|
||||
pub const LOCALHOST: &str = "localhost";
|
||||
const LOG_PATH: &str = "/host/main/logs";
|
||||
const LOCAL_INFO_FILE: &str = "/host/main/node_info";
|
||||
|
||||
#[derive(Clone, PartialEq, Debug)]
|
||||
#[serde_as]
|
||||
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
|
||||
pub struct NodeInfo {
|
||||
#[serde_as(as = "TimestampSeconds")]
|
||||
pub started_at: SystemTime,
|
||||
#[serde_as(as = "TimestampSeconds")]
|
||||
pub keepalive: SystemTime,
|
||||
pub mint_requests: u64,
|
||||
pub mints: u64,
|
||||
@ -19,27 +26,59 @@ pub struct NodeInfo {
|
||||
}
|
||||
|
||||
impl NodeInfo {
|
||||
pub fn newer_than(&self, other_node: &Self) -> bool {
|
||||
if let Ok(duration) = self.keepalive.duration_since(other_node.keepalive) {
|
||||
if duration > Duration::from_secs(30) {
|
||||
return true;
|
||||
pub fn is_newer_than(&self, older_self: &Self) -> bool {
|
||||
self > older_self
|
||||
}
|
||||
}
|
||||
if self.mint_requests > other_node.mint_requests
|
||||
|| self.net_attacks > other_node.net_attacks
|
||||
|| self.disk_attacks > other_node.disk_attacks
|
||||
|| self.mratls_conns > other_node.mratls_conns
|
||||
|| self.mints > other_node.mints
|
||||
|| (self.public && !other_node.public)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
false
|
||||
|
||||
pub fn to_json(&self) -> String {
|
||||
serde_json::to_string(self).unwrap() // can fail only if time goes backwards :D
|
||||
}
|
||||
|
||||
pub fn log(&self, ip: &IP) {
|
||||
if let Err(e) = Logfile::append(&format!("{}: {:?}", ip, self)) {
|
||||
println!("Could not log: {:?}", e);
|
||||
let json = format!("{{\"ip\":\"{}\",", ip) + &self.to_json()[1..];
|
||||
if let Err(e) = Logfile::append(LOG_PATH, &format!("{}: {}", ip, &json)) {
|
||||
println!("Could not log node info: {:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn load() -> Self {
|
||||
match Self::read(LOCAL_INFO_FILE) {
|
||||
Ok(mut info) => {
|
||||
info.mratls_conns = 0;
|
||||
info.restarts += 1;
|
||||
info
|
||||
}
|
||||
Err(SealError::Attack(e)) => {
|
||||
println!("The local node file is corrupted: {}", e);
|
||||
NodeInfo {
|
||||
started_at: SystemTime::now(),
|
||||
keepalive: SystemTime::now(),
|
||||
mint_requests: 0,
|
||||
mints: 0,
|
||||
mratls_conns: 0,
|
||||
net_attacks: 0,
|
||||
public: false,
|
||||
restarts: 0,
|
||||
disk_attacks: 1, // add very first disk attack
|
||||
}
|
||||
}
|
||||
Err(_) => NodeInfo {
|
||||
started_at: SystemTime::now(),
|
||||
keepalive: SystemTime::now(),
|
||||
mint_requests: 0,
|
||||
mints: 0,
|
||||
mratls_conns: 0,
|
||||
net_attacks: 0,
|
||||
public: false,
|
||||
restarts: 0,
|
||||
disk_attacks: 0,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn save(&self) {
|
||||
if let Err(e) = self.write(LOCAL_INFO_FILE) {
|
||||
println!("Could not save node info: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -52,22 +91,10 @@ pub struct State {
|
||||
}
|
||||
|
||||
impl State {
|
||||
pub fn new() -> Self {
|
||||
pub fn new_with_localhost() -> Self {
|
||||
let localhost_info = NodeInfo::load();
|
||||
let state = Self { nodes: DashMap::new(), conns: DashSet::new() };
|
||||
state.nodes.insert(
|
||||
LOCALHOST.to_string(),
|
||||
NodeInfo {
|
||||
started_at: SystemTime::now(),
|
||||
keepalive: SystemTime::now(),
|
||||
mint_requests: 0,
|
||||
mints: 0,
|
||||
mratls_conns: 0,
|
||||
net_attacks: 0,
|
||||
public: false,
|
||||
restarts: 0,
|
||||
disk_attacks: 0,
|
||||
},
|
||||
);
|
||||
state.nodes.insert(LOCALHOST.to_string(), localhost_info);
|
||||
state
|
||||
}
|
||||
|
||||
@ -83,6 +110,7 @@ impl State {
|
||||
if let Some(mut localhost_info) = self.nodes.get_mut(LOCALHOST) {
|
||||
localhost_info.mint_requests += 1;
|
||||
localhost_info.log(localhost_info.key());
|
||||
localhost_info.save();
|
||||
}
|
||||
}
|
||||
|
||||
@ -90,6 +118,7 @@ impl State {
|
||||
if let Some(mut localhost_info) = self.nodes.get_mut(LOCALHOST) {
|
||||
localhost_info.mints += 1;
|
||||
localhost_info.log(localhost_info.key());
|
||||
localhost_info.save();
|
||||
}
|
||||
}
|
||||
|
||||
@ -97,6 +126,7 @@ impl State {
|
||||
if let Some(mut localhost_info) = self.nodes.get_mut(LOCALHOST) {
|
||||
localhost_info.mratls_conns += 1;
|
||||
localhost_info.log(localhost_info.key());
|
||||
localhost_info.save();
|
||||
}
|
||||
}
|
||||
|
||||
@ -104,6 +134,7 @@ impl State {
|
||||
if let Some(mut localhost_info) = self.nodes.get_mut(LOCALHOST) {
|
||||
localhost_info.disk_attacks += 1;
|
||||
localhost_info.log(localhost_info.key());
|
||||
localhost_info.save();
|
||||
}
|
||||
}
|
||||
|
||||
@ -111,6 +142,7 @@ impl State {
|
||||
if let Some(mut localhost_info) = self.nodes.get_mut(LOCALHOST) {
|
||||
localhost_info.net_attacks += 1;
|
||||
localhost_info.log(localhost_info.key());
|
||||
localhost_info.save();
|
||||
}
|
||||
}
|
||||
|
||||
@ -123,12 +155,12 @@ impl State {
|
||||
/// This returns true if NodeInfo got modified.
|
||||
pub fn process_node_update(&self, (ip, mut node_info): (String, NodeInfo)) -> bool {
|
||||
if let Some(old_node) = self.nodes.get(&ip) {
|
||||
if !node_info.newer_than(&old_node) {
|
||||
if !node_info.is_newer_than(&old_node) {
|
||||
return false;
|
||||
}
|
||||
node_info.public = node_info.public || old_node.public;
|
||||
}
|
||||
println!("Inserting: {}, {:?}", ip, node_info);
|
||||
println!("Inserting: {}, {}", ip, node_info.to_json());
|
||||
node_info.log(&ip);
|
||||
self.nodes.insert(ip, node_info);
|
||||
true
|
||||
|
@ -89,8 +89,7 @@ impl ConnManager {
|
||||
let rx_stream = BroadcastStream::new(rx).filter_map(|n| n.ok());
|
||||
let response = client.get_updates(rx_stream).await.map_err(|e| {
|
||||
println!("Error connecting to {node_ip}: {e}");
|
||||
// TODO: fails few times during launch
|
||||
//self.state.increase_net_attacks();
|
||||
self.state.increase_net_attacks();
|
||||
e
|
||||
})?;
|
||||
let mut resp_stream = response.into_inner();
|
||||
|
@ -112,6 +112,7 @@ impl MyServer {
|
||||
#[derive(Debug)]
|
||||
struct ConnInfo {
|
||||
addr: std::net::SocketAddr,
|
||||
#[allow(dead_code)]
|
||||
certificates: Vec<CertificateDer<'static>>,
|
||||
}
|
||||
|
||||
|
25
src/main.rs
25
src/main.rs
@ -4,8 +4,10 @@ mod http_server;
|
||||
mod persistence;
|
||||
mod solana;
|
||||
|
||||
use crate::persistence::SealError;
|
||||
use crate::{
|
||||
datastore::LOCALHOST, grpc::challenge::NodeUpdate, persistence::KeysFile, solana::SolClient,
|
||||
datastore::LOCALHOST, grpc::challenge::NodeUpdate, persistence::KeysFile,
|
||||
persistence::SealedFile, solana::SolClient,
|
||||
};
|
||||
use datastore::State;
|
||||
use detee_sgx::{InstanceMeasurement, RaTlsConfig};
|
||||
@ -33,9 +35,14 @@ pub async fn localhost_cron(state: Arc<State>, tx: Sender<NodeUpdate>) {
|
||||
}
|
||||
|
||||
async fn get_sol_client(state: Arc<State>, ratls_config: RaTlsConfig) -> SolClient {
|
||||
match KeysFile::read(KEYS_FILE, &state).await {
|
||||
match KeysFile::read(KEYS_FILE) {
|
||||
Ok(keys_file) => {
|
||||
let sol_client = SolClient::try_from(keys_file).unwrap();
|
||||
let sol_client = SolClient::try_from(keys_file)
|
||||
.map_err(|e| {
|
||||
println!("Read malformed keys from the disk: {e}");
|
||||
state.increase_net_attacks();
|
||||
})
|
||||
.unwrap();
|
||||
println!(
|
||||
"Found the following wallet saved to disk: {}",
|
||||
sol_client.get_wallet_pubkey()
|
||||
@ -43,7 +50,13 @@ async fn get_sol_client(state: Arc<State>, ratls_config: RaTlsConfig) -> SolClie
|
||||
println!("Loading token mint address {}", sol_client.get_token_address());
|
||||
return sol_client;
|
||||
}
|
||||
Err(e) => println!("Can't initialize using sealed keys: {e}"),
|
||||
Err(SealError::Attack(e)) => {
|
||||
println!("The local keys file is corrupted: {}", e);
|
||||
state.increase_disk_attacks();
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Could not read keys file: {e}");
|
||||
}
|
||||
};
|
||||
|
||||
let init_nodes = match File::open(INIT_NODES_FILE) {
|
||||
@ -71,7 +84,7 @@ async fn get_sol_client(state: Arc<State>, ratls_config: RaTlsConfig) -> SolClie
|
||||
);
|
||||
println!("The address of the Token is {}", sol_client.get_token_address());
|
||||
println!("Saving this data to disk in the file {KEYS_FILE}");
|
||||
if let Err(e) = sol_client.get_keys_file().write(KEYS_FILE).await {
|
||||
if let Err(e) = sol_client.get_keys_file().write(KEYS_FILE) {
|
||||
println!("Could not save data to disk: {e}");
|
||||
}
|
||||
return sol_client;
|
||||
@ -90,7 +103,7 @@ async fn main() {
|
||||
let ratls_config = RaTlsConfig::new()
|
||||
.allow_instance_measurement(InstanceMeasurement::new().with_current_mrenclave().unwrap());
|
||||
|
||||
let state = Arc::new(State::new());
|
||||
let state = Arc::new(State::new_with_localhost());
|
||||
let sol_client = Arc::new(get_sol_client(state.clone(), ratls_config.clone()).await);
|
||||
|
||||
let (tx, _) = broadcast::channel(500);
|
||||
|
@ -1,8 +1,25 @@
|
||||
use crate::{datastore::State, grpc::challenge::Keys};
|
||||
use crate::datastore::NodeInfo;
|
||||
use crate::grpc::challenge::Keys;
|
||||
use detee_sgx::SgxError;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::base64::Base64;
|
||||
use serde_with::{base64::Base64, serde_as};
|
||||
use std::io::Write;
|
||||
|
||||
#[serde_with::serde_as]
|
||||
pub struct Logfile {}
|
||||
|
||||
impl Logfile {
|
||||
pub fn append(path: &str, msg: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let mut file = std::fs::OpenOptions::new().create(true).append(true).open(path)?;
|
||||
file.write_all(msg.as_bytes())?;
|
||||
file.write_all(b"\n")?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl SealedFile for KeysFile {}
|
||||
impl SealedFile for NodeInfo {}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct KeysFile {
|
||||
random: String,
|
||||
@ -27,39 +44,49 @@ impl Into<Keys> for KeysFile {
|
||||
}
|
||||
}
|
||||
|
||||
impl KeysFile {
|
||||
pub async fn write(self, path: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||
pub trait SealedFile: Serialize + for<'de> Deserialize<'de> {
|
||||
fn write(&self, path: &str) -> Result<(), SealError> {
|
||||
let serialized = serde_json::to_string(&self)?;
|
||||
let sealed = detee_sgx::SealingConfig::new()?.seal_data(serialized.into_bytes())?;
|
||||
tokio::fs::write(path, sealed).await.map_err(Into::into)
|
||||
std::fs::write(path, sealed).map_err(Into::into)
|
||||
}
|
||||
|
||||
pub async fn read(path: &str, state: &State) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let sealed = tokio::fs::read(path).await?;
|
||||
let serialized = detee_sgx::SealingConfig::new()?.un_seal_data(sealed).map_err(|e| {
|
||||
match e {
|
||||
detee_sgx::SgxError::UnSealingError(ref ue) => {
|
||||
state.increase_disk_attacks();
|
||||
println!("The disk data is corrupted: {ue}");
|
||||
}
|
||||
_ => println!("Failed to unseal data: {e}"),
|
||||
};
|
||||
e
|
||||
})?;
|
||||
Ok(serde_json::from_str(&String::from_utf8(serialized)?)?)
|
||||
fn read(path: &str) -> Result<Self, SealError> {
|
||||
let sealed = std::fs::read(path)?;
|
||||
let serialized = detee_sgx::SealingConfig::new()?.un_seal_data(sealed)?;
|
||||
serde_json::from_slice(&serialized).map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
const LOG_PATH: &str = "/host/main/logs";
|
||||
#[derive(Debug)]
|
||||
pub enum SealError {
|
||||
Error(String),
|
||||
Attack(String),
|
||||
}
|
||||
|
||||
pub struct Logfile {}
|
||||
|
||||
impl Logfile {
|
||||
pub fn append(msg: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||
use std::io::Write;
|
||||
let mut file = std::fs::OpenOptions::new().create(true).append(true).open(LOG_PATH)?;
|
||||
file.write_all(msg.as_bytes())?;
|
||||
file.write_all(b"\n")?;
|
||||
Ok(())
|
||||
impl std::fmt::Display for SealError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match self {
|
||||
SealError::Error(e) => write!(f, "Error: {}", e),
|
||||
SealError::Attack(e) => write!(f, "SealError: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SgxError> for SealError {
|
||||
fn from(e: SgxError) -> Self {
|
||||
SealError::Attack(e.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<serde_json::Error> for SealError {
|
||||
fn from(e: serde_json::Error) -> Self {
|
||||
SealError::Error(e.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for SealError {
|
||||
fn from(e: std::io::Error) -> Self {
|
||||
SealError::Error(e.to_string())
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user