info forwarding optimisation

Signed-off-by: Valentyn Faychuk <valy@detee.ltd>
This commit is contained in:
Valentyn Faychuk 2024-12-22 23:03:20 +02:00
parent 0a8f5d29f4
commit 8fb3c4810f
Signed by: valy
GPG Key ID: F1AB995E20FEADC5
5 changed files with 11 additions and 5 deletions

@ -14,6 +14,8 @@ fn main() {
} }
fn send_sol(receiver: &str) { fn send_sol(receiver: &str) {
// If this account uses all SOL, feel free to top up at 4qhFFULVHMpemFKGhpLfsVv126uruJgCxg2idLxedoa3
// https://solana.com/es/developers/guides/getstarted/solana-token-airdrop-and-faucets#2-web-faucet
let private_key_with_sol: [u8; 64] = [ let private_key_with_sol: [u8; 64] = [
39, 134, 81, 114, 233, 110, 215, 232, 203, 125, 133, 232, 212, 223, 75, 196, 115, 246, 42, 39, 134, 81, 114, 233, 110, 215, 232, 203, 125, 133, 232, 212, 223, 75, 196, 115, 246, 42,
121, 212, 231, 156, 82, 191, 86, 7, 217, 17, 241, 98, 12, 57, 12, 114, 15, 167, 208, 130, 121, 212, 231, 156, 82, 191, 86, 7, 217, 17, 241, 98, 12, 57, 12, 114, 15, 167, 208, 130,

@ -187,8 +187,10 @@ impl State {
my_info.clone() my_info.clone()
} }
/// This returns true if NodeInfo got modified. /// This returns true if the update should be further forwarded
/// For example, we never forward our own updates that came back
pub fn process_node_update(&self, (ip, node_info): (String, NodeInfo)) -> bool { pub fn process_node_update(&self, (ip, node_info): (String, NodeInfo)) -> bool {
let is_update_mine = ip == self.my_ip;
let is_update_new = self let is_update_new = self
.nodes .nodes
.get(&ip) .get(&ip)
@ -199,7 +201,7 @@ impl State {
node_info.log(&ip); node_info.log(&ip);
self.nodes.insert(ip, node_info); self.nodes.insert(ip, node_info);
} }
is_update_new is_update_new && !is_update_mine
} }
pub fn get_node_list(&self) -> Vec<(String, NodeInfo)> { pub fn get_node_list(&self) -> Vec<(String, NodeInfo)> {

@ -36,6 +36,7 @@ impl ConnManager {
pub async fn start(self) { pub async fn start(self) {
loop { loop {
if let Some(ip) = self.state.get_random_disconnected_node() { if let Some(ip) = self.state.get_random_disconnected_node() {
println!("Found random disconnected node {ip}");
self.connect_wrapper(ip.clone()).await; self.connect_wrapper(ip.clone()).await;
} }
sleep(Duration::from_secs(3)).await; sleep(Duration::from_secs(3)).await;
@ -107,7 +108,7 @@ impl ConnManager {
if self.state.process_node_update(update.clone().into()) if self.state.process_node_update(update.clone().into())
&& self.tx.send(update.clone()).is_err() && self.tx.send(update.clone()).is_err()
{ {
println!("tokio broadcast receivers had an issue consuming the channel"); println!("Tokio broadcast receivers had an issue consuming the channel");
}; };
} }

@ -153,10 +153,10 @@ impl Update for MyServer {
match msg { match msg {
Ok(update) => { Ok(update) => {
if update.ip != remote_ip { if update.ip != remote_ip {
println!("node {remote_ip} is forwarding us the update for {}", update.ip); println!("Node {remote_ip} is forwarding us the update for {}", update.ip);
} }
if state.process_node_update(update.clone().into()) && tx.send(update.clone()).is_err() { if state.process_node_update(update.clone().into()) && tx.send(update.clone()).is_err() {
println!("tokio broadcast receivers had an issue consuming the channel"); println!("Tokio broadcast receivers had an issue consuming the channel");
}; };
} }
Err(e) => { Err(e) => {

@ -48,6 +48,7 @@ async fn resolve_my_ip() -> Result<String, Error> {
pub async fn heartbeat_cron(my_ip: String, state: Arc<State>, tx: Sender<NodeUpdate>) { pub async fn heartbeat_cron(my_ip: String, state: Arc<State>, tx: Sender<NodeUpdate>) {
loop { loop {
sleep(Duration::from_secs(60)).await; sleep(Duration::from_secs(60)).await;
println!("Heartbeat...");
let _ = tx.send((my_ip.clone(), state.get_my_info()).into()); let _ = tx.send((my_ip.clone(), state.get_my_info()).into());
state.remove_inactive_nodes(); state.remove_inactive_nodes();
} }