From fc933c0647944233d185f464bfe94128fb2aa21f Mon Sep 17 00:00:00 2001
From: Valentyn Faychuk
Date: Tue, 24 Dec 2024 00:28:04 +0200
Subject: [PATCH] connections refactoring

Signed-off-by: Valentyn Faychuk
---
 scripts/testnet.sh |  60 ++++++--------
 src/datastore.rs   |  69 ++++++++--------
 src/grpc/client.rs |  98 +++++++++++------------
 src/grpc/mod.rs    |  72 +++++++++--------
 src/grpc/server.rs |  35 +++++---
 src/http_server.rs |   9 +--
 src/main.rs        | 193 +++++++++++++++++++--------------------
 src/persistence.rs |  14 ++--
 src/solana.rs      |   2 +-
 9 files changed, 263 insertions(+), 289 deletions(-)

diff --git a/scripts/testnet.sh b/scripts/testnet.sh
index 9768bca..81fdf40 100755
--- a/scripts/testnet.sh
+++ b/scripts/testnet.sh
@@ -1,71 +1,58 @@
 #!/bin/bash
 set -e
+num_nodes=${1:-4}
 script_dir=$(dirname "$0")
-cd "${script_dir}/.." # Go to the root of the project
+cd "${script_dir}/.."
+
+function build_mint_sol() {
+    command -v cargo >/dev/null 2>&1 || { echo >&2 "cargo not found, run 'curl https://sh.rustup.rs -sSf | sh'"; exit 1; }
+    command -v gcc >/dev/null 2>&1 || { echo >&2 "cc not found, run 'apt update && apt install build-essential'"; exit 1; }
+    command -v protoc >/dev/null 2>&1 || { echo >&2 "protoc not found, run 'apt update && apt install protobuf-compiler'"; exit 1; }
 
-function build_mint_sol_tool() {
     echo "Building the mint_sol tool for testing"
-
-    if ! command -v cargo 2>&1 /dev/null
-    then
-        echo "cargo not found, run 'curl https://sh.rustup.rs -sSf | sh'"
-        exit 1
-    fi
-
-    if ! command -v gcc 2>&1 /dev/null
-    then
-        echo "cc not found, run 'apt update && apt install build-essential'"
-        exit 1
-    fi
-
-    if ! command -v protoc 2>&1 /dev/null
-    then
-        echo "protoc not found, run 'apt update && apt install protobuf-compiler'"
-        exit 1
-    fi
-
-    cd mint_sol
-    cargo build --release
-    cp target/release/mint_sol "../${script_dir}/mint_sol"
-    cd ..
+    cd mint_sol && cargo build --release && cd ..
+    cp mint_sol/target/release/mint_sol "${script_dir}/mint_sol"
 }
 
+eval "sudo rm -rf /tmp/dthc{0..$num_nodes}"
 TEST=1 ./${script_dir}/build-container.sh
-[ -e "${script_dir}/mint_sol" ] || build_mint_sol_tool
+[ -e "${script_dir}/mint_sol" ] || build_mint_sol
 
 # Cleanup old containers and the network
 echo "Creating the network and the root node"
-docker ps -a | grep 'dthc' | awk '{ print $NF }' | xargs docker rm -f || true
-docker network inspect dthc > /dev/null 2>&1 \
+eval "docker rm -f dthc{0..$num_nodes}" 2>&1 /dev/null
+docker network inspect dthc >/dev/null 2>&1 \
     || docker network create --subnet=172.18.0.0/24 dthc \
    || true
 
 echo "Waiting for the root node to start"
 # 172.18.0.1 is for the network gateway
-docker run --name dthc-root \
+docker run --name dthc0 \
     --network dthc -d \
     --ip 172.18.0.2 \
     --env NODE_IP="172.18.0.2" \
+    --volume /tmp/dthc0:/challenge/main \
+    --publish 31300:31372 \
     --device /dev/sgx/provision \
     --device /dev/sgx/enclave \
     detee/hacker-challenge:test
 while true; do
     echo -n "." && sleep 1
-    docker logs dthc-root | grep -q "SOL" && echo && break
+    docker logs dthc0 | grep -q "SOL" && echo && break
 done
 
 echo "Sending SOL to the root and waiting for the mint"
-address=$(docker logs dthc-root | grep 'SOL' | awk '{ print $NF }')
+address=$(docker logs dthc0 | grep 'SOL' | awk '{ print $NF }')
 "${script_dir}"/mint_sol "${address}"
 while true; do
     echo -n "." && sleep 1
-    docker logs dthc-root | grep -q "Mint created" && echo && break
+    docker logs dthc0 | grep -q "Mint created" && echo && break
 done
 
-echo "Creating the cluster"
-for n in {1..20}; do
-    #init_nodes=$(docker inspect dthc-root --format '{{ .NetworkSettings.Networks.dthc.IPAddress }}')
+echo "Creating a cluster with ${num_nodes} nodes"
+for n in $(seq 1 $num_nodes); do
+    #init_nodes=$(docker inspect dthc0 --format '{{ .NetworkSettings.Networks.dthc.IPAddress }}')
     node_ip="172.18.0.$((2 + n))"
     node_port=$((31300 + n))
     node_volume="/tmp/dthc${n}"
@@ -81,7 +68,6 @@ for n in {1..20}; do
         --device /dev/sgx/enclave \
         detee/hacker-challenge:test
 done
-sleep 15 # Wait for the cluster to start
 
 echo "Running the test mint"
 for n in {1..20}; do
@@ -94,4 +80,4 @@ done
 
 # curl 127.0.0.1:31303/metrics
 # curl -X POST 127.0.0.1:31303/mint -d '{"wallet": "EZT16iP1SQVUFf1AJN6oiE5BZPnyBUqaKDkZ4oZRsvhR"}' -H 'Content-Type: application/json'
-# docker run --network dthc -d --name dthc-3 --ip 172.18.0.3 --env NODE_IP='172.18.0.3' --env INIT_NODES='172.18.0.2' -v /tmp/dthc3:/challenge/main -p 31303:31372 --device /dev/sgx/provision --device /dev/sgx/enclave detee/hacker-challenge:test
+# docker run --name dthc0 --network dthc -d --ip 172.18.0.2 --env NODE_IP="172.18.0.2" --env INIT_NODES="172.18.0.5 172.18.0.3 172.18.0.4" --volume /tmp/dthc0:/challenge/main --publish 31300:31372 --device /dev/sgx/provision --device /dev/sgx/enclave detee/hacker-challenge:test
diff --git a/src/datastore.rs b/src/datastore.rs
index 2de87fb..7cb0ba9 100644
--- a/src/datastore.rs
+++ b/src/datastore.rs
@@ -183,30 +183,7 @@ impl State {
         }
     }
 
