connections refactoring
Signed-off-by: Valentyn Faychuk <valy@detee.ltd>
This commit is contained in:
parent
00e0db312b
commit
fc933c0647
@ -1,71 +1,58 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
set -e
|
set -e
|
||||||
|
|
||||||
|
num_nodes=${1:-4}
|
||||||
script_dir=$(dirname "$0")
|
script_dir=$(dirname "$0")
|
||||||
cd "${script_dir}/.." # Go to the root of the project
|
cd "${script_dir}/.."
|
||||||
|
|
||||||
|
function build_mint_sol() {
|
||||||
|
command -v cargo >/dev/null 2>&1 || { echo >&2 "cargo not found, run 'curl https://sh.rustup.rs -sSf | sh'"; exit 1; }
|
||||||
|
command -v gcc >/dev/null 2>&1 || { echo >&2 "cc not found, run 'apt update && apt install build-essential'"; exit 1; }
|
||||||
|
command -v protoc >/dev/null 2>&1 || { echo >&2 "protoc not found, run 'apt update && apt install protobuf-compiler'"; exit 1; }
|
||||||
|
|
||||||
function build_mint_sol_tool() {
|
|
||||||
echo "Building the mint_sol tool for testing"
|
echo "Building the mint_sol tool for testing"
|
||||||
|
cd mint_sol && cargo build --release && cd ..
|
||||||
if ! command -v cargo 2>&1 /dev/null
|
cp mint_sol/target/release/mint_sol "${script_dir}/mint_sol"
|
||||||
then
|
|
||||||
echo "cargo not found, run 'curl https://sh.rustup.rs -sSf | sh'"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
if ! command -v gcc 2>&1 /dev/null
|
|
||||||
then
|
|
||||||
echo "cc not found, run 'apt update && apt install build-essential'"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
if ! command -v protoc 2>&1 /dev/null
|
|
||||||
then
|
|
||||||
echo "protoc not found, run 'apt update && apt install protobuf-compiler'"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
cd mint_sol
|
|
||||||
cargo build --release
|
|
||||||
cp target/release/mint_sol "../${script_dir}/mint_sol"
|
|
||||||
cd ..
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
eval "sudo rm -rf /tmp/dthc{0..$num_nodes}"
|
||||||
TEST=1 ./${script_dir}/build-container.sh
|
TEST=1 ./${script_dir}/build-container.sh
|
||||||
[ -e "${script_dir}/mint_sol" ] || build_mint_sol_tool
|
[ -e "${script_dir}/mint_sol" ] || build_mint_sol
|
||||||
|
|
||||||
# Cleanup old containers and the network
|
# Cleanup old containers and the network
|
||||||
echo "Creating the network and the root node"
|
echo "Creating the network and the root node"
|
||||||
docker ps -a | grep 'dthc' | awk '{ print $NF }' | xargs docker rm -f || true
|
eval "docker rm -f dthc{0..$num_nodes}" 2>&1 /dev/null
|
||||||
docker network inspect dthc > /dev/null 2>&1 \
|
docker network inspect dthc >/dev/null 2>&1 \
|
||||||
|| docker network create --subnet=172.18.0.0/24 dthc \
|
|| docker network create --subnet=172.18.0.0/24 dthc \
|
||||||
|| true
|
|| true
|
||||||
|
|
||||||
echo "Waiting for the root node to start"
|
echo "Waiting for the root node to start"
|
||||||
# 172.18.0.1 is for the network gateway
|
# 172.18.0.1 is for the network gateway
|
||||||
docker run --name dthc-root \
|
docker run --name dthc0 \
|
||||||
--network dthc -d \
|
--network dthc -d \
|
||||||
--ip 172.18.0.2 \
|
--ip 172.18.0.2 \
|
||||||
--env NODE_IP="172.18.0.2" \
|
--env NODE_IP="172.18.0.2" \
|
||||||
|
--volume /tmp/dthc0:/challenge/main \
|
||||||
|
--publish 31300:31372 \
|
||||||
--device /dev/sgx/provision \
|
--device /dev/sgx/provision \
|
||||||
--device /dev/sgx/enclave \
|
--device /dev/sgx/enclave \
|
||||||
detee/hacker-challenge:test
|
detee/hacker-challenge:test
|
||||||
while true; do
|
while true; do
|
||||||
echo -n "." && sleep 1
|
echo -n "." && sleep 1
|
||||||
docker logs dthc-root | grep -q "SOL" && echo && break
|
docker logs dthc0 | grep -q "SOL" && echo && break
|
||||||
done
|
done
|
||||||
|
|
||||||
echo "Sending SOL to the root and waiting for the mint"
|
echo "Sending SOL to the root and waiting for the mint"
|
||||||
address=$(docker logs dthc-root | grep 'SOL' | awk '{ print $NF }')
|
address=$(docker logs dthc0 | grep 'SOL' | awk '{ print $NF }')
|
||||||
"${script_dir}"/mint_sol "${address}"
|
"${script_dir}"/mint_sol "${address}"
|
||||||
while true; do
|
while true; do
|
||||||
echo -n "." && sleep 1
|
echo -n "." && sleep 1
|
||||||
docker logs dthc-root | grep -q "Mint created" && echo && break
|
docker logs dthc0 | grep -q "Mint created" && echo && break
|
||||||
done
|
done
|
||||||
|
|
||||||
echo "Creating the cluster"
|
echo "Creating a cluster with ${num_nodes} nodes"
|
||||||
for n in {1..20}; do
|
for n in $(seq 1 $num_nodes); do
|
||||||
#init_nodes=$(docker inspect dthc-root --format '{{ .NetworkSettings.Networks.dthc.IPAddress }}')
|
#init_nodes=$(docker inspect dthc0 --format '{{ .NetworkSettings.Networks.dthc.IPAddress }}')
|
||||||
node_ip="172.18.0.$((2 + n))"
|
node_ip="172.18.0.$((2 + n))"
|
||||||
node_port=$((31300 + n))
|
node_port=$((31300 + n))
|
||||||
node_volume="/tmp/dthc${n}"
|
node_volume="/tmp/dthc${n}"
|
||||||
@ -81,7 +68,6 @@ for n in {1..20}; do
|
|||||||
--device /dev/sgx/enclave \
|
--device /dev/sgx/enclave \
|
||||||
detee/hacker-challenge:test
|
detee/hacker-challenge:test
|
||||||
done
|
done
|
||||||
sleep 15 # Wait for the cluster to start
|
|
||||||
|
|
||||||
echo "Running the test mint"
|
echo "Running the test mint"
|
||||||
for n in {1..20}; do
|
for n in {1..20}; do
|
||||||
@ -94,4 +80,4 @@ done
|
|||||||
|
|
||||||
# curl 127.0.0.1:31303/metrics
|
# curl 127.0.0.1:31303/metrics
|
||||||
# curl -X POST 127.0.0.1:31303/mint -d '{"wallet": "EZT16iP1SQVUFf1AJN6oiE5BZPnyBUqaKDkZ4oZRsvhR"}' -H 'Content-Type: application/json'
|
# curl -X POST 127.0.0.1:31303/mint -d '{"wallet": "EZT16iP1SQVUFf1AJN6oiE5BZPnyBUqaKDkZ4oZRsvhR"}' -H 'Content-Type: application/json'
|
||||||
# docker run --network dthc -d --name dthc-3 --ip 172.18.0.3 --env NODE_IP='172.18.0.3' --env INIT_NODES='172.18.0.2' -v /tmp/dthc3:/challenge/main -p 31303:31372 --device /dev/sgx/provision --device /dev/sgx/enclave detee/hacker-challenge:test
|
# docker run --name dthc0 --network dthc -d --ip 172.18.0.2 --env NODE_IP="172.18.0.2" --env INIT_NODES="172.18.0.5 172.18.0.3 172.18.0.4" --volume /tmp/dthc0:/challenge/main --publish 31300:31372 --device /dev/sgx/provision --device /dev/sgx/enclave detee/hacker-challenge:test
|
||||||
|
@ -183,30 +183,7 @@ impl State {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_my_info(&self) -> NodeInfo {
|
pub fn get_nodes(&self) -> Vec<(String, NodeInfo)> {
|
||||||
let mut my_info = self.nodes.get_mut(&self.my_ip).expect("no info for this node");
|
|
||||||
my_info.keepalive = SystemTime::now();
|
|
||||||
my_info.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This returns true if the update should be further forwarded
|
|
||||||
/// For example, we never forward our own updates that came back
|
|
||||||
pub fn process_node_update(&self, (ip, node_info): (String, NodeInfo)) -> bool {
|
|
||||||
let is_update_mine = ip.eq(&self.my_ip);
|
|
||||||
let is_update_new = self
|
|
||||||
.nodes
|
|
||||||
.get(&ip)
|
|
||||||
.map(|curr_info| node_info.is_newer_than(&curr_info))
|
|
||||||
.unwrap_or(true);
|
|
||||||
if is_update_new {
|
|
||||||
println!("Inserting: {}, {}", ip, node_info.to_json());
|
|
||||||
node_info.log(&ip);
|
|
||||||
self.nodes.insert(ip, node_info);
|
|
||||||
}
|
|
||||||
is_update_new && !is_update_mine
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_node_list(&self) -> Vec<(String, NodeInfo)> {
|
|
||||||
self.nodes.clone().into_iter().collect()
|
self.nodes.clone().into_iter().collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -214,28 +191,58 @@ impl State {
|
|||||||
self.my_ip.clone()
|
self.my_ip.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_my_info(&self) -> NodeInfo {
|
||||||
|
let mut my_info = self.nodes.get_mut(&self.my_ip).expect("no info for this node");
|
||||||
|
my_info.keepalive = SystemTime::now();
|
||||||
|
my_info.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_connected_ips(&self) -> Vec<String> {
|
||||||
|
self.conns.iter().map(|n| n.key().clone()).collect()
|
||||||
|
}
|
||||||
|
|
||||||
// returns a random node that does not have an active connection
|
// returns a random node that does not have an active connection
|
||||||
pub fn get_random_disconnected_node(&self) -> Option<String> {
|
pub fn get_random_disconnected_ip(&self) -> Option<String> {
|
||||||
use rand::{rngs::OsRng, RngCore};
|
use rand::{rngs::OsRng, RngCore};
|
||||||
let len = self.nodes.len();
|
let len = self.nodes.len();
|
||||||
if len == 0 {
|
if len == 0 {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
let conn_ips = self.get_connected_ips();
|
||||||
let skip = OsRng.next_u64().try_into().unwrap_or(0) % len;
|
let skip = OsRng.next_u64().try_into().unwrap_or(0) % len;
|
||||||
self.nodes
|
self.nodes
|
||||||
.iter()
|
.iter()
|
||||||
.map(|n| n.key().clone())
|
.map(|n| n.key().clone())
|
||||||
.cycle()
|
.cycle()
|
||||||
.skip(skip)
|
.skip(skip)
|
||||||
.find(|ip| ip != &self.my_ip && !self.conns.contains(ip))
|
.find(|ip| ip != &self.my_ip && !conn_ips.contains(ip))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remove_inactive_nodes(&self) {
|
/// This returns true if the update should be further forwarded
|
||||||
self.nodes.retain(|_, v| {
|
/// For example, we never forward our own updates that came back
|
||||||
|
pub fn process_node_update(&self, (node_ip, node_info): (String, NodeInfo)) -> bool {
|
||||||
|
let is_update_mine = node_ip.eq(&self.my_ip);
|
||||||
|
let is_update_new = self
|
||||||
|
.nodes
|
||||||
|
.get(&node_ip)
|
||||||
|
.map(|curr_info| node_info.is_newer_than(&curr_info))
|
||||||
|
.unwrap_or(true);
|
||||||
|
if is_update_new {
|
||||||
|
println!("Inserting: {}, {}", node_ip, node_info.to_json());
|
||||||
|
node_info.log(&node_ip);
|
||||||
|
self.nodes.insert(node_ip, node_info);
|
||||||
|
}
|
||||||
|
is_update_new && !is_update_mine
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_inactive_nodes(&self, max_age: u64) {
|
||||||
|
self.nodes.retain(|_, node_info| {
|
||||||
// HACK: Check if it is possible to abuse network by corrupting system time
|
// HACK: Check if it is possible to abuse network by corrupting system time
|
||||||
let age =
|
let age = SystemTime::now()
|
||||||
SystemTime::now().duration_since(v.keepalive).unwrap_or(Duration::ZERO).as_secs();
|
.duration_since(node_info.keepalive)
|
||||||
age <= 600
|
.unwrap_or(Duration::ZERO)
|
||||||
|
.as_secs();
|
||||||
|
age <= max_age
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,59 +1,60 @@
|
|||||||
use super::challenge::Keys;
|
use super::{challenge::Keys, InternalNodeUpdate};
|
||||||
use super::InternalNodeUpdate;
|
|
||||||
use crate::{
|
use crate::{
|
||||||
datastore::State,
|
datastore::State,
|
||||||
grpc::challenge::{update_client::UpdateClient, Empty},
|
grpc::challenge::{update_client::UpdateClient, Empty},
|
||||||
};
|
};
|
||||||
use detee_sgx::RaTlsConfig;
|
use detee_sgx::RaTlsConfig;
|
||||||
use std::{str::FromStr, sync::Arc};
|
use std::{net::Ipv4Addr, str::FromStr, sync::Arc};
|
||||||
use tokio::{
|
use tokio::{sync::broadcast::Sender, time::Duration};
|
||||||
sync::broadcast::Sender,
|
|
||||||
time::{sleep, Duration},
|
|
||||||
};
|
|
||||||
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
|
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
|
||||||
|
|
||||||
|
pub async fn grpc_new_conn(
|
||||||
|
node_ip: String,
|
||||||
|
state: Arc<State>,
|
||||||
|
ra_cfg: RaTlsConfig,
|
||||||
|
tx: Sender<InternalNodeUpdate>,
|
||||||
|
) {
|
||||||
|
if Ipv4Addr::from_str(&node_ip).is_err() {
|
||||||
|
println!("IPv4 address is invalid: {node_ip}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ConnManager::init(state, ra_cfg, tx).connect_to(node_ip).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn grpc_query_keys(
|
||||||
|
node_ip: String,
|
||||||
|
state: Arc<State>,
|
||||||
|
ra_cfg: RaTlsConfig,
|
||||||
|
) -> Result<Keys, Box<dyn std::error::Error>> {
|
||||||
|
if Ipv4Addr::from_str(&node_ip).is_err() {
|
||||||
|
let err = format!("IPv4 address is invalid: {node_ip}");
|
||||||
|
return Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, err)));
|
||||||
|
}
|
||||||
|
query_keys(node_ip, state, ra_cfg).await
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct ConnManager {
|
struct ConnManager {
|
||||||
my_ip: String,
|
|
||||||
state: Arc<State>,
|
state: Arc<State>,
|
||||||
tx: Sender<InternalNodeUpdate>,
|
tx: Sender<InternalNodeUpdate>,
|
||||||
ratls_config: RaTlsConfig,
|
ra_cfg: RaTlsConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ConnManager {
|
impl ConnManager {
|
||||||
pub fn init(
|
fn init(state: Arc<State>, ra_cfg: RaTlsConfig, tx: Sender<InternalNodeUpdate>) -> Self {
|
||||||
my_ip: String,
|
Self { state, ra_cfg, tx }
|
||||||
state: Arc<State>,
|
|
||||||
ratls_config: RaTlsConfig,
|
|
||||||
tx: Sender<InternalNodeUpdate>,
|
|
||||||
) -> Self {
|
|
||||||
Self { my_ip, state, ratls_config, tx }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start_with_node(self, node_ip: String) {
|
async fn connect_to(&self, node_ip: String) {
|
||||||
self.connect_wrapper(node_ip).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn start(self) {
|
|
||||||
loop {
|
|
||||||
if let Some(ip) = self.state.get_random_disconnected_node() {
|
|
||||||
println!("Found random disconnected node {ip}");
|
|
||||||
self.connect_wrapper(ip.clone()).await;
|
|
||||||
}
|
|
||||||
sleep(Duration::from_secs(3)).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn connect_wrapper(&self, node_ip: String) {
|
|
||||||
let state = self.state.clone();
|
let state = self.state.clone();
|
||||||
state.add_conn(&node_ip);
|
state.add_conn(&node_ip);
|
||||||
if let Err(e) = self.connect(node_ip.clone()).await {
|
if let Err(e) = self.connect_to_int(node_ip.clone()).await {
|
||||||
println!("Client connection for {node_ip} failed: {e:?}");
|
println!("Client connection for {node_ip} failed: {e:?}");
|
||||||
}
|
}
|
||||||
state.delete_conn(&node_ip);
|
state.delete_conn(&node_ip);
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn connect(&self, remote_ip: String) -> Result<(), Box<dyn std::error::Error>> {
|
async fn connect_to_int(&self, remote_ip: String) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
use detee_sgx::RaTlsConfigBuilder;
|
use detee_sgx::RaTlsConfigBuilder;
|
||||||
use hyper::Uri;
|
use hyper::Uri;
|
||||||
use hyper_util::{client::legacy::connect::HttpConnector, rt::TokioExecutor};
|
use hyper_util::{client::legacy::connect::HttpConnector, rt::TokioExecutor};
|
||||||
@ -61,7 +62,7 @@ impl ConnManager {
|
|||||||
|
|
||||||
println!("Connecting to {remote_ip}...");
|
println!("Connecting to {remote_ip}...");
|
||||||
|
|
||||||
let tls = ClientConfig::from_ratls_config(self.ratls_config.clone())
|
let tls = ClientConfig::from_ratls_config(self.ra_cfg.clone())
|
||||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e)))?;
|
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e)))?;
|
||||||
|
|
||||||
let mut http = HttpConnector::new();
|
let mut http = HttpConnector::new();
|
||||||
@ -79,6 +80,7 @@ impl ConnManager {
|
|||||||
.enable_http2()
|
.enable_http2()
|
||||||
.wrap_connector(s)
|
.wrap_connector(s)
|
||||||
})
|
})
|
||||||
|
.timeout(Duration::from_secs(5))
|
||||||
.map_request(move |_| {
|
.map_request(move |_| {
|
||||||
Uri::from_str(&format!("https://{cloned_node_ip}:31373"))
|
Uri::from_str(&format!("https://{cloned_node_ip}:31373"))
|
||||||
.expect("Could not parse URI")
|
.expect("Could not parse URI")
|
||||||
@ -112,21 +114,12 @@ impl ConnManager {
|
|||||||
let mut resp_stream = response.into_inner();
|
let mut resp_stream = response.into_inner();
|
||||||
|
|
||||||
// Immediately send our info as a network update
|
// Immediately send our info as a network update
|
||||||
let my_info = (self.my_ip.clone(), self.state.get_my_info()).into();
|
let _ = self.tx.send((self.state.get_my_ip(), self.state.get_my_info()).into());
|
||||||
let _ = self.tx.send(InternalNodeUpdate { sender_ip: self.my_ip.clone(), update: my_info });
|
|
||||||
|
|
||||||
while let Some(update) = resp_stream.message().await? {
|
while let Some(update) = resp_stream.message().await? {
|
||||||
// update the entire network in case the information is new
|
// Update the entire network in case the information is new
|
||||||
if self.state.process_node_update(update.clone().into()) {
|
if self.state.process_node_update(update.clone().into()) {
|
||||||
// if process update returns true, the update must be forwarded
|
// If process update returns true, the update must be forwarded
|
||||||
if self
|
if self.tx.send((remote_ip.clone(), update).into()).is_err() {
|
||||||
.tx
|
|
||||||
.send(InternalNodeUpdate {
|
|
||||||
sender_ip: remote_ip.clone(),
|
|
||||||
update: update.clone(),
|
|
||||||
})
|
|
||||||
.is_err()
|
|
||||||
{
|
|
||||||
println!("Tokio broadcast receivers had an issue consuming the channel");
|
println!("Tokio broadcast receivers had an issue consuming the channel");
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@ -136,10 +129,10 @@ impl ConnManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn key_grabber(
|
async fn query_keys(
|
||||||
state: Arc<State>,
|
|
||||||
node_ip: String,
|
node_ip: String,
|
||||||
ratls_config: RaTlsConfig,
|
state: Arc<State>,
|
||||||
|
ra_cfg: RaTlsConfig,
|
||||||
) -> Result<Keys, Box<dyn std::error::Error>> {
|
) -> Result<Keys, Box<dyn std::error::Error>> {
|
||||||
use detee_sgx::RaTlsConfigBuilder;
|
use detee_sgx::RaTlsConfigBuilder;
|
||||||
use hyper::Uri;
|
use hyper::Uri;
|
||||||
@ -148,7 +141,7 @@ pub async fn key_grabber(
|
|||||||
|
|
||||||
println!("Getting key from {node_ip}...");
|
println!("Getting key from {node_ip}...");
|
||||||
|
|
||||||
let tls = ClientConfig::from_ratls_config(ratls_config)
|
let tls = ClientConfig::from_ratls_config(ra_cfg)
|
||||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e)))?;
|
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e)))?;
|
||||||
|
|
||||||
let mut http = HttpConnector::new();
|
let mut http = HttpConnector::new();
|
||||||
@ -166,6 +159,7 @@ pub async fn key_grabber(
|
|||||||
.enable_http2()
|
.enable_http2()
|
||||||
.wrap_connector(s)
|
.wrap_connector(s)
|
||||||
})
|
})
|
||||||
|
.timeout(Duration::from_secs(5))
|
||||||
.map_request(move |_| {
|
.map_request(move |_| {
|
||||||
Uri::from_str(&format!("https://{cloned_node_ip}:31373")).expect("Could not parse URI")
|
Uri::from_str(&format!("https://{cloned_node_ip}:31373")).expect("Could not parse URI")
|
||||||
})
|
})
|
||||||
|
@ -7,16 +7,10 @@ pub mod challenge {
|
|||||||
tonic::include_proto!("challenge");
|
tonic::include_proto!("challenge");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, PartialEq)]
|
|
||||||
pub struct InternalNodeUpdate {
|
|
||||||
pub sender_ip: String,
|
|
||||||
pub update: NodeUpdate,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<(String, NodeInfo)> for NodeUpdate {
|
impl From<(String, NodeInfo)> for NodeUpdate {
|
||||||
fn from((ip, info): (String, NodeInfo)) -> Self {
|
fn from((ip, info): (String, NodeInfo)) -> Self {
|
||||||
NodeUpdate {
|
Self {
|
||||||
ip: ip.to_string(),
|
ip,
|
||||||
started_at: Some(prost_types::Timestamp::from(info.started_at)),
|
started_at: Some(prost_types::Timestamp::from(info.started_at)),
|
||||||
keepalive: Some(prost_types::Timestamp::from(info.keepalive)),
|
keepalive: Some(prost_types::Timestamp::from(info.keepalive)),
|
||||||
mint_requests: info.mint_requests,
|
mint_requests: info.mint_requests,
|
||||||
@ -31,33 +25,49 @@ impl From<(String, NodeInfo)> for NodeUpdate {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl From<NodeUpdate> for (String, NodeInfo) {
|
impl From<NodeUpdate> for (String, NodeInfo) {
|
||||||
fn from(val: NodeUpdate) -> Self {
|
fn from(upd: NodeUpdate) -> (String, NodeInfo) {
|
||||||
let ip = val.ip;
|
let ip = upd.ip;
|
||||||
let started_at: SystemTime = match val.started_at {
|
let started_at: SystemTime = grpc_timestamp_to_systemtime(upd.started_at);
|
||||||
Some(ts) => {
|
let keepalive: SystemTime = grpc_timestamp_to_systemtime(upd.keepalive);
|
||||||
let duration = Duration::new(ts.seconds as u64, ts.nanos as u32);
|
|
||||||
UNIX_EPOCH.checked_add(duration).unwrap_or(SystemTime::now())
|
|
||||||
}
|
|
||||||
None => SystemTime::now(),
|
|
||||||
};
|
|
||||||
let keepalive: SystemTime = match val.keepalive {
|
|
||||||
Some(ts) => {
|
|
||||||
let duration = Duration::new(ts.seconds as u64, ts.nanos as u32);
|
|
||||||
UNIX_EPOCH.checked_add(duration).unwrap_or(SystemTime::now())
|
|
||||||
}
|
|
||||||
None => SystemTime::now(),
|
|
||||||
};
|
|
||||||
let self_info = NodeInfo {
|
let self_info = NodeInfo {
|
||||||
started_at,
|
started_at,
|
||||||
keepalive,
|
keepalive,
|
||||||
mint_requests: val.mint_requests,
|
mint_requests: upd.mint_requests,
|
||||||
mints: val.mints,
|
mints: upd.mints,
|
||||||
mratls_conns: val.mratls_conns,
|
mratls_conns: upd.mratls_conns,
|
||||||
net_attacks: val.quote_attacks,
|
net_attacks: upd.quote_attacks,
|
||||||
public: val.public,
|
public: upd.public,
|
||||||
restarts: val.restarts,
|
restarts: upd.restarts,
|
||||||
disk_attacks: val.disk_attacks,
|
disk_attacks: upd.disk_attacks,
|
||||||
};
|
};
|
||||||
(ip, self_info)
|
(ip, self_info)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn grpc_timestamp_to_systemtime(ts: Option<prost_types::Timestamp>) -> SystemTime {
|
||||||
|
if let Some(ts) = ts {
|
||||||
|
let duration = Duration::new(ts.seconds as u64, ts.nanos as u32);
|
||||||
|
UNIX_EPOCH.checked_add(duration).unwrap_or(SystemTime::now())
|
||||||
|
} else {
|
||||||
|
println!("Timestamp is None");
|
||||||
|
SystemTime::now()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, PartialEq)]
|
||||||
|
pub struct InternalNodeUpdate {
|
||||||
|
pub sender_ip: String,
|
||||||
|
pub update: NodeUpdate,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<(String, NodeInfo)> for InternalNodeUpdate {
|
||||||
|
fn from((ip, info): (String, NodeInfo)) -> Self {
|
||||||
|
Self { sender_ip: ip.clone(), update: (ip, info).into() }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<(String, NodeUpdate)> for InternalNodeUpdate {
|
||||||
|
fn from((ip, update): (String, NodeUpdate)) -> Self {
|
||||||
|
Self { sender_ip: ip, update }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
use super::challenge::{update_server::UpdateServer, Empty, Keys, NodeUpdate};
|
use super::{
|
||||||
use super::InternalNodeUpdate;
|
challenge::{update_server::UpdateServer, Empty, Keys, NodeUpdate},
|
||||||
|
InternalNodeUpdate,
|
||||||
|
};
|
||||||
use crate::{datastore::State, grpc::challenge::update_server::Update};
|
use crate::{datastore::State, grpc::challenge::update_server::Update};
|
||||||
use detee_sgx::RaTlsConfig;
|
use detee_sgx::RaTlsConfig;
|
||||||
use rustls::pki_types::CertificateDer;
|
use rustls::pki_types::CertificateDer;
|
||||||
@ -8,21 +10,30 @@ use tokio::sync::broadcast::Sender;
|
|||||||
use tokio_stream::{Stream, StreamExt};
|
use tokio_stream::{Stream, StreamExt};
|
||||||
use tonic::{Request, Response, Status, Streaming};
|
use tonic::{Request, Response, Status, Streaming};
|
||||||
|
|
||||||
pub struct MyServer {
|
pub async fn grpc_new_server(
|
||||||
|
state: Arc<State>,
|
||||||
|
keys: Keys,
|
||||||
|
ra_cfg: RaTlsConfig,
|
||||||
|
tx: Sender<InternalNodeUpdate>,
|
||||||
|
) {
|
||||||
|
NodeServer::init(state, keys, ra_cfg, tx).start().await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct NodeServer {
|
||||||
state: Arc<State>,
|
state: Arc<State>,
|
||||||
tx: Sender<InternalNodeUpdate>,
|
tx: Sender<InternalNodeUpdate>,
|
||||||
ratls_config: RaTlsConfig,
|
ra_cfg: RaTlsConfig,
|
||||||
keys: Keys, // For sending secret keys to new nodes ;)
|
keys: Keys, // For sending secret keys to new nodes ;)
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MyServer {
|
impl NodeServer {
|
||||||
pub fn init(
|
pub fn init(
|
||||||
state: Arc<State>,
|
state: Arc<State>,
|
||||||
keys: Keys,
|
keys: Keys,
|
||||||
ratls_config: RaTlsConfig,
|
ra_cfg: RaTlsConfig,
|
||||||
tx: Sender<InternalNodeUpdate>,
|
tx: Sender<InternalNodeUpdate>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self { state, tx, keys, ratls_config }
|
Self { state, tx, keys, ra_cfg }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start(self) {
|
pub async fn start(self) {
|
||||||
@ -40,7 +51,7 @@ impl MyServer {
|
|||||||
|
|
||||||
// TODO: error handling, shouldn't have expects
|
// TODO: error handling, shouldn't have expects
|
||||||
|
|
||||||
let mut tls = ServerConfig::from_ratls_config(self.ratls_config.clone()).unwrap();
|
let mut tls = ServerConfig::from_ratls_config(self.ra_cfg.clone()).unwrap();
|
||||||
tls.alpn_protocols = vec![b"h2".to_vec()];
|
tls.alpn_protocols = vec![b"h2".to_vec()];
|
||||||
|
|
||||||
let state = self.state.clone();
|
let state = self.state.clone();
|
||||||
@ -119,7 +130,7 @@ struct ConnInfo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tonic::async_trait]
|
#[tonic::async_trait]
|
||||||
impl Update for MyServer {
|
impl Update for NodeServer {
|
||||||
type GetUpdatesStream = Pin<Box<dyn Stream<Item = Result<NodeUpdate, Status>> + Send>>;
|
type GetUpdatesStream = Pin<Box<dyn Stream<Item = Result<NodeUpdate, Status>> + Send>>;
|
||||||
|
|
||||||
async fn get_keys(&self, _request: Request<Empty>) -> Result<Response<Keys>, Status> {
|
async fn get_keys(&self, _request: Request<Empty>) -> Result<Response<Keys>, Status> {
|
||||||
@ -143,8 +154,8 @@ impl Update for MyServer {
|
|||||||
state.declare_myself_public();
|
state.declare_myself_public();
|
||||||
state.add_conn(&remote_ip);
|
state.add_conn(&remote_ip);
|
||||||
|
|
||||||
let full_update_list: Vec<NodeUpdate> = state.get_node_list().into_iter().map(Into::<NodeUpdate>::into).collect();
|
let known_nodes: Vec<NodeUpdate> = state.get_nodes().into_iter().map(Into::into).collect();
|
||||||
for update in full_update_list {
|
for update in known_nodes {
|
||||||
yield Ok(update);
|
yield Ok(update);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -164,7 +175,7 @@ impl Update for MyServer {
|
|||||||
|
|
||||||
if state.process_node_update(update.clone().into()) {
|
if state.process_node_update(update.clone().into()) {
|
||||||
// if process update returns true, the update must be forwarded
|
// if process update returns true, the update must be forwarded
|
||||||
if tx.send(InternalNodeUpdate { sender_ip: remote_ip.clone(), update: update.clone() }).is_err() {
|
if tx.send((remote_ip.clone(), update).into()).is_err() {
|
||||||
println!("Tokio broadcast receivers had an issue consuming the channel");
|
println!("Tokio broadcast receivers had an issue consuming the channel");
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -48,9 +48,8 @@ impl From<(String, datastore::NodeInfo)> for NodesResp {
|
|||||||
|
|
||||||
#[get("/nodes")]
|
#[get("/nodes")]
|
||||||
async fn get_nodes(ds: web::Data<Arc<State>>) -> HttpResponse {
|
async fn get_nodes(ds: web::Data<Arc<State>>) -> HttpResponse {
|
||||||
HttpResponse::Ok().json(
|
HttpResponse::Ok()
|
||||||
ds.get_node_list().into_iter().map(Into::<NodesResp>::into).collect::<Vec<NodesResp>>(),
|
.json(ds.get_nodes().into_iter().map(Into::<NodesResp>::into).collect::<Vec<NodesResp>>())
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
@ -81,13 +80,13 @@ async fn mint(
|
|||||||
#[get("/metrics")]
|
#[get("/metrics")]
|
||||||
async fn metrics(ds: web::Data<Arc<State>>) -> HttpResponse {
|
async fn metrics(ds: web::Data<Arc<State>>) -> HttpResponse {
|
||||||
let mut metrics = String::new();
|
let mut metrics = String::new();
|
||||||
for (ip, node) in ds.get_node_list() {
|
for (ip, node) in ds.get_nodes() {
|
||||||
metrics.push_str(node.to_metrics(&ip).as_str());
|
metrics.push_str(node.to_metrics(&ip).as_str());
|
||||||
}
|
}
|
||||||
HttpResponse::Ok().content_type("text/plain; version=0.0.4; charset=utf-8").body(metrics)
|
HttpResponse::Ok().content_type("text/plain; version=0.0.4; charset=utf-8").body(metrics)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn init(state: Arc<State>, sol_client: Arc<SolClient>) {
|
pub async fn http_new_server(state: Arc<State>, sol_client: Arc<SolClient>) {
|
||||||
HttpServer::new(move || {
|
HttpServer::new(move || {
|
||||||
App::new()
|
App::new()
|
||||||
.app_data(web::Data::new(state.clone()))
|
.app_data(web::Data::new(state.clone()))
|
||||||
|
193
src/main.rs
193
src/main.rs
@ -4,176 +4,145 @@ mod http_server;
|
|||||||
mod persistence;
|
mod persistence;
|
||||||
mod solana;
|
mod solana;
|
||||||
|
|
||||||
use crate::persistence::SealError;
|
|
||||||
use crate::{
|
use crate::{
|
||||||
grpc::challenge::NodeUpdate, grpc::InternalNodeUpdate, persistence::KeysFile,
|
grpc::{
|
||||||
persistence::SealedFile, solana::SolClient,
|
challenge::NodeUpdate,
|
||||||
|
client::{grpc_new_conn, grpc_query_keys},
|
||||||
|
server::grpc_new_server,
|
||||||
|
InternalNodeUpdate,
|
||||||
|
},
|
||||||
|
http_server::http_new_server,
|
||||||
|
persistence::{KeysFile, SealError, SealedFile},
|
||||||
|
solana::SolClient,
|
||||||
};
|
};
|
||||||
use datastore::State;
|
use datastore::State;
|
||||||
use detee_sgx::{InstanceMeasurement, RaTlsConfig};
|
use detee_sgx::{InstanceMeasurement, RaTlsConfig};
|
||||||
use std::{
|
use std::{fs::read_to_string, sync::Arc};
|
||||||
fs::File,
|
|
||||||
io::Error,
|
|
||||||
io::{BufRead, BufReader},
|
|
||||||
sync::Arc,
|
|
||||||
};
|
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::{broadcast, broadcast::Sender},
|
sync::{broadcast, broadcast::Sender},
|
||||||
task::JoinSet,
|
task::JoinSet,
|
||||||
time::{sleep, Duration},
|
time::{interval, Duration},
|
||||||
};
|
};
|
||||||
|
|
||||||
const INIT_NODES_FILE: &str = "/host/detee_challenge_nodes";
|
const INIT_NODES_FILE: &str = "/host/detee_challenge_nodes";
|
||||||
const KEYS_FILE: &str = "/host/main/TRY_TO_HACK_THIS";
|
const KEYS_FILE: &str = "/host/main/TRY_TO_HACK_THIS";
|
||||||
const MAX_CONNECTIONS: usize = 3;
|
const NUM_CONNECTIONS: usize = 3;
|
||||||
|
const HEARTBEAT_INTERVAL: u64 = 30; // Seconds
|
||||||
#[cfg(feature = "test")]
|
|
||||||
async fn resolve_my_ip() -> Result<String, Error> {
|
|
||||||
let node_ip = File::open("/host/detee_node_ip")?;
|
|
||||||
let mut reader = BufReader::new(node_ip);
|
|
||||||
let mut ip = String::new();
|
|
||||||
reader.read_line(&mut ip)?;
|
|
||||||
if ip.ends_with('\n') {
|
|
||||||
ip.pop();
|
|
||||||
}
|
|
||||||
Ok(ip)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(not(feature = "test"))]
|
#[cfg(not(feature = "test"))]
|
||||||
async fn resolve_my_ip() -> Result<String, Error> {
|
async fn resolve_my_ipv4() -> String {
|
||||||
let err = "Can't resolve my external IP, try again";
|
public_ip::addr_v4().await.unwrap().to_string()
|
||||||
let ip = public_ip::addr_v4().await.ok_or(Error::new(std::io::ErrorKind::Other, err))?;
|
|
||||||
Ok(format!("{}", ip))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn heartbeat_cron(my_ip: String, state: Arc<State>, tx: Sender<InternalNodeUpdate>) {
|
#[cfg(feature = "test")]
|
||||||
|
async fn resolve_my_ipv4() -> String {
|
||||||
|
use std::str::FromStr;
|
||||||
|
let node_ip = read_to_string("/host/detee_node_ip").unwrap();
|
||||||
|
std::net::Ipv4Addr::from_str(node_ip.trim()).unwrap().to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn heartbeat(
|
||||||
|
mut tasks: JoinSet<()>,
|
||||||
|
ra_cfg: RaTlsConfig,
|
||||||
|
state: Arc<State>,
|
||||||
|
tx: Sender<InternalNodeUpdate>,
|
||||||
|
) -> ! {
|
||||||
|
let mut interval = interval(Duration::from_secs(HEARTBEAT_INTERVAL));
|
||||||
loop {
|
loop {
|
||||||
sleep(Duration::from_secs(60)).await;
|
interval.tick().await;
|
||||||
println!("Heartbeat...");
|
println!("Heartbeat...");
|
||||||
let update = (my_ip.clone(), state.get_my_info()).into();
|
state.remove_inactive_nodes(HEARTBEAT_INTERVAL * 3);
|
||||||
let _ = tx.send(InternalNodeUpdate { sender_ip: my_ip.clone(), update });
|
let connected_ips = state.get_connected_ips();
|
||||||
state.remove_inactive_nodes();
|
println!("Connected nodes ({}): {:?}", connected_ips.len(), connected_ips);
|
||||||
|
let _ = tx.send((state.get_my_ip(), state.get_my_info()).into());
|
||||||
|
if connected_ips.len() < NUM_CONNECTIONS {
|
||||||
|
if let Some(node_ip) = state.get_random_disconnected_ip() {
|
||||||
|
println!("Dialing random node {}", node_ip);
|
||||||
|
tasks.spawn(grpc_new_conn(node_ip, state.clone(), ra_cfg.clone(), tx.clone()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let _ = tasks.try_join_next();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_sol_client(state: Arc<State>, ratls_config: RaTlsConfig) -> SolClient {
|
async fn init_token(state: Arc<State>, ra_cfg: RaTlsConfig) -> SolClient {
|
||||||
|
// First try to get the keys from the disk
|
||||||
match KeysFile::read(KEYS_FILE) {
|
match KeysFile::read(KEYS_FILE) {
|
||||||
Ok(keys_file) => {
|
Ok(keys_file) => {
|
||||||
let sol_client = SolClient::try_from(keys_file)
|
// If unsealed keys are corrupted then it is very bad, we should panic
|
||||||
.map_err(|e| {
|
let sol_client = SolClient::try_from(keys_file).unwrap();
|
||||||
println!("Read malformed keys from the disk: {e}");
|
println!("Found sealed wallet {}", sol_client.get_wallet_pubkey());
|
||||||
state.increase_net_attacks();
|
println!("The address of the Token is {}", sol_client.get_token_address());
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
println!(
|
|
||||||
"Found the following wallet saved to disk: {}",
|
|
||||||
sol_client.get_wallet_pubkey()
|
|
||||||
);
|
|
||||||
println!("Loading token mint address {}", sol_client.get_token_address());
|
|
||||||
return sol_client;
|
return sol_client;
|
||||||
}
|
}
|
||||||
Err(SealError::Attack(e)) => {
|
Err(SealError::Attack(e)) => {
|
||||||
println!("The local keys file is corrupted: {}", e);
|
println!("The sealed keys are corrupted: {}", e);
|
||||||
state.increase_disk_attacks();
|
state.increase_disk_attacks();
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
println!("Could not read keys file: {e}");
|
println!("Could not read sealed keys: {e}");
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let init_nodes = match File::open(INIT_NODES_FILE) {
|
let node_ips = match read_to_string(INIT_NODES_FILE) {
|
||||||
Ok(init_nodes) => init_nodes,
|
Ok(node_ips) => node_ips,
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
println!("Can't initialize using init nodes from {INIT_NODES_FILE}");
|
println!("I am root, creating a new Token...");
|
||||||
println!("Starting a new network with a new key...");
|
let sol_client = SolClient::create_new_token().await;
|
||||||
return SolClient::new().await;
|
println!("The address of the Token is {}", sol_client.get_token_address());
|
||||||
|
println!("Sealing keys in the {KEYS_FILE}");
|
||||||
|
if let Err(e) = sol_client.get_keys_file().write(KEYS_FILE) {
|
||||||
|
println!("Could not seal keys: {e}");
|
||||||
|
}
|
||||||
|
return sol_client;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let init_nodes_reader = BufReader::new(init_nodes);
|
for node_ip in node_ips.lines().map(String::from) {
|
||||||
for init_node_ip in init_nodes_reader.lines().map(|l| l.unwrap()) {
|
match grpc_query_keys(node_ip.clone(), state.clone(), ra_cfg.clone()).await {
|
||||||
match grpc::client::key_grabber(state.clone(), init_node_ip, ratls_config.clone()).await {
|
|
||||||
Ok(keys) => {
|
Ok(keys) => {
|
||||||
let sol_client = SolClient::try_from(keys.clone())
|
// If grpc keys are corrupted then it is very bad, we should panic
|
||||||
.map_err(|e| {
|
let sol_client = SolClient::try_from(keys.clone()).unwrap();
|
||||||
println!("Received malformed keys from the network: {e}");
|
println!("Connected network wallet {}", sol_client.get_wallet_pubkey());
|
||||||
state.increase_net_attacks();
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
println!(
|
|
||||||
"Got keypair from the network. Joining the network using wallet {}",
|
|
||||||
sol_client.get_wallet_pubkey()
|
|
||||||
);
|
|
||||||
println!("The address of the Token is {}", sol_client.get_token_address());
|
println!("The address of the Token is {}", sol_client.get_token_address());
|
||||||
println!("Saving this data to disk in the file {KEYS_FILE}");
|
println!("Sealing keys in the {KEYS_FILE}");
|
||||||
if let Err(e) = sol_client.get_keys_file().write(KEYS_FILE) {
|
if let Err(e) = sol_client.get_keys_file().write(KEYS_FILE) {
|
||||||
println!("Could not save data to disk: {e}");
|
println!("Could not seal keys: {e}");
|
||||||
}
|
}
|
||||||
return sol_client;
|
return sol_client;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
println!("Could not get keypair: {e:?}");
|
println!("Could not query keys from {node_ip}: {e:?}");
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
panic!("could not get keypair.");
|
panic!("Can't initialize the Token");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
env_logger::init_from_env(env_logger::Env::default().default_filter_or("warn"));
|
env_logger::init_from_env(env_logger::Env::default().default_filter_or("warn"));
|
||||||
let ratls_config = RaTlsConfig::new()
|
let mrenclave = InstanceMeasurement::new().with_current_mrenclave().unwrap();
|
||||||
.allow_instance_measurement(InstanceMeasurement::new().with_current_mrenclave().unwrap());
|
let ra_cfg = RaTlsConfig::new().allow_instance_measurement(mrenclave);
|
||||||
let my_ip = resolve_my_ip().await.unwrap();
|
let my_ip = resolve_my_ipv4().await; // Guaranteed to be correct IPv4
|
||||||
println!("Starting on IP {}", my_ip);
|
println!("Starting on IP {}", my_ip);
|
||||||
|
|
||||||
let state = Arc::new(State::new(my_ip.clone()));
|
let state = Arc::new(State::new(my_ip.clone()));
|
||||||
let sol_client = Arc::new(get_sol_client(state.clone(), ratls_config.clone()).await);
|
let sol_client = Arc::new(init_token(state.clone(), ra_cfg.clone()).await);
|
||||||
|
|
||||||
let (tx, _) = broadcast::channel(500);
|
|
||||||
|
|
||||||
let mut tasks = JoinSet::new();
|
let mut tasks = JoinSet::new();
|
||||||
|
let (tx, _) = broadcast::channel(500);
|
||||||
|
|
||||||
tasks.spawn(heartbeat_cron(my_ip.clone(), state.clone(), tx.clone()));
|
// HTTP and GRPC servers
|
||||||
tasks.spawn(http_server::init(state.clone(), sol_client.clone()));
|
tasks.spawn(grpc_new_server(state.clone(), sol_client.get_keys(), ra_cfg.clone(), tx.clone()));
|
||||||
tasks.spawn(
|
tasks.spawn(http_new_server(state.clone(), sol_client));
|
||||||
grpc::server::MyServer::init(
|
|
||||||
state.clone(),
|
|
||||||
sol_client.get_keys(),
|
|
||||||
ratls_config.clone(),
|
|
||||||
tx.clone(),
|
|
||||||
)
|
|
||||||
.start(),
|
|
||||||
);
|
|
||||||
|
|
||||||
if let Ok(input) = std::fs::read_to_string(INIT_NODES_FILE) {
|
if let Ok(node_ips) = read_to_string(INIT_NODES_FILE) {
|
||||||
for line in input.lines() {
|
for node_ip in node_ips.lines().take(NUM_CONNECTIONS).map(String::from) {
|
||||||
tasks.spawn(
|
tasks.spawn(grpc_new_conn(node_ip, state.clone(), ra_cfg.clone(), tx.clone()));
|
||||||
grpc::client::ConnManager::init(
|
|
||||||
my_ip.clone(),
|
|
||||||
state.clone(),
|
|
||||||
ratls_config.clone(),
|
|
||||||
tx.clone(),
|
|
||||||
)
|
|
||||||
.start_with_node(line.to_string()),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _ in 0..MAX_CONNECTIONS {
|
// Heartbeat
|
||||||
tasks.spawn(
|
heartbeat(tasks, ra_cfg, state, tx).await;
|
||||||
grpc::client::ConnManager::init(
|
|
||||||
my_ip.clone(),
|
|
||||||
state.clone(),
|
|
||||||
ratls_config.clone(),
|
|
||||||
tx.clone(),
|
|
||||||
)
|
|
||||||
.start(),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
while let Some(Ok(_)) = tasks.join_next().await {}
|
|
||||||
|
|
||||||
// task panicked
|
|
||||||
println!("Shutting down...");
|
|
||||||
}
|
}
|
||||||
|
@ -1,11 +1,9 @@
|
|||||||
use crate::datastore::NodeInfo;
|
use crate::{datastore::NodeInfo, grpc::challenge::Keys};
|
||||||
use crate::grpc::challenge::Keys;
|
|
||||||
use detee_sgx::SgxError;
|
use detee_sgx::SgxError;
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_with::{base64::Base64, serde_as};
|
use serde_with::{base64::Base64, serde_as};
|
||||||
use std::io::Write;
|
use std::{io::Write, sync::Mutex};
|
||||||
use once_cell::sync::Lazy;
|
|
||||||
use std::sync::Mutex;
|
|
||||||
pub struct Logfile {}
|
pub struct Logfile {}
|
||||||
|
|
||||||
static LOG_MUTEX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
|
static LOG_MUTEX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
|
||||||
@ -41,9 +39,9 @@ impl From<Keys> for KeysFile {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Into<Keys> for KeysFile {
|
impl From<KeysFile> for Keys {
|
||||||
fn into(self) -> Keys {
|
fn from(kf: KeysFile) -> Keys {
|
||||||
Keys { keypair: self.keypair, token_address: self.token }
|
Keys { keypair: kf.keypair, token_address: kf.token }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,7 +27,7 @@ pub struct SolClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl SolClient {
|
impl SolClient {
|
||||||
pub async fn new() -> Self {
|
pub async fn create_new_token() -> Self {
|
||||||
let client = RpcClient::new(RPC_URL);
|
let client = RpcClient::new(RPC_URL);
|
||||||
let keypair = Keypair::new();
|
let keypair = Keypair::new();
|
||||||
let token = create_token(&client, &keypair).await;
|
let token = create_token(&client, &keypair).await;
|
||||||
|
Loading…
Reference in New Issue
Block a user