diff --git a/mint_sol/src/main.rs b/mint_sol/src/main.rs index 9f86767..7633dfc 100644 --- a/mint_sol/src/main.rs +++ b/mint_sol/src/main.rs @@ -14,6 +14,8 @@ fn main() { } fn send_sol(receiver: &str) { + // If this account uses all SOL, feel free to top up at 4qhFFULVHMpemFKGhpLfsVv126uruJgCxg2idLxedoa3 + // https://solana.com/es/developers/guides/getstarted/solana-token-airdrop-and-faucets#2-web-faucet let private_key_with_sol: [u8; 64] = [ 39, 134, 81, 114, 233, 110, 215, 232, 203, 125, 133, 232, 212, 223, 75, 196, 115, 246, 42, 121, 212, 231, 156, 82, 191, 86, 7, 217, 17, 241, 98, 12, 57, 12, 114, 15, 167, 208, 130, diff --git a/src/datastore.rs b/src/datastore.rs index a080673..727adc9 100644 --- a/src/datastore.rs +++ b/src/datastore.rs @@ -117,10 +117,12 @@ impl State { pub fn add_conn(&self, ip: &str) { self.conns.insert(ip.to_string()); + self.increase_mratls_conns(); } pub fn delete_conn(&self, ip: &str) { self.conns.remove(ip); + self.decrease_mratls_conns(); } pub fn increase_mint_requests(&self) { @@ -187,8 +189,10 @@ impl State { my_info.clone() } - /// This returns true if NodeInfo got modified. + /// This returns true if the update should be further forwarded + /// For example, we never forward our own updates that came back pub fn process_node_update(&self, (ip, node_info): (String, NodeInfo)) -> bool { + let is_update_mine = ip == self.my_ip; let is_update_new = self .nodes .get(&ip) @@ -199,13 +203,17 @@ impl State { node_info.log(&ip); self.nodes.insert(ip, node_info); } - is_update_new + is_update_new && !is_update_mine } pub fn get_node_list(&self) -> Vec<(String, NodeInfo)> { self.nodes.clone().into_iter().collect() } + pub fn get_my_ip(&self) -> String { + self.my_ip.clone() + } + // returns a random node that does not have an active connection pub fn get_random_disconnected_node(&self) -> Option { use rand::{rngs::OsRng, RngCore}; diff --git a/src/grpc/client.rs b/src/grpc/client.rs index 31a4355..3ae2f4a 100644 --- a/src/grpc/client.rs +++ b/src/grpc/client.rs @@ -36,6 +36,7 @@ impl ConnManager { pub async fn start(self) { loop { if let Some(ip) = self.state.get_random_disconnected_node() { + println!("Found random disconnected node {ip}"); self.connect_wrapper(ip.clone()).await; } sleep(Duration::from_secs(3)).await; @@ -43,12 +44,12 @@ impl ConnManager { } async fn connect_wrapper(&self, node_ip: String) { - let ds = self.state.clone(); - ds.add_conn(&node_ip); + let state = self.state.clone(); + state.add_conn(&node_ip); if let Err(e) = self.connect(node_ip.clone()).await { println!("Client connection for {node_ip} failed: {e:?}"); } - ds.delete_conn(&node_ip); + state.delete_conn(&node_ip); } async fn connect(&self, node_ip: String) -> Result<(), Box> { @@ -107,7 +108,7 @@ impl ConnManager { if self.state.process_node_update(update.clone().into()) && self.tx.send(update.clone()).is_err() { - println!("tokio broadcast receivers had an issue consuming the channel"); + println!("Tokio broadcast receivers had an issue consuming the channel"); }; } diff --git a/src/grpc/server.rs b/src/grpc/server.rs index a11b13e..25a7ec7 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -136,10 +136,11 @@ impl Update for MyServer { let mut rx = self.tx.subscribe(); let mut inbound = req.into_inner(); let state = self.state.clone(); + let my_ip = self.state.get_my_ip(); let stream = async_stream::stream! { state.declare_myself_public(); - state.increase_mratls_conns(); + state.add_conn(&remote_ip); let full_update_list: Vec = state.get_node_list().into_iter().map(Into::::into).collect(); for update in full_update_list { @@ -152,11 +153,15 @@ impl Update for MyServer { Some(msg) = inbound.next() => { match msg { Ok(update) => { - if update.ip != remote_ip { - println!("node {remote_ip} is forwarding us the update for {}", update.ip); + if update.ip == remote_ip { + println!("Node {remote_ip} is sending it's own update"); + } else if update.ip == my_ip { + println!("Node {remote_ip} is forwarding our past update"); + } else { + println!("Node {remote_ip} is forwarding the update of {}", update.ip); } if state.process_node_update(update.clone().into()) && tx.send(update.clone()).is_err() { - println!("tokio broadcast receivers had an issue consuming the channel"); + println!("Tokio broadcast receivers had an issue consuming the channel"); }; } Err(e) => { @@ -175,7 +180,7 @@ impl Update for MyServer { } } } - state.decrease_mratls_conns(); + state.delete_conn(&remote_ip); yield Err(error_status); }; diff --git a/src/main.rs b/src/main.rs index 2a08c28..4278009 100644 --- a/src/main.rs +++ b/src/main.rs @@ -48,6 +48,7 @@ async fn resolve_my_ip() -> Result { pub async fn heartbeat_cron(my_ip: String, state: Arc, tx: Sender) { loop { sleep(Duration::from_secs(60)).await; + println!("Heartbeat..."); let _ = tx.send((my_ip.clone(), state.get_my_info()).into()); state.remove_inactive_nodes(); }