-    pub fn get_my_info(&self) -> NodeInfo {
-        let mut my_info = self.nodes.get_mut(&self.my_ip).expect("no info for this node");
-        my_info.keepalive = SystemTime::now();
-        my_info.clone()
-    }
-
-    /// This returns true if the update should be further forwarded
-    /// For example, we never forward our own updates that came back
-    pub fn process_node_update(&self, (ip, node_info): (String, NodeInfo)) -> bool {
-        let is_update_mine = ip.eq(&self.my_ip);
-        let is_update_new = self
-            .nodes
-            .get(&ip)
-            .map(|curr_info| node_info.is_newer_than(&curr_info))
-            .unwrap_or(true);
-        if is_update_new {
-            println!("Inserting: {}, {}", ip, node_info.to_json());
-            node_info.log(&ip);
-            self.nodes.insert(ip, node_info);
-        }
-        is_update_new && !is_update_mine
-    }
-
-    pub fn get_node_list(&self) -> Vec<(String, NodeInfo)> {
+    pub fn get_nodes(&self) -> Vec<(String, NodeInfo)> {
         self.nodes.clone().into_iter().collect()
     }
 
@@ -214,28 +191,58 @@ impl State {
         self.my_ip.clone()
     }
 
+    pub fn get_my_info(&self) -> NodeInfo {
+        let mut my_info = self.nodes.get_mut(&self.my_ip).expect("no info for this node");
+        my_info.keepalive = SystemTime::now();
+        my_info.clone()
+    }
+
+    pub fn get_connected_ips(&self) -> Vec<String> {
+        self.conns.iter().map(|n| n.key().clone()).collect()
+    }
+
     // returns a random node that does not have an active connection
-    pub fn get_random_disconnected_node(&self) -> Option<String> {
+    pub fn get_random_disconnected_ip(&self) -> Option<String> {
         use rand::{rngs::OsRng, RngCore};
         let len = self.nodes.len();
         if len == 0 {
             return None;
         }
+        let conn_ips = self.get_connected_ips();
         let skip = OsRng.next_u64().try_into().unwrap_or(0) % len;
         self.nodes
             .iter()
             .map(|n| n.key().clone())
             .cycle()
             .skip(skip)
-            .find(|ip| ip != &self.my_ip && !self.conns.contains(ip))
+            .find(|ip| ip != &self.my_ip && !conn_ips.contains(ip))
     }
 
-    pub fn remove_inactive_nodes(&self) {
-        self.nodes.retain(|_, v| {
+    /// This returns true if the update should be further forwarded
+    /// For example, we never forward our own updates that came back
+    pub fn process_node_update(&self, (node_ip, node_info): (String, NodeInfo)) -> bool {
+        let is_update_mine = node_ip.eq(&self.my_ip);
+        let is_update_new = self
+            .nodes
+            .get(&node_ip)
+            .map(|curr_info| node_info.is_newer_than(&curr_info))
+            .unwrap_or(true);
+        if is_update_new {
+            println!("Inserting: {}, {}", node_ip, node_info.to_json());
+            node_info.log(&node_ip);
+            self.nodes.insert(node_ip, node_info);
+        }
+        is_update_new && !is_update_mine
+    }
+
+    pub fn remove_inactive_nodes(&self, max_age: u64) {
+        self.nodes.retain(|_, node_info| {
             // HACK: Check if it is possible to abuse network by corrupting system time
-            let age =
-                SystemTime::now().duration_since(v.keepalive).unwrap_or(Duration::ZERO).as_secs();
-            age <= 600
+            let age = SystemTime::now()
+                .duration_since(node_info.keepalive)
+                .unwrap_or(Duration::ZERO)
+                .as_secs();
+            age <= max_age
         });
     }
 }
diff --git a/src/grpc/client.rs b/src/grpc/client.rs
index 2a79cf0..96769d4 100644
--- a/src/grpc/client.rs
+++ b/src/grpc/client.rs
@@ -1,59 +1,60 @@
-use super::challenge::Keys;
-use super::InternalNodeUpdate;
+use super::{challenge::Keys, InternalNodeUpdate};
 use crate::{
     datastore::State,
     grpc::challenge::{update_client::UpdateClient, Empty},
 };
 use detee_sgx::RaTlsConfig;
-use std::{str::FromStr, sync::Arc};
-use tokio::{
-    sync::broadcast::Sender,
-    time::{sleep, Duration},
-};
+use std::{net::Ipv4Addr, str::FromStr, sync::Arc};
+use tokio::{sync::broadcast::Sender, time::Duration};
 use tokio_stream::{wrappers::BroadcastStream, StreamExt};
 
+pub async fn grpc_new_conn(
+    node_ip: String,
+    state: Arc<State>,
+    ra_cfg: RaTlsConfig,
+    tx: Sender<InternalNodeUpdate>,
+) {
+    if Ipv4Addr::from_str(&node_ip).is_err() {
+        println!("IPv4 address is invalid: {node_ip}");
+        return;
+    }
+    ConnManager::init(state, ra_cfg, tx).connect_to(node_ip).await
+}
+
+pub async fn grpc_query_keys(
+    node_ip: String,
+    state: Arc<State>,
+    ra_cfg: RaTlsConfig,
+) -> Result<Keys, Box<dyn std::error::Error>> {
+    if Ipv4Addr::from_str(&node_ip).is_err() {
+        let err = format!("IPv4 address is invalid: {node_ip}");
+        return Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, err)));
+    }
+    query_keys(node_ip, state, ra_cfg).await
+}
+
 #[derive(Clone)]
-pub struct ConnManager {
-    my_ip: String,
+struct ConnManager {
     state: Arc<State>,
     tx: Sender<InternalNodeUpdate>,
-    ratls_config: RaTlsConfig,
+    ra_cfg: RaTlsConfig,
 }
 
 impl ConnManager {
-    pub fn init(
-        my_ip: String,
-        state: Arc<State>,
-        ratls_config: RaTlsConfig,
-        tx: Sender<InternalNodeUpdate>,
-    ) -> Self {
-        Self { my_ip, state, ratls_config, tx }
+    fn init(state: Arc<State>, ra_cfg: RaTlsConfig, tx: Sender<InternalNodeUpdate>) -> Self {
+        Self { state, ra_cfg, tx }
     }
 
-    pub async fn start_with_node(self, node_ip: String) {
-        self.connect_wrapper(node_ip).await;
-    }
-
-    pub async fn start(self) {
-        loop {
-            if let Some(ip) = self.state.get_random_disconnected_node() {
-                println!("Found random disconnected node {ip}");
-                self.connect_wrapper(ip.clone()).await;
-            }
-            sleep(Duration::from_secs(3)).await;
-        }
-    }
-
-    async fn connect_wrapper(&self, node_ip: String) {
+    async fn connect_to(&self, node_ip: String) {
         let state = self.state.clone();
         state.add_conn(&node_ip);
-        if let Err(e) = self.connect(node_ip.clone()).await {
+        if let Err(e) = self.connect_to_int(node_ip.clone()).await {
             println!("Client connection for {node_ip} failed: {e:?}");
         }
         state.delete_conn(&node_ip);
     }
 
-    async fn connect(&self, remote_ip: String) -> Result<(), Box<dyn std::error::Error>> {
+    async fn connect_to_int(&self, remote_ip: String) -> Result<(), Box<dyn std::error::Error>> {
         use detee_sgx::RaTlsConfigBuilder;
         use hyper::Uri;
         use hyper_util::{client::legacy::connect::HttpConnector, rt::TokioExecutor};
@@ -61,7 +62,7 @@ impl ConnManager {
 
         println!("Connecting to {remote_ip}...");
 
-        let tls = ClientConfig::from_ratls_config(self.ratls_config.clone())
+        let tls = ClientConfig::from_ratls_config(self.ra_cfg.clone())
             .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e)))?;
 
         let mut http = HttpConnector::new();
@@ -79,6 +80,7 @@ impl ConnManager {
                     .enable_http2()
                     .wrap_connector(s)
             })
+            .timeout(Duration::from_secs(5))
            .map_request(move |_| {
                 Uri::from_str(&format!("https://{cloned_node_ip}:31373"))
                     .expect("Could not parse URI")
@@ -112,21 +114,12 @@ impl ConnManager {
 
         let mut resp_stream = response.into_inner();
 
         // Immediately send our info as a network update
-        let my_info = (self.my_ip.clone(), self.state.get_my_info()).into();
-        let _ = self.tx.send(InternalNodeUpdate { sender_ip: self.my_ip.clone(), update: my_info });
-
+        let _ = self.tx.send((self.state.get_my_ip(), self.state.get_my_info()).into());
         while let Some(update) = resp_stream.message().await? {
-            // update the entire network in case the information is new
+            // Update the entire network in case the information is new
             if self.state.process_node_update(update.clone().into()) {
-                // if process update returns true, the update must be forwarded
-                if self
-                    .tx
-                    .send(InternalNodeUpdate {
-                        sender_ip: remote_ip.clone(),
-                        update: update.clone(),
-                    })
-                    .is_err()
-                {
+                // If process update returns true, the update must be forwarded
+                if self.tx.send((remote_ip.clone(), update).into()).is_err() {
                     println!("Tokio broadcast receivers had an issue consuming the channel");
                 };
             }
@@ -136,10 +129,10 @@
     }
 }
 
-pub async fn key_grabber(
-    state: Arc<State>,
+async fn query_keys(
     node_ip: String,
-    ratls_config: RaTlsConfig,
+    state: Arc<State>,
+    ra_cfg: RaTlsConfig,
 ) -> Result<Keys, Box<dyn std::error::Error>> {
     use detee_sgx::RaTlsConfigBuilder;
     use hyper::Uri;
@@ -148,7 +141,7 @@ pub async fn key_grabber(
 
     println!("Getting key from {node_ip}...");
 
-    let tls = ClientConfig::from_ratls_config(ratls_config)
+    let tls = ClientConfig::from_ratls_config(ra_cfg)
         .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e)))?;
 
     let mut http = HttpConnector::new();
@@ -166,6 +159,7 @@ pub async fn key_grabber(
             .enable_http2()
             .wrap_connector(s)
         })
+        .timeout(Duration::from_secs(5))
         .map_request(move |_| {
             Uri::from_str(&format!("https://{cloned_node_ip}:31373")).expect("Could not parse URI")
         })
diff --git a/src/grpc/mod.rs b/src/grpc/mod.rs
index 6f7eae1..6b083b2 100644
--- a/src/grpc/mod.rs
+++ b/src/grpc/mod.rs
@@ -7,16 +7,10 @@ pub mod challenge {
     tonic::include_proto!("challenge");
 }
 
-#[derive(Clone, PartialEq)]
-pub struct InternalNodeUpdate {
-    pub sender_ip: String,
-    pub update: NodeUpdate,
-}
-
 impl From<(String, NodeInfo)> for NodeUpdate {
     fn from((ip, info): (String, NodeInfo)) -> Self {
-        NodeUpdate {
-            ip: ip.to_string(),
+        Self {
+            ip,
             started_at: Some(prost_types::Timestamp::from(info.started_at)),
             keepalive: Some(prost_types::Timestamp::from(info.keepalive)),
             mint_requests: info.mint_requests,
@@ -31,33 +25,49 @@ impl From<(String, NodeInfo)> for NodeUpdate {
     }
 }
 
 impl From<NodeUpdate> for (String, NodeInfo) {
-    fn from(val: NodeUpdate) -> Self {
-        let ip = val.ip;
-        let started_at: SystemTime = match val.started_at {
-            Some(ts) => {
-                let duration = Duration::new(ts.seconds as u64, ts.nanos as u32);
-                UNIX_EPOCH.checked_add(duration).unwrap_or(SystemTime::now())
-            }
-            None => SystemTime::now(),
-        };
-        let keepalive: SystemTime = match val.keepalive {
-            Some(ts) => {
-                let duration = Duration::new(ts.seconds as u64, ts.nanos as u32);
-                UNIX_EPOCH.checked_add(duration).unwrap_or(SystemTime::now())
-            }
-            None => SystemTime::now(),
-        };
+    fn from(upd: NodeUpdate) -> (String, NodeInfo) {
+        let ip = upd.ip;
+        let started_at: SystemTime = grpc_timestamp_to_systemtime(upd.started_at);
+        let keepalive: SystemTime = grpc_timestamp_to_systemtime(upd.keepalive);
         let self_info = NodeInfo {
             started_at,
             keepalive,
-            mint_requests: val.mint_requests,
-            mints: val.mints,
-            mratls_conns: val.mratls_conns,
-            net_attacks: val.quote_attacks,
-            public: val.public,
-            restarts: val.restarts,
-            disk_attacks: val.disk_attacks,
+            mint_requests: upd.mint_requests,
+            mints: upd.mints,
+            mratls_conns: upd.mratls_conns,
+            net_attacks: upd.quote_attacks,
+            public: upd.public,
+            restarts: upd.restarts,
+            disk_attacks: upd.disk_attacks,
         };
         (ip, self_info)
     }
 }
+
+fn grpc_timestamp_to_systemtime(ts: Option<prost_types::Timestamp>) -> SystemTime {
+    if let Some(ts) = ts {
+        let duration = Duration::new(ts.seconds as u64, ts.nanos as u32);
+        UNIX_EPOCH.checked_add(duration).unwrap_or(SystemTime::now())
+    } else {
+        println!("Timestamp is None");
+        SystemTime::now()
+    }
+}
+
+#[derive(Clone, PartialEq)]
+pub struct InternalNodeUpdate {
+    pub sender_ip: String,
+    pub update: NodeUpdate,
+}
+
+impl From<(String, NodeInfo)> for InternalNodeUpdate {
+    fn from((ip, info): (String, NodeInfo)) -> Self {
+        Self { sender_ip: ip.clone(), update: (ip, info).into() }
+    }
+}
+
+impl From<(String, NodeUpdate)> for InternalNodeUpdate {
+    fn from((ip, update): (String, NodeUpdate)) -> Self {
+        Self { sender_ip: ip, update }
+    }
+}
diff --git a/src/grpc/server.rs b/src/grpc/server.rs
index 7c51697..42c5a29 100644
--- a/src/grpc/server.rs
+++ b/src/grpc/server.rs
@@ -1,5 +1,7 @@
-use super::challenge::{update_server::UpdateServer, Empty, Keys, NodeUpdate};
-use super::InternalNodeUpdate;
+use super::{
+    challenge::{update_server::UpdateServer, Empty, Keys, NodeUpdate},
+    InternalNodeUpdate,
+};
 use crate::{datastore::State, grpc::challenge::update_server::Update};
 use detee_sgx::RaTlsConfig;
 use rustls::pki_types::CertificateDer;
@@ -8,21 +10,30 @@
 use tokio::sync::broadcast::Sender;
 use tokio_stream::{Stream, StreamExt};
 use tonic::{Request, Response, Status, Streaming};
-pub struct MyServer {
+pub async fn grpc_new_server(
+    state: Arc<State>,
+    keys: Keys,
+    ra_cfg: RaTlsConfig,
+    tx: Sender<InternalNodeUpdate>,
+) {
+    NodeServer::init(state, keys, ra_cfg, tx).start().await
+}
+
+pub struct NodeServer {
     state: Arc<State>,
     tx: Sender<InternalNodeUpdate>,
-    ratls_config: RaTlsConfig,
+    ra_cfg: RaTlsConfig,
     keys: Keys, // For sending secret keys to new nodes ;)
 }
 
-impl MyServer {
+impl NodeServer {
     pub fn init(
         state: Arc<State>,
         keys: Keys,
-        ratls_config: RaTlsConfig,
+        ra_cfg: RaTlsConfig,
         tx: Sender<InternalNodeUpdate>,
     ) -> Self {
-        Self { state, tx, keys, ratls_config }
+        Self { state, tx, keys, ra_cfg }
     }
 
     pub async fn start(self) {
@@ -40,7 +51,7 @@ impl MyServer {
 
         // TODO: error handling, shouldn't have expects
 
-        let mut tls = ServerConfig::from_ratls_config(self.ratls_config.clone()).unwrap();
+        let mut tls = ServerConfig::from_ratls_config(self.ra_cfg.clone()).unwrap();
         tls.alpn_protocols = vec![b"h2".to_vec()];
 
         let state = self.state.clone();
@@ -119,7 +130,7 @@ struct ConnInfo {
 }
 
 #[tonic::async_trait]
-impl Update for MyServer {
+impl Update for NodeServer {
     type GetUpdatesStream = Pin<Box<dyn Stream<Item = Result<NodeUpdate, Status>> + Send>>;
 
     async fn get_keys(&self, _request: Request<Empty>) -> Result<Response<Keys>, Status> {
@@ -143,8 +154,8 @@ impl Update for MyServer {
             state.declare_myself_public();
             state.add_conn(&remote_ip);
 
-            let full_update_list: Vec<NodeUpdate> = state.get_node_list().into_iter().map(Into::<NodeUpdate>::into).collect();
-            for update in full_update_list {
+            let known_nodes: Vec<NodeUpdate> = state.get_nodes().into_iter().map(Into::into).collect();
+            for update in known_nodes {
                 yield Ok(update);
             }
 
@@ -164,7 +175,7 @@ impl Update for MyServer {
 
                     if state.process_node_update(update.clone().into()) {
                         // if process update returns true, the update must be forwarded
-                        if tx.send(InternalNodeUpdate { sender_ip: remote_ip.clone(), update: update.clone() }).is_err() {
+                        if tx.send((remote_ip.clone(), update).into()).is_err() {
                             println!("Tokio broadcast receivers had an issue consuming the channel");
                         };
                     }
diff --git a/src/http_server.rs b/src/http_server.rs
index 305f71a..00742e8 100644
--- a/src/http_server.rs
+++ b/src/http_server.rs
@@ -48,9 +48,8 @@ impl From<(String, datastore::NodeInfo)> for NodesResp {
 
 #[get("/nodes")]
 async fn get_nodes(ds: web::Data<Arc<State>>) -> HttpResponse {
-    HttpResponse::Ok().json(
-        ds.get_node_list().into_iter().map(Into::<NodesResp>::into).collect::<Vec<NodesResp>>(),
-    )
+    HttpResponse::Ok()
+        .json(ds.get_nodes().into_iter().map(Into::<NodesResp>::into).collect::<Vec<NodesResp>>())
 }
 
 #[derive(Deserialize)]
@@ -81,13 +80,13 @@ async fn mint(
 #[get("/metrics")]
 async fn metrics(ds: web::Data<Arc<State>>) -> HttpResponse {
     let mut metrics = String::new();
-    for (ip, node) in ds.get_node_list() {
+    for (ip, node) in ds.get_nodes() {
         metrics.push_str(node.to_metrics(&ip).as_str());
     }
     HttpResponse::Ok().content_type("text/plain; version=0.0.4; charset=utf-8").body(metrics)
 }
 
-pub async fn init(state: Arc<State>, sol_client: Arc<SolClient>) {
+pub async fn http_new_server(state: Arc<State>, sol_client: Arc<SolClient>) {
     HttpServer::new(move || {
         App::new()
             .app_data(web::Data::new(state.clone()))
diff --git a/src/main.rs b/src/main.rs
index 57bf275..5f6a27f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,176 +4,145 @@ mod http_server;
 mod persistence;
 mod solana;
 
-use crate::persistence::SealError;
 use crate::{
-    grpc::challenge::NodeUpdate, grpc::InternalNodeUpdate, persistence::KeysFile,
-    persistence::SealedFile, solana::SolClient,
+    grpc::{
+        challenge::NodeUpdate,
+        client::{grpc_new_conn, grpc_query_keys},
+        server::grpc_new_server,
+        InternalNodeUpdate,
+    },
+    http_server::http_new_server,
+    persistence::{KeysFile, SealError, SealedFile},
+    solana::SolClient,
 };
 use datastore::State;
 use detee_sgx::{InstanceMeasurement, RaTlsConfig};
-use std::{
-    fs::File,
-    io::Error,
-    io::{BufRead, BufReader},
-    sync::Arc,
-};
+use std::{fs::read_to_string, sync::Arc};
 use tokio::{
     sync::{broadcast, broadcast::Sender},
     task::JoinSet,
-    time::{sleep, Duration},
+    time::{interval, Duration},
 };
 
 const INIT_NODES_FILE: &str = "/host/detee_challenge_nodes";
 const KEYS_FILE: &str = "/host/main/TRY_TO_HACK_THIS";
-const MAX_CONNECTIONS: usize = 3;
-
-#[cfg(feature = "test")]
-async fn resolve_my_ip() -> Result<String, Error> {
-    let node_ip = File::open("/host/detee_node_ip")?;
-    let mut reader = BufReader::new(node_ip);
-    let mut ip = String::new();
-    reader.read_line(&mut ip)?;
-    if ip.ends_with('\n') {
-        ip.pop();
-    }
-    Ok(ip)
-}
+const NUM_CONNECTIONS: usize = 3;
+const HEARTBEAT_INTERVAL: u64 = 30; // Seconds
 
 #[cfg(not(feature = "test"))]
-async fn resolve_my_ip() -> Result<String, Error> {
-    let err = "Can't resolve my external IP, try again";
-    let ip = public_ip::addr_v4().await.ok_or(Error::new(std::io::ErrorKind::Other, err))?;
-    Ok(format!("{}", ip))
+async fn resolve_my_ipv4() -> String {
+    public_ip::addr_v4().await.unwrap().to_string()
 }
 
-pub async fn heartbeat_cron(my_ip: String, state: Arc<State>, tx: Sender<InternalNodeUpdate>) {
+#[cfg(feature = "test")]
+async fn resolve_my_ipv4() -> String {
+    use std::str::FromStr;
+    let node_ip = read_to_string("/host/detee_node_ip").unwrap();
+    std::net::Ipv4Addr::from_str(node_ip.trim()).unwrap().to_string()
+}
+
+pub async fn heartbeat(
+    mut tasks: JoinSet<()>,
+    ra_cfg: RaTlsConfig,
+    state: Arc<State>,
+    tx: Sender<InternalNodeUpdate>,
+) -> ! {
+    let mut interval = interval(Duration::from_secs(HEARTBEAT_INTERVAL));
     loop {
-        sleep(Duration::from_secs(60)).await;
+        interval.tick().await;
         println!("Heartbeat...");
-        let update = (my_ip.clone(), state.get_my_info()).into();
-        let _ = tx.send(InternalNodeUpdate { sender_ip: my_ip.clone(), update });
-        state.remove_inactive_nodes();
+        state.remove_inactive_nodes(HEARTBEAT_INTERVAL * 3);
+        let connected_ips = state.get_connected_ips();
+        println!("Connected nodes ({}): {:?}", connected_ips.len(), connected_ips);
+        let _ = tx.send((state.get_my_ip(), state.get_my_info()).into());
+        if connected_ips.len() < NUM_CONNECTIONS {
+            if let Some(node_ip) = state.get_random_disconnected_ip() {
+                println!("Dialing random node {}", node_ip);
+                tasks.spawn(grpc_new_conn(node_ip, state.clone(), ra_cfg.clone(), tx.clone()));
+            }
+        }
+        let _ = tasks.try_join_next();
     }
 }
 
-async fn get_sol_client(state: Arc<State>, ratls_config: RaTlsConfig) -> SolClient {
+async fn init_token(state: Arc<State>, ra_cfg: RaTlsConfig) -> SolClient {
+    // First try to get the keys from the disk
     match KeysFile::read(KEYS_FILE) {
         Ok(keys_file) => {
-            let sol_client = SolClient::try_from(keys_file)
-                .map_err(|e| {
-                    println!("Read malformed keys from the disk: {e}");
-                    state.increase_net_attacks();
-                })
-                .unwrap();
-            println!(
-                "Found the following wallet saved to disk: {}",
-                sol_client.get_wallet_pubkey()
-            );
-            println!("Loading token mint address {}", sol_client.get_token_address());
+            // If unsealed keys are corrupted then it is very bad, we should panic
+            let sol_client = SolClient::try_from(keys_file).unwrap();
+            println!("Found sealed wallet {}", sol_client.get_wallet_pubkey());
+            println!("The address of the Token is {}", sol_client.get_token_address());
             return sol_client;
         }
         Err(SealError::Attack(e)) => {
-            println!("The local keys file is corrupted: {}", e);
+            println!("The sealed keys are corrupted: {}", e);
             state.increase_disk_attacks();
         }
         Err(e) => {
-            println!("Could not read keys file: {e}");
+            println!("Could not read sealed keys: {e}");
         }
     };
 
-    let init_nodes = match File::open(INIT_NODES_FILE) {
-        Ok(init_nodes) => init_nodes,
+    let node_ips = match read_to_string(INIT_NODES_FILE) {
+        Ok(node_ips) => node_ips,
         Err(_) => {
-            println!("Can't initialize using init nodes from {INIT_NODES_FILE}");
-            println!("Starting a new network with a new key...");
-            return SolClient::new().await;
+            println!("I am root, creating a new Token...");
+            let sol_client = SolClient::create_new_token().await;
+            println!("The address of the Token is {}", sol_client.get_token_address());
+            println!("Sealing keys in the {KEYS_FILE}");
+            if let Err(e) = sol_client.get_keys_file().write(KEYS_FILE) {
+                println!("Could not seal keys: {e}");
+            }
+            return sol_client;
         }
     };
 
-    let init_nodes_reader = BufReader::new(init_nodes);
-    for init_node_ip in init_nodes_reader.lines().map(|l| l.unwrap()) {
-        match grpc::client::key_grabber(state.clone(), init_node_ip, ratls_config.clone()).await {
+    for node_ip in node_ips.lines().map(String::from) {
+        match grpc_query_keys(node_ip.clone(), state.clone(), ra_cfg.clone()).await {
             Ok(keys) => {
-                let sol_client = SolClient::try_from(keys.clone())
-                    .map_err(|e| {
-                        println!("Received malformed keys from the network: {e}");
-                        state.increase_net_attacks();
-                    })
-                    .unwrap();
-                println!(
-                    "Got keypair from the network. Joining the network using wallet {}",
-                    sol_client.get_wallet_pubkey()
-                );
+                // If grpc keys are corrupted then it is very bad, we should panic
+                let sol_client = SolClient::try_from(keys.clone()).unwrap();
+                println!("Connected network wallet {}", sol_client.get_wallet_pubkey());
                 println!("The address of the Token is {}", sol_client.get_token_address());
-                println!("Saving this data to disk in the file {KEYS_FILE}");
+                println!("Sealing keys in the {KEYS_FILE}");
                 if let Err(e) = sol_client.get_keys_file().write(KEYS_FILE) {
-                    println!("Could not save data to disk: {e}");
+                    println!("Could not seal keys: {e}");
                 }
                 return sol_client;
             }
             Err(e) => {
-                println!("Could not get keypair: {e:?}");
+                println!("Could not query keys from {node_ip}: {e:?}");
             }
         };
     }
-    panic!("could not get keypair.");
+    panic!("Can't initialize the Token");
 }
 
 #[tokio::main]
 async fn main() {
     env_logger::init_from_env(env_logger::Env::default().default_filter_or("warn"));
-    let ratls_config = RaTlsConfig::new()
-        .allow_instance_measurement(InstanceMeasurement::new().with_current_mrenclave().unwrap());
-    let my_ip = resolve_my_ip().await.unwrap();
+    let mrenclave = InstanceMeasurement::new().with_current_mrenclave().unwrap();
+    let ra_cfg = RaTlsConfig::new().allow_instance_measurement(mrenclave);
+    let my_ip = resolve_my_ipv4().await; // Guaranteed to be correct IPv4
     println!("Starting on IP {}", my_ip);
     let state = Arc::new(State::new(my_ip.clone()));
-    let sol_client = Arc::new(get_sol_client(state.clone(), ratls_config.clone()).await);
-
-    let (tx, _) = broadcast::channel(500);
+    let sol_client = Arc::new(init_token(state.clone(), ra_cfg.clone()).await);
     let mut tasks = JoinSet::new();
+    let (tx, _) = broadcast::channel(500);
 
-    tasks.spawn(heartbeat_cron(my_ip.clone(), state.clone(), tx.clone()));
-    tasks.spawn(http_server::init(state.clone(), sol_client.clone()));
-    tasks.spawn(
-        grpc::server::MyServer::init(
-            state.clone(),
-            sol_client.get_keys(),
-            ratls_config.clone(),
-            tx.clone(),
-        )
-        .start(),
-    );
+    // HTTP and GRPC servers
+    tasks.spawn(grpc_new_server(state.clone(), sol_client.get_keys(), ra_cfg.clone(), tx.clone()));
+    tasks.spawn(http_new_server(state.clone(), sol_client));
 
-    if let Ok(input) = std::fs::read_to_string(INIT_NODES_FILE) {
-        for line in input.lines() {
-            tasks.spawn(
-                grpc::client::ConnManager::init(
-                    my_ip.clone(),
-                    state.clone(),
-                    ratls_config.clone(),
-                    tx.clone(),
-                )
-                .start_with_node(line.to_string()),
-            );
+    if let Ok(node_ips) = read_to_string(INIT_NODES_FILE) {
+        for node_ip in node_ips.lines().take(NUM_CONNECTIONS).map(String::from) {
+            tasks.spawn(grpc_new_conn(node_ip, state.clone(), ra_cfg.clone(), tx.clone()));
         }
     }
 
-    for _ in 0..MAX_CONNECTIONS {
-        tasks.spawn(
-            grpc::client::ConnManager::init(
-                my_ip.clone(),
-                state.clone(),
-                ratls_config.clone(),
-                tx.clone(),
-            )
-            .start(),
-        );
-    }
-
-    while let Some(Ok(_)) = tasks.join_next().await {}
-
-    // task panicked
-    println!("Shutting down...");
+    // Heartbeat
+    heartbeat(tasks, ra_cfg, state, tx).await;
 }
diff --git a/src/persistence.rs b/src/persistence.rs
index 59f26dd..5ee7fc9 100644
--- a/src/persistence.rs
+++ b/src/persistence.rs
@@ -1,11 +1,9 @@
-use crate::datastore::NodeInfo;
-use crate::grpc::challenge::Keys;
+use crate::{datastore::NodeInfo, grpc::challenge::Keys};
 use detee_sgx::SgxError;
+use once_cell::sync::Lazy;
 use serde::{Deserialize, Serialize};
 use serde_with::{base64::Base64, serde_as};
-use std::io::Write;
-use once_cell::sync::Lazy;
-use std::sync::Mutex;
+use std::{io::Write, sync::Mutex};
 
 pub struct Logfile {}
 static LOG_MUTEX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
@@ -41,9 +39,9 @@ impl From for KeysFile {
     }
 }
 
-impl Into<Keys> for KeysFile {
-    fn into(self) -> Keys {
-        Keys { keypair: self.keypair, token_address: self.token }
+impl From<KeysFile> for Keys {
+    fn from(kf: KeysFile) -> Keys {
+        Keys { keypair: kf.keypair, token_address: kf.token }
     }
 }
 
diff --git a/src/solana.rs b/src/solana.rs
index b4d8b98..39a0610 100644
--- a/src/solana.rs
+++ b/src/solana.rs
@@ -27,7 +27,7 @@ pub struct SolClient {
 }
 
 impl SolClient {
-    pub async fn new() -> Self {
+    pub async fn create_new_token() -> Self {
         let client = RpcClient::new(RPC_URL);
         let keypair = Keypair::new();
         let token = create_token(&client, &keypair).await;