connections refactoring

Signed-off-by: Valentyn Faychuk <valy@detee.ltd>

parent: 00e0db312b
commit: a7d29577f9
@@ -1,71 +1,58 @@
 #!/bin/bash
 set -e
 
+num_nodes=${1:-4}
 script_dir=$(dirname "$0")
-cd "${script_dir}/.." # Go to the root of the project
+cd "${script_dir}/.."
 
+function build_mint_sol() {
+    command -v cargo >/dev/null 2>&1 || { echo >&2 "cargo not found, run 'curl https://sh.rustup.rs -sSf | sh'"; exit 1; }
+    command -v gcc >/dev/null 2>&1 || { echo >&2 "cc not found, run 'apt update && apt install build-essential'"; exit 1; }
+    command -v protoc >/dev/null 2>&1 || { echo >&2 "protoc not found, run 'apt update && apt install protobuf-compiler'"; exit 1; }
 
-function build_mint_sol_tool() {
     echo "Building the mint_sol tool for testing"
-
-    if ! command -v cargo 2>&1 /dev/null
-    then
-        echo "cargo not found, run 'curl https://sh.rustup.rs -sSf | sh'"
-        exit 1
-    fi
-
-    if ! command -v gcc 2>&1 /dev/null
-    then
-        echo "cc not found, run 'apt update && apt install build-essential'"
-        exit 1
-    fi
-
-    if ! command -v protoc 2>&1 /dev/null
-    then
-        echo "protoc not found, run 'apt update && apt install protobuf-compiler'"
-        exit 1
-    fi
-
-    cd mint_sol
-    cargo build --release
-    cp target/release/mint_sol "../${script_dir}/mint_sol"
-    cd ..
+    cd mint_sol && cargo build --release && cd ..
+    cp mint_sol/target/release/mint_sol "${script_dir}/mint_sol"
 }
 
+eval "sudo rm -rf /tmp/dthc{0..$num_nodes}"
 TEST=1 ./${script_dir}/build-container.sh
-[ -e "${script_dir}/mint_sol" ] || build_mint_sol_tool
+[ -e "${script_dir}/mint_sol" ] || build_mint_sol
 
 # Cleanup old containers and the network
 echo "Creating the network and the root node"
-docker ps -a | grep 'dthc' | awk '{ print $NF }' | xargs docker rm -f || true
-docker network inspect dthc > /dev/null 2>&1 \
+eval "docker rm -f dthc{0..$num_nodes}" 2>&1 /dev/null
+docker network inspect dthc >/dev/null 2>&1 \
     || docker network create --subnet=172.18.0.0/24 dthc \
     || true
 
echo "Waiting for the root node to start"
 # 172.18.0.1 is for the network gateway
-docker run --name dthc-root \
+docker run --name dthc0 \
            --network dthc -d \
            --ip 172.18.0.2 \
            --env NODE_IP="172.18.0.2" \
+           --volume /tmp/dthc0:/challenge/main \
+           --publish 31300:31372 \
            --device /dev/sgx/provision \
            --device /dev/sgx/enclave \
            detee/hacker-challenge:test
 while true; do
     echo -n "." && sleep 1
-    docker logs dthc-root | grep -q "SOL" && echo && break
+    docker logs dthc0 | grep -q "SOL" && echo && break
 done
 
 echo "Sending SOL to the root and waiting for the mint"
-address=$(docker logs dthc-root | grep 'SOL' | awk '{ print $NF }')
+address=$(docker logs dthc0 | grep 'SOL' | awk '{ print $NF }')
 "${script_dir}"/mint_sol "${address}"
 while true; do
     echo -n "." && sleep 1
-    docker logs dthc-root | grep -q "Mint created" && echo && break
+    docker logs dthc0 | grep -q "Mint created" && echo && break
 done
 
-echo "Creating the cluster"
-for n in {1..20}; do
-    #init_nodes=$(docker inspect dthc-root --format '{{ .NetworkSettings.Networks.dthc.IPAddress }}')
+echo "Creating a cluster with ${num_nodes} nodes"
+for n in $(seq 1 $num_nodes); do
+    #init_nodes=$(docker inspect dthc0 --format '{{ .NetworkSettings.Networks.dthc.IPAddress }}')
     node_ip="172.18.0.$((2 + n))"
     node_port=$((31300 + n))
     node_volume="/tmp/dthc${n}"
@@ -94,4 +81,4 @@ done
 
 # curl 127.0.0.1:31303/metrics
 # curl -X POST 127.0.0.1:31303/mint -d '{"wallet": "EZT16iP1SQVUFf1AJN6oiE5BZPnyBUqaKDkZ4oZRsvhR"}' -H 'Content-Type: application/json'
-# docker run --network dthc -d --name dthc-3 --ip 172.18.0.3 --env NODE_IP='172.18.0.3' --env INIT_NODES='172.18.0.2' -v /tmp/dthc3:/challenge/main -p 31303:31372 --device /dev/sgx/provision --device /dev/sgx/enclave detee/hacker-challenge:test
+# docker run --name dthc0 --network dthc -d --ip 172.18.0.2 --env NODE_IP="172.18.0.2" --env INIT_NODES="172.18.0.5 172.18.0.3 172.18.0.4" --volume /tmp/dthc0:/challenge/main --publish 31300:31372 --device /dev/sgx/provision --device /dev/sgx/enclave detee/hacker-challenge:test

@@ -183,30 +183,7 @@ impl State {
         }
     }
 
-    pub fn get_my_info(&self) -> NodeInfo {
-        let mut my_info = self.nodes.get_mut(&self.my_ip).expect("no info for this node");
-        my_info.keepalive = SystemTime::now();
-        my_info.clone()
-    }
-
-    /// This returns true if the update should be further forwarded
-    /// For example, we never forward our own updates that came back
-    pub fn process_node_update(&self, (ip, node_info): (String, NodeInfo)) -> bool {
-        let is_update_mine = ip.eq(&self.my_ip);
-        let is_update_new = self
-            .nodes
-            .get(&ip)
-            .map(|curr_info| node_info.is_newer_than(&curr_info))
-            .unwrap_or(true);
-        if is_update_new {
-            println!("Inserting: {}, {}", ip, node_info.to_json());
-            node_info.log(&ip);
-            self.nodes.insert(ip, node_info);
-        }
-        is_update_new && !is_update_mine
-    }
-
-    pub fn get_node_list(&self) -> Vec<(String, NodeInfo)> {
+    pub fn get_nodes(&self) -> Vec<(String, NodeInfo)> {
         self.nodes.clone().into_iter().collect()
     }
 
@@ -214,28 +191,58 @@ impl State {
         self.my_ip.clone()
     }
 
+    pub fn get_my_info(&self) -> NodeInfo {
+        let mut my_info = self.nodes.get_mut(&self.my_ip).expect("no info for this node");
+        my_info.keepalive = SystemTime::now();
+        my_info.clone()
+    }
+
+    pub fn get_connected_ips(&self) -> Vec<String> {
+        self.conns.iter().map(|n| n.key().clone()).collect()
+    }
+
     // returns a random node that does not have an active connection
-    pub fn get_random_disconnected_node(&self) -> Option<String> {
+    pub fn get_random_disconnected_ip(&self) -> Option<String> {
         use rand::{rngs::OsRng, RngCore};
         let len = self.nodes.len();
         if len == 0 {
             return None;
         }
+        let conn_ips = self.get_connected_ips();
         let skip = OsRng.next_u64().try_into().unwrap_or(0) % len;
         self.nodes
             .iter()
             .map(|n| n.key().clone())
             .cycle()
             .skip(skip)
-            .find(|ip| ip != &self.my_ip && !self.conns.contains(ip))
+            .find(|ip| ip != &self.my_ip && !conn_ips.contains(ip))
     }
 
-    pub fn remove_inactive_nodes(&self) {
-        self.nodes.retain(|_, v| {
+    /// This returns true if the update should be further forwarded
+    /// For example, we never forward our own updates that came back
+    pub fn process_node_update(&self, (node_ip, node_info): (String, NodeInfo)) -> bool {
+        let is_update_mine = node_ip.eq(&self.my_ip);
+        let is_update_new = self
+            .nodes
+            .get(&node_ip)
+            .map(|curr_info| node_info.is_newer_than(&curr_info))
+            .unwrap_or(true);
+        if is_update_new {
+            println!("Inserting: {}, {}", node_ip, node_info.to_json());
+            node_info.log(&node_ip);
+            self.nodes.insert(node_ip, node_info);
+        }
+        is_update_new && !is_update_mine
+    }
+
+    pub fn remove_inactive_nodes(&self, max_age: u64) {
+        self.nodes.retain(|_, node_info| {
             // HACK: Check if it is possible to abuse network by corrupting system time
-            let age =
-                SystemTime::now().duration_since(v.keepalive).unwrap_or(Duration::ZERO).as_secs();
-            age <= 600
+            let age = SystemTime::now()
+                .duration_since(node_info.keepalive)
+                .unwrap_or(Duration::ZERO)
+                .as_secs();
+            age <= max_age
         });
     }
 }

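Note on the retention change: `remove_inactive_nodes` no longer hardcodes a 600-second cutoff; callers pass `max_age`, and main.rs (further down) passes `HEARTBEAT_INTERVAL * 3`, i.e. 90 seconds. A minimal, self-contained sketch of the same age rule (the function name and values here are illustrative, not part of the codebase):

    use std::time::{Duration, SystemTime};

    // A node survives pruning only while its keepalive is at most max_age seconds old.
    fn is_alive(keepalive: SystemTime, max_age: u64) -> bool {
        let age = SystemTime::now()
            .duration_since(keepalive)
            .unwrap_or(Duration::ZERO) // a keepalive "from the future" counts as age 0
            .as_secs();
        age <= max_age
    }

    fn main() {
        let fresh = SystemTime::now();
        let stale = SystemTime::now() - Duration::from_secs(120);
        assert!(is_alive(fresh, 90));
        assert!(!is_alive(stale, 90)); // 120 s old exceeds three 30 s heartbeat periods
    }
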
@@ -1,59 +1,60 @@
-use super::challenge::Keys;
-use super::InternalNodeUpdate;
+use super::{challenge::Keys, InternalNodeUpdate};
 use crate::{
     datastore::State,
     grpc::challenge::{update_client::UpdateClient, Empty},
 };
 use detee_sgx::RaTlsConfig;
-use std::{str::FromStr, sync::Arc};
-use tokio::{
-    sync::broadcast::Sender,
-    time::{sleep, Duration},
-};
+use std::{net::Ipv4Addr, str::FromStr, sync::Arc};
+use tokio::{sync::broadcast::Sender, time::Duration};
 use tokio_stream::{wrappers::BroadcastStream, StreamExt};
 
+pub async fn grpc_new_conn(
+    node_ip: String,
+    state: Arc<State>,
+    ra_cfg: RaTlsConfig,
+    tx: Sender<InternalNodeUpdate>,
+) {
+    if Ipv4Addr::from_str(&node_ip).is_err() {
+        println!("IPv4 address is invalid: {node_ip}");
+        return;
+    }
+    ConnManager::init(state, ra_cfg, tx).connect_to(node_ip).await
+}
+
+pub async fn grpc_query_keys(
+    node_ip: String,
+    state: Arc<State>,
+    ra_cfg: RaTlsConfig,
+) -> Result<Keys, Box<dyn std::error::Error>> {
+    if Ipv4Addr::from_str(&node_ip).is_err() {
+        let err = format!("IPv4 address is invalid: {node_ip}");
+        return Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, err)));
+    }
+    query_keys(node_ip, state, ra_cfg).await
+}
+
 #[derive(Clone)]
-pub struct ConnManager {
-    my_ip: String,
+struct ConnManager {
     state: Arc<State>,
     tx: Sender<InternalNodeUpdate>,
-    ratls_config: RaTlsConfig,
+    ra_cfg: RaTlsConfig,
 }
 
 impl ConnManager {
-    pub fn init(
-        my_ip: String,
-        state: Arc<State>,
-        ratls_config: RaTlsConfig,
-        tx: Sender<InternalNodeUpdate>,
-    ) -> Self {
-        Self { my_ip, state, ratls_config, tx }
+    fn init(state: Arc<State>, ra_cfg: RaTlsConfig, tx: Sender<InternalNodeUpdate>) -> Self {
+        Self { state, ra_cfg, tx }
     }
 
-    pub async fn start_with_node(self, node_ip: String) {
-        self.connect_wrapper(node_ip).await;
-    }
-
-    pub async fn start(self) {
-        loop {
-            if let Some(ip) = self.state.get_random_disconnected_node() {
-                println!("Found random disconnected node {ip}");
-                self.connect_wrapper(ip.clone()).await;
-            }
-            sleep(Duration::from_secs(3)).await;
-        }
-    }
-
-    async fn connect_wrapper(&self, node_ip: String) {
+    async fn connect_to(&self, node_ip: String) {
         let state = self.state.clone();
         state.add_conn(&node_ip);
-        if let Err(e) = self.connect(node_ip.clone()).await {
+        if let Err(e) = self.connect_to_int(node_ip.clone()).await {
             println!("Client connection for {node_ip} failed: {e:?}");
         }
         state.delete_conn(&node_ip);
     }
 
-    async fn connect(&self, remote_ip: String) -> Result<(), Box<dyn std::error::Error>> {
+    async fn connect_to_int(&self, remote_ip: String) -> Result<(), Box<dyn std::error::Error>> {
         use detee_sgx::RaTlsConfigBuilder;
         use hyper::Uri;
         use hyper_util::{client::legacy::connect::HttpConnector, rt::TokioExecutor};
@@ -61,7 +62,7 @@ impl ConnManager {
 
         println!("Connecting to {remote_ip}...");
 
-        let tls = ClientConfig::from_ratls_config(self.ratls_config.clone())
+        let tls = ClientConfig::from_ratls_config(self.ra_cfg.clone())
             .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e)))?;
 
         let mut http = HttpConnector::new();
@@ -79,6 +80,7 @@ impl ConnManager {
                     .enable_http2()
                     .wrap_connector(s)
             })
+            .timeout(Duration::from_secs(5))
             .map_request(move |_| {
                 Uri::from_str(&format!("https://{cloned_node_ip}:31373"))
                     .expect("Could not parse URI")
@@ -112,21 +114,12 @@ impl ConnManager {
         let mut resp_stream = response.into_inner();
 
         // Immediately send our info as a network update
-        let my_info = (self.my_ip.clone(), self.state.get_my_info()).into();
-        let _ = self.tx.send(InternalNodeUpdate { sender_ip: self.my_ip.clone(), update: my_info });
+        let _ = self.tx.send((self.state.get_my_ip(), self.state.get_my_info()).into());
 
         while let Some(update) = resp_stream.message().await? {
-            // update the entire network in case the information is new
+            // Update the entire network in case the information is new
             if self.state.process_node_update(update.clone().into()) {
-                // if process update returns true, the update must be forwarded
-                if self
-                    .tx
-                    .send(InternalNodeUpdate {
-                        sender_ip: remote_ip.clone(),
-                        update: update.clone(),
-                    })
-                    .is_err()
-                {
+                // If process update returns true, the update must be forwarded
+                if self.tx.send((remote_ip.clone(), update).into()).is_err() {
                     println!("Tokio broadcast receivers had an issue consuming the channel");
                 };
             }
@@ -136,10 +129,10 @@ impl ConnManager {
     }
 }
 
-pub async fn key_grabber(
-    state: Arc<State>,
+async fn query_keys(
     node_ip: String,
-    ratls_config: RaTlsConfig,
+    state: Arc<State>,
+    ra_cfg: RaTlsConfig,
 ) -> Result<Keys, Box<dyn std::error::Error>> {
     use detee_sgx::RaTlsConfigBuilder;
     use hyper::Uri;
@@ -148,7 +141,7 @@ pub async fn key_grabber(
 
     println!("Getting key from {node_ip}...");
 
-    let tls = ClientConfig::from_ratls_config(ratls_config)
+    let tls = ClientConfig::from_ratls_config(ra_cfg)
         .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e)))?;
 
     let mut http = HttpConnector::new();
@@ -166,6 +159,7 @@ pub async fn key_grabber(
                 .enable_http2()
                 .wrap_connector(s)
         })
+        .timeout(Duration::from_secs(5))
         .map_request(move |_| {
             Uri::from_str(&format!("https://{cloned_node_ip}:31373")).expect("Could not parse URI")
         })

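Both new entry points validate the peer address before dialing, so a hostname or an ip:port pair never reaches the TLS layer. A self-contained sketch of that guard (the helper name is illustrative):

    use std::{net::Ipv4Addr, str::FromStr};

    // Anything that does not parse as a bare IPv4 address is rejected up front.
    fn is_valid_ipv4(node_ip: &str) -> bool {
        Ipv4Addr::from_str(node_ip).is_ok()
    }

    fn main() {
        assert!(is_valid_ipv4("172.18.0.3"));
        assert!(!is_valid_ipv4("172.18.0.3:31373")); // a port suffix does not parse
        assert!(!is_valid_ipv4("peer.example.com")); // hostnames do not parse
    }
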
@@ -7,16 +7,10 @@ pub mod challenge {
     tonic::include_proto!("challenge");
 }
 
-#[derive(Clone, PartialEq)]
-pub struct InternalNodeUpdate {
-    pub sender_ip: String,
-    pub update: NodeUpdate,
-}
-
 impl From<(String, NodeInfo)> for NodeUpdate {
     fn from((ip, info): (String, NodeInfo)) -> Self {
-        NodeUpdate {
-            ip: ip.to_string(),
+        Self {
+            ip,
             started_at: Some(prost_types::Timestamp::from(info.started_at)),
             keepalive: Some(prost_types::Timestamp::from(info.keepalive)),
             mint_requests: info.mint_requests,
@@ -31,33 +25,49 @@ impl From<(String, NodeInfo)> for NodeUpdate {
 }
 
 impl From<NodeUpdate> for (String, NodeInfo) {
-    fn from(val: NodeUpdate) -> Self {
-        let ip = val.ip;
-        let started_at: SystemTime = match val.started_at {
-            Some(ts) => {
-                let duration = Duration::new(ts.seconds as u64, ts.nanos as u32);
-                UNIX_EPOCH.checked_add(duration).unwrap_or(SystemTime::now())
-            }
-            None => SystemTime::now(),
-        };
-        let keepalive: SystemTime = match val.keepalive {
-            Some(ts) => {
-                let duration = Duration::new(ts.seconds as u64, ts.nanos as u32);
-                UNIX_EPOCH.checked_add(duration).unwrap_or(SystemTime::now())
-            }
-            None => SystemTime::now(),
-        };
+    fn from(upd: NodeUpdate) -> (String, NodeInfo) {
+        let ip = upd.ip;
+        let started_at: SystemTime = grpc_timestamp_to_systemtime(upd.started_at);
+        let keepalive: SystemTime = grpc_timestamp_to_systemtime(upd.keepalive);
         let self_info = NodeInfo {
             started_at,
             keepalive,
-            mint_requests: val.mint_requests,
-            mints: val.mints,
-            mratls_conns: val.mratls_conns,
-            net_attacks: val.quote_attacks,
-            public: val.public,
-            restarts: val.restarts,
-            disk_attacks: val.disk_attacks,
+            mint_requests: upd.mint_requests,
+            mints: upd.mints,
+            mratls_conns: upd.mratls_conns,
+            net_attacks: upd.quote_attacks,
+            public: upd.public,
+            restarts: upd.restarts,
+            disk_attacks: upd.disk_attacks,
         };
         (ip, self_info)
     }
 }
+
+fn grpc_timestamp_to_systemtime(ts: Option<prost_types::Timestamp>) -> SystemTime {
+    if let Some(ts) = ts {
+        let duration = Duration::new(ts.seconds as u64, ts.nanos as u32);
+        UNIX_EPOCH.checked_add(duration).unwrap_or(SystemTime::now())
+    } else {
+        println!("Timestamp is None");
+        SystemTime::now()
+    }
+}
+
+#[derive(Clone, PartialEq)]
+pub struct InternalNodeUpdate {
+    pub sender_ip: String,
+    pub update: NodeUpdate,
+}
+
+impl From<(String, NodeInfo)> for InternalNodeUpdate {
+    fn from((ip, info): (String, NodeInfo)) -> Self {
+        Self { sender_ip: ip.clone(), update: (ip, info).into() }
+    }
+}
+
+impl From<(String, NodeUpdate)> for InternalNodeUpdate {
+    fn from((ip, update): (String, NodeUpdate)) -> Self {
+        Self { sender_ip: ip, update }
+    }
+}

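The two new `From` impls are what let call sites elsewhere in the commit shrink to a tuple conversion, e.g. `tx.send((remote_ip.clone(), update).into())` in both the client and the server. A toy model with stand-in types (the real `NodeUpdate` is proto-generated, so this is illustrative only):

    #[derive(Clone, PartialEq, Debug)]
    struct NodeUpdate { ip: String } // stand-in for the generated type

    #[derive(Clone, PartialEq, Debug)]
    struct InternalNodeUpdate { sender_ip: String, update: NodeUpdate }

    impl From<(String, NodeUpdate)> for InternalNodeUpdate {
        fn from((ip, update): (String, NodeUpdate)) -> Self {
            Self { sender_ip: ip, update }
        }
    }

    fn main() {
        let update = NodeUpdate { ip: "172.18.0.3".into() };
        // The struct literal at every call site becomes a single .into():
        let internal: InternalNodeUpdate = ("172.18.0.2".to_string(), update).into();
        assert_eq!(internal.sender_ip, "172.18.0.2");
    }
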
@@ -1,5 +1,7 @@
-use super::challenge::{update_server::UpdateServer, Empty, Keys, NodeUpdate};
-use super::InternalNodeUpdate;
+use super::{
+    challenge::{update_server::UpdateServer, Empty, Keys, NodeUpdate},
+    InternalNodeUpdate,
+};
 use crate::{datastore::State, grpc::challenge::update_server::Update};
 use detee_sgx::RaTlsConfig;
 use rustls::pki_types::CertificateDer;
@@ -8,21 +10,30 @@ use tokio::sync::broadcast::Sender;
 use tokio_stream::{Stream, StreamExt};
 use tonic::{Request, Response, Status, Streaming};
 
-pub struct MyServer {
+pub async fn grpc_new_server(
+    state: Arc<State>,
+    keys: Keys,
+    ra_cfg: RaTlsConfig,
+    tx: Sender<InternalNodeUpdate>,
+) {
+    NodeServer::init(state, keys, ra_cfg, tx).start().await
+}
+
+pub struct NodeServer {
     state: Arc<State>,
     tx: Sender<InternalNodeUpdate>,
-    ratls_config: RaTlsConfig,
+    ra_cfg: RaTlsConfig,
     keys: Keys, // For sending secret keys to new nodes ;)
 }
 
-impl MyServer {
+impl NodeServer {
     pub fn init(
         state: Arc<State>,
         keys: Keys,
-        ratls_config: RaTlsConfig,
+        ra_cfg: RaTlsConfig,
         tx: Sender<InternalNodeUpdate>,
     ) -> Self {
-        Self { state, tx, keys, ratls_config }
+        Self { state, tx, keys, ra_cfg }
     }
 
     pub async fn start(self) {
@@ -40,7 +51,7 @@ impl MyServer {
 
         // TODO: error handling, shouldn't have expects
 
-        let mut tls = ServerConfig::from_ratls_config(self.ratls_config.clone()).unwrap();
+        let mut tls = ServerConfig::from_ratls_config(self.ra_cfg.clone()).unwrap();
         tls.alpn_protocols = vec![b"h2".to_vec()];
 
         let state = self.state.clone();
@@ -119,7 +130,7 @@ struct ConnInfo {
 }
 
 #[tonic::async_trait]
-impl Update for MyServer {
+impl Update for NodeServer {
     type GetUpdatesStream = Pin<Box<dyn Stream<Item = Result<NodeUpdate, Status>> + Send>>;
 
     async fn get_keys(&self, _request: Request<Empty>) -> Result<Response<Keys>, Status> {
@@ -143,8 +154,8 @@ impl Update for MyServer {
             state.declare_myself_public();
             state.add_conn(&remote_ip);
 
-            let full_update_list: Vec<NodeUpdate> = state.get_node_list().into_iter().map(Into::<NodeUpdate>::into).collect();
-            for update in full_update_list {
+            let known_nodes: Vec<NodeUpdate> = state.get_nodes().into_iter().map(Into::into).collect();
+            for update in known_nodes {
                 yield Ok(update);
             }
 
@@ -164,7 +175,7 @@ impl Update for MyServer {
 
                                 if state.process_node_update(update.clone().into()) {
                                     // if process update returns true, the update must be forwarded
-                                    if tx.send(InternalNodeUpdate { sender_ip: remote_ip.clone(), update: update.clone() }).is_err() {
+                                    if tx.send((remote_ip.clone(), update).into()).is_err() {
                                         println!("Tokio broadcast receivers had an issue consuming the channel");
                                     };
                                 }

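On the "receivers had an issue" log line kept by this refactor: `broadcast::Sender::send` errors only when no receiver is subscribed at that moment, so the failure is logged and ignored rather than treated as fatal. A runnable illustration:

    use tokio::sync::broadcast;

    #[tokio::main]
    async fn main() {
        let (tx, rx) = broadcast::channel::<&str>(500);
        assert!(tx.send("update").is_ok()); // one live receiver
        drop(rx);
        assert!(tx.send("update").is_err()); // no receivers left -> SendError
    }
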
@@ -48,9 +48,8 @@ impl From<(String, datastore::NodeInfo)> for NodesResp {
 
 #[get("/nodes")]
 async fn get_nodes(ds: web::Data<Arc<State>>) -> HttpResponse {
-    HttpResponse::Ok().json(
-        ds.get_node_list().into_iter().map(Into::<NodesResp>::into).collect::<Vec<NodesResp>>(),
-    )
+    HttpResponse::Ok()
+        .json(ds.get_nodes().into_iter().map(Into::<NodesResp>::into).collect::<Vec<NodesResp>>())
 }
 
 #[derive(Deserialize)]
@@ -81,13 +80,13 @@ async fn mint(
 #[get("/metrics")]
 async fn metrics(ds: web::Data<Arc<State>>) -> HttpResponse {
     let mut metrics = String::new();
-    for (ip, node) in ds.get_node_list() {
+    for (ip, node) in ds.get_nodes() {
         metrics.push_str(node.to_metrics(&ip).as_str());
     }
     HttpResponse::Ok().content_type("text/plain; version=0.0.4; charset=utf-8").body(metrics)
 }
 
-pub async fn init(state: Arc<State>, sol_client: Arc<SolClient>) {
+pub async fn http_new_server(state: Arc<State>, sol_client: Arc<SolClient>) {
     HttpServer::new(move || {
         App::new()
             .app_data(web::Data::new(state.clone()))
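
Aside on the `/metrics` handler this hunk touches: `version=0.0.4` is the Prometheus text exposition format version, which scrapers expect in the content type. A minimal stand-in endpoint, assuming actix-web as in the file above (the metric name and port are illustrative):

    use actix_web::{get, App, HttpResponse, HttpServer};

    #[get("/metrics")]
    async fn metrics() -> HttpResponse {
        let body = "dthc_mints_total 0\n"; // illustrative metric line
        HttpResponse::Ok()
            .content_type("text/plain; version=0.0.4; charset=utf-8")
            .body(body)
    }

    #[actix_web::main]
    async fn main() -> std::io::Result<()> {
        HttpServer::new(|| App::new().service(metrics))
            .bind(("127.0.0.1", 31303))?
            .run()
            .await
    }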

src/main.rs (193 lines changed)
@@ -4,176 +4,145 @@ mod http_server;
 mod persistence;
 mod solana;
 
-use crate::persistence::SealError;
 use crate::{
-    grpc::challenge::NodeUpdate, grpc::InternalNodeUpdate, persistence::KeysFile,
-    persistence::SealedFile, solana::SolClient,
+    grpc::{
+        challenge::NodeUpdate,
+        client::{grpc_new_conn, grpc_query_keys},
+        server::grpc_new_server,
+        InternalNodeUpdate,
+    },
+    http_server::http_new_server,
+    persistence::{KeysFile, SealError, SealedFile},
+    solana::SolClient,
 };
 use datastore::State;
 use detee_sgx::{InstanceMeasurement, RaTlsConfig};
-use std::{
-    fs::File,
-    io::Error,
-    io::{BufRead, BufReader},
-    sync::Arc,
-};
+use std::{fs::read_to_string, sync::Arc};
 use tokio::{
     sync::{broadcast, broadcast::Sender},
     task::JoinSet,
-    time::{sleep, Duration},
+    time::{interval, Duration},
 };
 
 const INIT_NODES_FILE: &str = "/host/detee_challenge_nodes";
 const KEYS_FILE: &str = "/host/main/TRY_TO_HACK_THIS";
-const MAX_CONNECTIONS: usize = 3;
+const NUM_CONNECTIONS: usize = 3;
+const HEARTBEAT_INTERVAL: u64 = 30; // Seconds
 
-#[cfg(feature = "test")]
-async fn resolve_my_ip() -> Result<String, Error> {
-    let node_ip = File::open("/host/detee_node_ip")?;
-    let mut reader = BufReader::new(node_ip);
-    let mut ip = String::new();
-    reader.read_line(&mut ip)?;
-    if ip.ends_with('\n') {
-        ip.pop();
-    }
-    Ok(ip)
-}
-
 #[cfg(not(feature = "test"))]
-async fn resolve_my_ip() -> Result<String, Error> {
-    let err = "Can't resolve my external IP, try again";
-    let ip = public_ip::addr_v4().await.ok_or(Error::new(std::io::ErrorKind::Other, err))?;
-    Ok(format!("{}", ip))
+async fn resolve_my_ipv4() -> String {
+    public_ip::addr_v4().await.unwrap().to_string()
 }
 
-pub async fn heartbeat_cron(my_ip: String, state: Arc<State>, tx: Sender<InternalNodeUpdate>) {
+#[cfg(feature = "test")]
+async fn resolve_my_ipv4() -> String {
+    use std::str::FromStr;
+    let node_ip = read_to_string("/host/detee_node_ip").unwrap();
+    std::net::Ipv4Addr::from_str(node_ip.trim()).unwrap().to_string()
+}
+
+pub async fn heartbeat(
+    mut tasks: JoinSet<()>,
+    ra_cfg: RaTlsConfig,
+    state: Arc<State>,
+    tx: Sender<InternalNodeUpdate>,
+) -> ! {
+    let mut interval = interval(Duration::from_secs(HEARTBEAT_INTERVAL));
     loop {
-        sleep(Duration::from_secs(60)).await;
+        interval.tick().await;
         println!("Heartbeat...");
-        let update = (my_ip.clone(), state.get_my_info()).into();
-        let _ = tx.send(InternalNodeUpdate { sender_ip: my_ip.clone(), update });
-        state.remove_inactive_nodes();
+        state.remove_inactive_nodes(HEARTBEAT_INTERVAL * 3);
+        let connected_ips = state.get_connected_ips();
+        println!("Connected nodes ({}): {:?}", connected_ips.len(), connected_ips);
+        let _ = tx.send((state.get_my_ip(), state.get_my_info()).into());
+        if connected_ips.len() < NUM_CONNECTIONS {
+            if let Some(node_ip) = state.get_random_disconnected_ip() {
+                println!("Dialing random node {}", node_ip);
+                tasks.spawn(grpc_new_conn(node_ip, state.clone(), ra_cfg.clone(), tx.clone()));
+            }
+        }
+        let _ = tasks.try_join_next();
     }
 }
 
-async fn get_sol_client(state: Arc<State>, ratls_config: RaTlsConfig) -> SolClient {
+async fn init_token(state: Arc<State>, ra_cfg: RaTlsConfig) -> SolClient {
+    // First try to get the keys from the disk
     match KeysFile::read(KEYS_FILE) {
         Ok(keys_file) => {
-            let sol_client = SolClient::try_from(keys_file)
-                .map_err(|e| {
-                    println!("Read malformed keys from the disk: {e}");
-                    state.increase_net_attacks();
-                })
-                .unwrap();
-            println!(
-                "Found the following wallet saved to disk: {}",
-                sol_client.get_wallet_pubkey()
-            );
-            println!("Loading token mint address {}", sol_client.get_token_address());
+            // If unsealed keys are corrupted then it is very bad, we should panic
+            let sol_client = SolClient::try_from(keys_file).unwrap();
+            println!("Found sealed wallet {}", sol_client.get_wallet_pubkey());
+            println!("The address of the Token is {}", sol_client.get_token_address());
             return sol_client;
         }
         Err(SealError::Attack(e)) => {
-            println!("The local keys file is corrupted: {}", e);
+            println!("The sealed keys are corrupted: {}", e);
             state.increase_disk_attacks();
         }
         Err(e) => {
-            println!("Could not read keys file: {e}");
+            println!("Could not read sealed keys: {e}");
        }
    };
 
-    let init_nodes = match File::open(INIT_NODES_FILE) {
-        Ok(init_nodes) => init_nodes,
+    let node_ips = match read_to_string(INIT_NODES_FILE) {
+        Ok(node_ips) => node_ips,
         Err(_) => {
-            println!("Can't initialize using init nodes from {INIT_NODES_FILE}");
-            println!("Starting a new network with a new key...");
-            return SolClient::new().await;
+            println!("I am root, creating a new Token...");
+            let sol_client = SolClient::create_new_token().await;
+            println!("The address of the Token is {}", sol_client.get_token_address());
+            println!("Sealing keys in the {KEYS_FILE}");
+            if let Err(e) = sol_client.get_keys_file().write(KEYS_FILE) {
+                println!("Could not seal keys: {e}");
+            }
+            return sol_client;
         }
     };
 
-    let init_nodes_reader = BufReader::new(init_nodes);
-    for init_node_ip in init_nodes_reader.lines().map(|l| l.unwrap()) {
-        match grpc::client::key_grabber(state.clone(), init_node_ip, ratls_config.clone()).await {
+    for node_ip in node_ips.lines().map(String::from) {
+        match grpc_query_keys(node_ip.clone(), state.clone(), ra_cfg.clone()).await {
             Ok(keys) => {
-                let sol_client = SolClient::try_from(keys.clone())
-                    .map_err(|e| {
-                        println!("Received malformed keys from the network: {e}");
-                        state.increase_net_attacks();
-                    })
-                    .unwrap();
-                println!(
-                    "Got keypair from the network. Joining the network using wallet {}",
-                    sol_client.get_wallet_pubkey()
-                );
+                // If grpc keys are corrupted then it is very bad, we should panic
+                let sol_client = SolClient::try_from(keys.clone()).unwrap();
+                println!("Connected network wallet {}", sol_client.get_wallet_pubkey());
                 println!("The address of the Token is {}", sol_client.get_token_address());
-                println!("Saving this data to disk in the file {KEYS_FILE}");
+                println!("Sealing keys in the {KEYS_FILE}");
                 if let Err(e) = sol_client.get_keys_file().write(KEYS_FILE) {
-                    println!("Could not save data to disk: {e}");
+                    println!("Could not seal keys: {e}");
                 }
                 return sol_client;
             }
             Err(e) => {
-                println!("Could not get keypair: {e:?}");
+                println!("Could not query keys from {node_ip}: {e:?}");
             }
         };
     }
-    panic!("could not get keypair.");
+    panic!("Can't initialize the Token");
 }
 
 #[tokio::main]
 async fn main() {
     env_logger::init_from_env(env_logger::Env::default().default_filter_or("warn"));
-    let ratls_config = RaTlsConfig::new()
-        .allow_instance_measurement(InstanceMeasurement::new().with_current_mrenclave().unwrap());
-    let my_ip = resolve_my_ip().await.unwrap();
+    let mrenclave = InstanceMeasurement::new().with_current_mrenclave().unwrap();
+    let ra_cfg = RaTlsConfig::new().allow_instance_measurement(mrenclave);
+    let my_ip = resolve_my_ipv4().await; // Guaranteed to be correct IPv4
     println!("Starting on IP {}", my_ip);
 
     let state = Arc::new(State::new(my_ip.clone()));
-    let sol_client = Arc::new(get_sol_client(state.clone(), ratls_config.clone()).await);
-
-    let (tx, _) = broadcast::channel(500);
+    let sol_client = Arc::new(init_token(state.clone(), ra_cfg.clone()).await);
 
     let mut tasks = JoinSet::new();
+    let (tx, _) = broadcast::channel(500);
 
-    tasks.spawn(heartbeat_cron(my_ip.clone(), state.clone(), tx.clone()));
-    tasks.spawn(http_server::init(state.clone(), sol_client.clone()));
-    tasks.spawn(
-        grpc::server::MyServer::init(
-            state.clone(),
-            sol_client.get_keys(),
-            ratls_config.clone(),
-            tx.clone(),
-        )
-        .start(),
-    );
+    // HTTP and GRPC servers
+    tasks.spawn(grpc_new_server(state.clone(), sol_client.get_keys(), ra_cfg.clone(), tx.clone()));
+    tasks.spawn(http_new_server(state.clone(), sol_client));
 
-    if let Ok(input) = std::fs::read_to_string(INIT_NODES_FILE) {
-        for line in input.lines() {
-            tasks.spawn(
-                grpc::client::ConnManager::init(
-                    my_ip.clone(),
-                    state.clone(),
-                    ratls_config.clone(),
-                    tx.clone(),
-                )
-                .start_with_node(line.to_string()),
-            );
+    if let Ok(node_ips) = read_to_string(INIT_NODES_FILE) {
+        for node_ip in node_ips.lines().take(NUM_CONNECTIONS).map(String::from) {
+            tasks.spawn(grpc_new_conn(node_ip, state.clone(), ra_cfg.clone(), tx.clone()));
         }
     }
 
-    for _ in 0..MAX_CONNECTIONS {
-        tasks.spawn(
-            grpc::client::ConnManager::init(
-                my_ip.clone(),
-                state.clone(),
-                ratls_config.clone(),
-                tx.clone(),
-            )
-            .start(),
-        );
-    }
-
-    while let Some(Ok(_)) = tasks.join_next().await {}
-
-    // task panicked
-    println!("Shutting down...");
+    // Heartbeat
+    heartbeat(tasks, ra_cfg, state, tx).await;
 }

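A behavioral detail of the sleep-to-interval switch above: the first tick of a tokio `interval` completes immediately, so the refactored loop sends its first heartbeat (and can dial its first peer) at startup, whereas `heartbeat_cron` slept a full minute before doing anything. A small demo:

    use tokio::time::{interval, Duration, Instant};

    #[tokio::main]
    async fn main() {
        let start = Instant::now();
        let mut ticker = interval(Duration::from_secs(30));
        ticker.tick().await; // first tick fires immediately, not after 30 s
        assert!(start.elapsed() < Duration::from_millis(100));
        println!("first tick after {:?}", start.elapsed());
    }
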
@@ -1,11 +1,9 @@
-use crate::datastore::NodeInfo;
-use crate::grpc::challenge::Keys;
+use crate::{datastore::NodeInfo, grpc::challenge::Keys};
 use detee_sgx::SgxError;
+use once_cell::sync::Lazy;
 use serde::{Deserialize, Serialize};
 use serde_with::{base64::Base64, serde_as};
-use std::io::Write;
-use once_cell::sync::Lazy;
-use std::sync::Mutex;
+use std::{io::Write, sync::Mutex};
 pub struct Logfile {}
 
 static LOG_MUTEX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
@@ -41,9 +39,9 @@ impl From<Keys> for KeysFile {
     }
 }
 
-impl Into<Keys> for KeysFile {
-    fn into(self) -> Keys {
-        Keys { keypair: self.keypair, token_address: self.token }
+impl From<KeysFile> for Keys {
+    fn from(kf: KeysFile) -> Keys {
+        Keys { keypair: kf.keypair, token_address: kf.token }
     }
 }
 

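The `Into<Keys> for KeysFile` impl was flipped to `From<KeysFile> for Keys`, the direction clippy's `from_over_into` lint recommends: implementing `From` also yields the matching `Into` through the standard library's blanket impl, while a bare `Into` provides no `From`. Sketch with stand-in types (not the real sealed-key format):

    struct KeysFile { keypair: Vec<u8>, token: String }
    struct Keys { keypair: Vec<u8>, token_address: String }

    impl From<KeysFile> for Keys {
        fn from(kf: KeysFile) -> Keys {
            Keys { keypair: kf.keypair, token_address: kf.token }
        }
    }

    fn main() {
        let kf = KeysFile { keypair: vec![1, 2, 3], token: "TokenAddr".into() };
        let keys: Keys = kf.into(); // Into comes for free from the From impl
        assert_eq!(keys.token_address, "TokenAddr");
    }
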
@@ -27,7 +27,7 @@ pub struct SolClient {
 }
 
 impl SolClient {
-    pub async fn new() -> Self {
+    pub async fn create_new_token() -> Self {
         let client = RpcClient::new(RPC_URL);
         let keypair = Keypair::new();
         let token = create_token(&client, &keypair).await;