bounceback optimisation
Signed-off-by: Valentyn Faychuk <valy@detee.ltd>
This commit is contained in:
		
							parent
							
								
									e15aa63af9
								
							
						
					
					
						commit
						f256e5e44f
					
				| @ -192,7 +192,7 @@ impl State { | ||||
|     /// This returns true if the update should be further forwarded
 | ||||
|     /// For example, we never forward our own updates that came back
 | ||||
|     pub fn process_node_update(&self, (ip, node_info): (String, NodeInfo)) -> bool { | ||||
|         let is_update_mine = ip == self.my_ip; | ||||
|         let is_update_mine = ip.eq(&self.my_ip); | ||||
|         let is_update_new = self | ||||
|             .nodes | ||||
|             .get(&ip) | ||||
|  | ||||
| @ -1,4 +1,5 @@ | ||||
| use super::challenge::{Keys, NodeUpdate}; | ||||
| use super::challenge::Keys; | ||||
| use super::InternalNodeUpdate; | ||||
| use crate::{ | ||||
|     datastore::State, | ||||
|     grpc::challenge::{update_client::UpdateClient, Empty}, | ||||
| @ -15,7 +16,7 @@ use tokio_stream::{wrappers::BroadcastStream, StreamExt}; | ||||
| pub struct ConnManager { | ||||
|     my_ip: String, | ||||
|     state: Arc<State>, | ||||
|     tx: Sender<NodeUpdate>, | ||||
|     tx: Sender<InternalNodeUpdate>, | ||||
|     ratls_config: RaTlsConfig, | ||||
| } | ||||
| 
 | ||||
| @ -24,7 +25,7 @@ impl ConnManager { | ||||
|         my_ip: String, | ||||
|         state: Arc<State>, | ||||
|         ratls_config: RaTlsConfig, | ||||
|         tx: Sender<NodeUpdate>, | ||||
|         tx: Sender<InternalNodeUpdate>, | ||||
|     ) -> Self { | ||||
|         Self { my_ip, state, ratls_config, tx } | ||||
|     } | ||||
| @ -52,13 +53,13 @@ impl ConnManager { | ||||
|         state.delete_conn(&node_ip); | ||||
|     } | ||||
| 
 | ||||
|     async fn connect(&self, node_ip: String) -> Result<(), Box<dyn std::error::Error>> { | ||||
|     async fn connect(&self, remote_ip: String) -> Result<(), Box<dyn std::error::Error>> { | ||||
|         use detee_sgx::RaTlsConfigBuilder; | ||||
|         use hyper::Uri; | ||||
|         use hyper_util::{client::legacy::connect::HttpConnector, rt::TokioExecutor}; | ||||
|         use tokio_rustls::rustls::ClientConfig; | ||||
| 
 | ||||
|         println!("Connecting to {node_ip}..."); | ||||
|         println!("Connecting to {remote_ip}..."); | ||||
| 
 | ||||
|         let tls = ClientConfig::from_ratls_config(self.ratls_config.clone()) | ||||
|             .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e)))?; | ||||
| @ -66,7 +67,7 @@ impl ConnManager { | ||||
|         let mut http = HttpConnector::new(); | ||||
|         http.enforce_http(false); | ||||
| 
 | ||||
|         let cloned_node_ip = node_ip.clone(); | ||||
|         let cloned_node_ip = remote_ip.clone(); | ||||
| 
 | ||||
|         let connector = tower::ServiceBuilder::new() | ||||
|             .layer_fn(move |s| { | ||||
| @ -91,9 +92,18 @@ impl ConnManager { | ||||
|         let mut client = UpdateClient::with_origin(client, uri); | ||||
| 
 | ||||
|         let rx = self.tx.subscribe(); | ||||
|         let rx_stream = BroadcastStream::new(rx).filter_map(|n| n.ok()); | ||||
|         let cloned_remote_ip = remote_ip.clone(); | ||||
|         let rx_stream = | ||||
|             BroadcastStream::new(rx).filter_map(|n| n.ok()).filter_map(move |int_update| { | ||||
|                 if int_update.sender_ip != cloned_remote_ip { | ||||
|                     Some(int_update.update) | ||||
|                 } else { | ||||
|                     None | ||||
|                 } | ||||
|             }); | ||||
| 
 | ||||
|         let response = client.get_updates(rx_stream).await.map_err(|e| { | ||||
|             println!("Error connecting to {node_ip}: {e}"); | ||||
|             println!("Error connecting to {remote_ip}: {e}"); | ||||
|             if e.to_string().contains("QuoteVerifyError") { | ||||
|                 self.state.increase_net_attacks(); | ||||
|             } | ||||
| @ -101,16 +111,26 @@ impl ConnManager { | ||||
|         })?; | ||||
|         let mut resp_stream = response.into_inner(); | ||||
| 
 | ||||
|         let _ = self.tx.send((self.my_ip.clone(), self.state.get_my_info()).into()); | ||||
|         // Immediately send our info as a network update
 | ||||
|         let my_info = (self.my_ip.clone(), self.state.get_my_info()).into(); | ||||
|         let _ = self.tx.send(InternalNodeUpdate { sender_ip: self.my_ip.clone(), update: my_info }); | ||||
| 
 | ||||
|         while let Some(update) = resp_stream.message().await? { | ||||
|             // update the entire network in case the information is new
 | ||||
|             if self.state.process_node_update(update.clone().into()) | ||||
|                 && self.tx.send(update.clone()).is_err() | ||||
|             if self.state.process_node_update(update.clone().into()) { | ||||
|                 // if process update returns true, the update must be forwarded
 | ||||
|                 if self | ||||
|                     .tx | ||||
|                     .send(InternalNodeUpdate { | ||||
|                         sender_ip: remote_ip.clone(), | ||||
|                         update: update.clone(), | ||||
|                     }) | ||||
|                     .is_err() | ||||
|                 { | ||||
|                     println!("Tokio broadcast receivers had an issue consuming the channel"); | ||||
|                 }; | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
| @ -7,6 +7,12 @@ pub mod challenge { | ||||
|     tonic::include_proto!("challenge"); | ||||
| } | ||||
| 
 | ||||
| #[derive(Clone, PartialEq)] | ||||
| pub struct InternalNodeUpdate { | ||||
|     pub sender_ip: String, | ||||
|     pub update: NodeUpdate, | ||||
| } | ||||
| 
 | ||||
| impl From<(String, NodeInfo)> for NodeUpdate { | ||||
|     fn from((ip, info): (String, NodeInfo)) -> Self { | ||||
|         NodeUpdate { | ||||
|  | ||||
| @ -1,4 +1,5 @@ | ||||
| use super::challenge::{update_server::UpdateServer, Empty, Keys, NodeUpdate}; | ||||
| use super::InternalNodeUpdate; | ||||
| use crate::{datastore::State, grpc::challenge::update_server::Update}; | ||||
| use detee_sgx::RaTlsConfig; | ||||
| use rustls::pki_types::CertificateDer; | ||||
| @ -9,7 +10,7 @@ use tonic::{Request, Response, Status, Streaming}; | ||||
| 
 | ||||
| pub struct MyServer { | ||||
|     state: Arc<State>, | ||||
|     tx: Sender<NodeUpdate>, | ||||
|     tx: Sender<InternalNodeUpdate>, | ||||
|     ratls_config: RaTlsConfig, | ||||
|     keys: Keys, // For sending secret keys to new nodes ;)
 | ||||
| } | ||||
| @ -19,7 +20,7 @@ impl MyServer { | ||||
|         state: Arc<State>, | ||||
|         keys: Keys, | ||||
|         ratls_config: RaTlsConfig, | ||||
|         tx: Sender<NodeUpdate>, | ||||
|         tx: Sender<InternalNodeUpdate>, | ||||
|     ) -> Self { | ||||
|         Self { state, tx, keys, ratls_config } | ||||
|     } | ||||
| @ -160,10 +161,14 @@ impl Update for MyServer { | ||||
|                                 } else { | ||||
|                                     println!("Node {remote_ip} is forwarding the update of {}", update.ip); | ||||
|                                 } | ||||
|                                 if state.process_node_update(update.clone().into()) && tx.send(update.clone()).is_err() { | ||||
| 
 | ||||
|                                 if state.process_node_update(update.clone().into()) { | ||||
|                                     // if process update returns true, the update must be forwarded
 | ||||
|                                     if tx.send(InternalNodeUpdate { sender_ip: remote_ip.clone(), update: update.clone() }).is_err() { | ||||
|                                         println!("Tokio broadcast receivers had an issue consuming the channel"); | ||||
|                                     }; | ||||
|                                 } | ||||
|                             } | ||||
|                             Err(e) => { | ||||
|                                 error_status = Status::internal(format!("Error receiving client stream: {}", e)); | ||||
|                                 break; | ||||
| @ -171,7 +176,10 @@ impl Update for MyServer { | ||||
|                         } | ||||
|                     } | ||||
|                     Ok(update) = rx.recv() => { | ||||
|                         yield Ok(update); | ||||
|                         if update.sender_ip != remote_ip { | ||||
|                             // don't bounce back the update we just received
 | ||||
|                             yield Ok(update.update); | ||||
|                         } | ||||
|                         // disconnect client if too many connections are active
 | ||||
|                         if tx.receiver_count() > 9 { | ||||
|                             error_status = Status::internal("Already have too many clients. Connect to another server."); | ||||
|  | ||||
| @ -6,7 +6,8 @@ mod solana; | ||||
| 
 | ||||
| use crate::persistence::SealError; | ||||
| use crate::{ | ||||
|     grpc::challenge::NodeUpdate, persistence::KeysFile, persistence::SealedFile, solana::SolClient, | ||||
|     grpc::challenge::NodeUpdate, grpc::InternalNodeUpdate, persistence::KeysFile, | ||||
|     persistence::SealedFile, solana::SolClient, | ||||
| }; | ||||
| use datastore::State; | ||||
| use detee_sgx::{InstanceMeasurement, RaTlsConfig}; | ||||
| @ -45,11 +46,12 @@ async fn resolve_my_ip() -> Result<String, Error> { | ||||
|     Ok(format!("{}", ip)) | ||||
| } | ||||
| 
 | ||||
| pub async fn heartbeat_cron(my_ip: String, state: Arc<State>, tx: Sender<NodeUpdate>) { | ||||
| pub async fn heartbeat_cron(my_ip: String, state: Arc<State>, tx: Sender<InternalNodeUpdate>) { | ||||
|     loop { | ||||
|         sleep(Duration::from_secs(60)).await; | ||||
|         println!("Heartbeat..."); | ||||
|         let _ = tx.send((my_ip.clone(), state.get_my_info()).into()); | ||||
|         let update = (my_ip.clone(), state.get_my_info()).into(); | ||||
|         let _ = tx.send(InternalNodeUpdate { sender_ip: my_ip.clone(), update }); | ||||
|         state.remove_inactive_nodes(); | ||||
|     } | ||||
| } | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user