use super::challenge::NodeUpdate; use crate::datastore::Store; use crate::grpc::challenge::update_client::UpdateClient; use std::sync::Arc; use tokio::sync::broadcast::Sender; use tokio::time::{sleep, Duration}; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::StreamExt; #[derive(Clone)] pub struct ConnManager { ds: Arc, tx: Sender, } impl ConnManager { pub fn init(ds: Arc, tx: Sender) -> Self { Self { ds, tx } } pub async fn start_with_node(self, node_ip: String) { self.connect_wrapper(node_ip).await; } pub async fn start(self) { loop { if let Some(node) = self.ds.get_random_node().await { if node != "localhost" { self.connect_wrapper(node.clone()).await; } } sleep(Duration::from_secs(3)).await; } } async fn connect_wrapper(&self, node_ip: String) { let ds = self.ds.clone(); ds.add_conn(&node_ip).await; if let Err(e) = self.connect(node_ip.clone()).await { println!("Client connection for {node_ip} failed: {e:?}"); } ds.delete_conn(&node_ip).await; } async fn connect(&self, node_ip: String) -> Result<(), Box> { println!("Connecting to {node_ip}..."); let mut client = UpdateClient::connect(format!("http://{node_ip}:31373")).await?; let rx = self.tx.subscribe(); let rx_stream = BroadcastStream::new(rx).filter_map(|n| n.ok()); let response = client.get_updates(rx_stream).await?; let mut resp_stream = response.into_inner(); let _ = self.tx.send(self.ds.get_localhost().await); while let Some(mut update) = resp_stream.message().await? { // "localhost" IPs need to be changed to the real IP of the counterpart if update.ip == "localhost" { update.ip = node_ip.clone(); // since we are connecting TO this server, we have a guarantee that this // server is not behind NAT, so we can set it public update.public = true; } // update the entire network in case the information is new if self.ds.process_node_update(update.clone()).await { if let Err(_) = self.tx.send(update.clone()) { println!("tokio broadcast receivers had an issue consuming the channel"); } }; } Ok(()) } }