info forwarding optimisation

Signed-off-by: Valentyn Faychuk <valy@detee.ltd>
This commit is contained in:
Valentyn Faychuk 2024-12-22 23:03:20 +02:00
parent 0a8f5d29f4
commit e15aa63af9
Signed by: valy
GPG Key ID: F1AB995E20FEADC5
5 changed files with 28 additions and 11 deletions

@ -14,6 +14,8 @@ fn main() {
} }
fn send_sol(receiver: &str) { fn send_sol(receiver: &str) {
// If this account uses all SOL, feel free to top up at 4qhFFULVHMpemFKGhpLfsVv126uruJgCxg2idLxedoa3
// https://solana.com/es/developers/guides/getstarted/solana-token-airdrop-and-faucets#2-web-faucet
let private_key_with_sol: [u8; 64] = [ let private_key_with_sol: [u8; 64] = [
39, 134, 81, 114, 233, 110, 215, 232, 203, 125, 133, 232, 212, 223, 75, 196, 115, 246, 42, 39, 134, 81, 114, 233, 110, 215, 232, 203, 125, 133, 232, 212, 223, 75, 196, 115, 246, 42,
121, 212, 231, 156, 82, 191, 86, 7, 217, 17, 241, 98, 12, 57, 12, 114, 15, 167, 208, 130, 121, 212, 231, 156, 82, 191, 86, 7, 217, 17, 241, 98, 12, 57, 12, 114, 15, 167, 208, 130,

@ -117,10 +117,12 @@ impl State {
pub fn add_conn(&self, ip: &str) { pub fn add_conn(&self, ip: &str) {
self.conns.insert(ip.to_string()); self.conns.insert(ip.to_string());
self.increase_mratls_conns();
} }
pub fn delete_conn(&self, ip: &str) { pub fn delete_conn(&self, ip: &str) {
self.conns.remove(ip); self.conns.remove(ip);
self.decrease_mratls_conns();
} }
pub fn increase_mint_requests(&self) { pub fn increase_mint_requests(&self) {
@ -187,8 +189,10 @@ impl State {
my_info.clone() my_info.clone()
} }
/// This returns true if NodeInfo got modified. /// This returns true if the update should be further forwarded
/// For example, we never forward our own updates that came back
pub fn process_node_update(&self, (ip, node_info): (String, NodeInfo)) -> bool { pub fn process_node_update(&self, (ip, node_info): (String, NodeInfo)) -> bool {
let is_update_mine = ip == self.my_ip;
let is_update_new = self let is_update_new = self
.nodes .nodes
.get(&ip) .get(&ip)
@ -199,13 +203,17 @@ impl State {
node_info.log(&ip); node_info.log(&ip);
self.nodes.insert(ip, node_info); self.nodes.insert(ip, node_info);
} }
is_update_new is_update_new && !is_update_mine
} }
pub fn get_node_list(&self) -> Vec<(String, NodeInfo)> { pub fn get_node_list(&self) -> Vec<(String, NodeInfo)> {
self.nodes.clone().into_iter().collect() self.nodes.clone().into_iter().collect()
} }
pub fn get_my_ip(&self) -> String {
self.my_ip.clone()
}
// returns a random node that does not have an active connection // returns a random node that does not have an active connection
pub fn get_random_disconnected_node(&self) -> Option<String> { pub fn get_random_disconnected_node(&self) -> Option<String> {
use rand::{rngs::OsRng, RngCore}; use rand::{rngs::OsRng, RngCore};

@ -36,6 +36,7 @@ impl ConnManager {
pub async fn start(self) { pub async fn start(self) {
loop { loop {
if let Some(ip) = self.state.get_random_disconnected_node() { if let Some(ip) = self.state.get_random_disconnected_node() {
println!("Found random disconnected node {ip}");
self.connect_wrapper(ip.clone()).await; self.connect_wrapper(ip.clone()).await;
} }
sleep(Duration::from_secs(3)).await; sleep(Duration::from_secs(3)).await;
@ -43,12 +44,12 @@ impl ConnManager {
} }
async fn connect_wrapper(&self, node_ip: String) { async fn connect_wrapper(&self, node_ip: String) {
let ds = self.state.clone(); let state = self.state.clone();
ds.add_conn(&node_ip); state.add_conn(&node_ip);
if let Err(e) = self.connect(node_ip.clone()).await { if let Err(e) = self.connect(node_ip.clone()).await {
println!("Client connection for {node_ip} failed: {e:?}"); println!("Client connection for {node_ip} failed: {e:?}");
} }
ds.delete_conn(&node_ip); state.delete_conn(&node_ip);
} }
async fn connect(&self, node_ip: String) -> Result<(), Box<dyn std::error::Error>> { async fn connect(&self, node_ip: String) -> Result<(), Box<dyn std::error::Error>> {
@ -107,7 +108,7 @@ impl ConnManager {
if self.state.process_node_update(update.clone().into()) if self.state.process_node_update(update.clone().into())
&& self.tx.send(update.clone()).is_err() && self.tx.send(update.clone()).is_err()
{ {
println!("tokio broadcast receivers had an issue consuming the channel"); println!("Tokio broadcast receivers had an issue consuming the channel");
}; };
} }

@ -136,10 +136,11 @@ impl Update for MyServer {
let mut rx = self.tx.subscribe(); let mut rx = self.tx.subscribe();
let mut inbound = req.into_inner(); let mut inbound = req.into_inner();
let state = self.state.clone(); let state = self.state.clone();
let my_ip = self.state.get_my_ip();
let stream = async_stream::stream! { let stream = async_stream::stream! {
state.declare_myself_public(); state.declare_myself_public();
state.increase_mratls_conns(); state.add_conn(&remote_ip);
let full_update_list: Vec<NodeUpdate> = state.get_node_list().into_iter().map(Into::<NodeUpdate>::into).collect(); let full_update_list: Vec<NodeUpdate> = state.get_node_list().into_iter().map(Into::<NodeUpdate>::into).collect();
for update in full_update_list { for update in full_update_list {
@ -152,11 +153,15 @@ impl Update for MyServer {
Some(msg) = inbound.next() => { Some(msg) = inbound.next() => {
match msg { match msg {
Ok(update) => { Ok(update) => {
if update.ip != remote_ip { if update.ip == remote_ip {
println!("node {remote_ip} is forwarding us the update for {}", update.ip); println!("Node {remote_ip} is sending it's own update");
} else if update.ip == my_ip {
println!("Node {remote_ip} is forwarding our past update");
} else {
println!("Node {remote_ip} is forwarding the update of {}", update.ip);
} }
if state.process_node_update(update.clone().into()) && tx.send(update.clone()).is_err() { if state.process_node_update(update.clone().into()) && tx.send(update.clone()).is_err() {
println!("tokio broadcast receivers had an issue consuming the channel"); println!("Tokio broadcast receivers had an issue consuming the channel");
}; };
} }
Err(e) => { Err(e) => {
@ -175,7 +180,7 @@ impl Update for MyServer {
} }
} }
} }
state.decrease_mratls_conns(); state.delete_conn(&remote_ip);
yield Err(error_status); yield Err(error_status);
}; };

@ -48,6 +48,7 @@ async fn resolve_my_ip() -> Result<String, Error> {
pub async fn heartbeat_cron(my_ip: String, state: Arc<State>, tx: Sender<NodeUpdate>) { pub async fn heartbeat_cron(my_ip: String, state: Arc<State>, tx: Sender<NodeUpdate>) {
loop { loop {
sleep(Duration::from_secs(60)).await; sleep(Duration::from_secs(60)).await;
println!("Heartbeat...");
let _ = tx.send((my_ip.clone(), state.get_my_info()).into()); let _ = tx.send((my_ip.clone(), state.get_my_info()).into());
state.remove_inactive_nodes(); state.remove_inactive_nodes();
} }