#![allow(dead_code)] use super::challenge::NodeUpdate; use crate::datastore::Store; use crate::grpc::challenge::update_client::UpdateClient; use crate::grpc::challenge::Empty; use solana_sdk::signature::keypair::Keypair; use std::sync::Arc; use tokio::sync::broadcast::Sender; use tokio::time::{sleep, Duration}; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::StreamExt; use crate::datastore::LOCALHOST; #[derive(Clone)] pub struct ConnManager { ds: Arc, tx: Sender, } impl ConnManager { pub fn init(ds: Arc, tx: Sender) -> Self { Self { ds, tx } } pub async fn start_with_node(self, node_ip: String) { self.connect_wrapper(node_ip).await; } pub async fn start(self) { loop { if let Some(node) = self.ds.get_random_node() { if node != LOCALHOST { self.connect_wrapper(node.clone()).await; } } sleep(Duration::from_secs(3)).await; } } async fn connect_wrapper(&self, node_ip: String) { let ds = self.ds.clone(); ds.add_conn(&node_ip); if let Err(e) = self.connect(node_ip.clone()).await { println!("Client connection for {node_ip} failed: {e:?}"); } ds.delete_conn(&node_ip); } async fn connect(&self, node_ip: String) -> Result<(), Box> { println!("Connecting to {node_ip}..."); let mut client = UpdateClient::connect(format!("http://{node_ip}:31373")).await?; let rx = self.tx.subscribe(); let rx_stream = BroadcastStream::new(rx).filter_map(|n| n.ok()); let response = client.get_updates(rx_stream).await?; let mut resp_stream = response.into_inner(); let _ = self.tx.send((LOCALHOST.to_string(), self.ds.get_localhost()).into()); while let Some(mut update) = resp_stream.message().await? { // "localhost" IPs need to be changed to the real IP of the counterpart if update.ip == LOCALHOST { update.ip = node_ip.clone(); // since we are connecting TO this server, we have a guarantee that this // server is not behind NAT, so we can set it public update.public = true; } // update the entire network in case the information is new if self.ds.process_node_update(update.clone().into()).await { if let Err(_) = self.tx.send(update.clone()) { println!("tokio broadcast receivers had an issue consuming the channel"); } }; } Ok(()) } } pub async fn key_grabber(node_ip: String) -> Result> { let mut client = UpdateClient::connect(format!("http://{node_ip}:31373")).await?; let response = client.get_keypair(tonic::Request::new(Empty {})).await?; let response = &response.into_inner().keypair; let keypair = match std::panic::catch_unwind(|| Keypair::from_base58_string(&response)) { Ok(k) => k, Err(_) => return Err("Could not parse key".into()), }; Ok(keypair) }