From d47f71c9414806a313a6087afccf27d14c7f65ad Mon Sep 17 00:00:00 2001 From: ghe0 Date: Mon, 19 Aug 2024 16:29:16 +0300 Subject: [PATCH] improved logic for online/offline status --- src/datastore.rs | 36 ++++++++++++++++++++++++++++-------- src/grpc/client.rs | 14 ++++++++++++-- src/grpc/server.rs | 21 ++++++++++++--------- 3 files changed, 52 insertions(+), 19 deletions(-) diff --git a/src/datastore.rs b/src/datastore.rs index 6b5acbb..bfe6e71 100644 --- a/src/datastore.rs +++ b/src/datastore.rs @@ -182,6 +182,7 @@ impl Store { let mut nodes = self.nodes.lock().await; match nodes.insert(ip, info.clone()) { Some(old_info) => match old_info.pubkey.ne(&info.pubkey) { + // TODO: save old private key to disk using SGX Sealing true => Some(old_info.pubkey), false => None, }, @@ -199,20 +200,32 @@ impl Store { nodes.get(ip).cloned() } - pub async fn cycle_self_key(&self) { + /// freshes the keys of the node and returns a protobuf for the network + pub async fn reset_localhost_keys(&self) -> NodeUpdate { let mut csprng = OsRng; - // TODO: save old private key to disk using SGX Sealing - let privkey = ed25519_dalek::SigningKey::generate(&mut csprng); + let keypair_raw = ed25519_dalek::SigningKey::generate(&mut csprng); + let keypair = hex::encode(keypair_raw.as_bytes()); + let pubkey = keypair_raw.verifying_key(); + let ip = "localhost".to_string(); + let updated_at = std::time::SystemTime::now(); + let online = false; self.update_node( - "localhost".to_string(), + ip.clone(), NodeInfo { - pubkey: privkey.verifying_key(), - updated_at: std::time::SystemTime::now(), - online: true, + pubkey, + updated_at, + online, }, ) .await; - self.add_key(privkey.verifying_key(), privkey).await; + self.add_key(pubkey, keypair_raw.clone()).await; + let updated_at = Some(prost_types::Timestamp::from(updated_at)); + NodeUpdate { + ip, + keypair, + updated_at, + online, + } } pub async fn get_full_node_list(&self) -> Vec { @@ -253,4 +266,11 @@ impl Store { } random_nodes } + + pub async fn set_online(&self, ip: &str, online: bool) { + let mut nodes = self.nodes.lock().await; + if let Some(node) = nodes.get_mut(ip) { + node.online = online; + } + } } diff --git a/src/grpc/client.rs b/src/grpc/client.rs index 15fc38c..ea1796e 100644 --- a/src/grpc/client.rs +++ b/src/grpc/client.rs @@ -35,20 +35,30 @@ impl ConnManager { let response = client.get_updates(rx_stream).await.unwrap(); let mut resp_stream = response.into_inner(); - while let Some(update) = resp_stream.message().await.unwrap() { + while let Some(mut update) = resp_stream.message().await.unwrap() { + + // "localhost" IPs need to be changed to the real IP of the counterpart + if update.ip == "localhost" { + update.ip = node_ip.clone(); + // since we are connecting TO this server, we have a guarantee that this + // server is not behind NAT, so we can set it online + update.online = true; + } + + // update the entire network in case the information is new if self.ds.process_grpc_update(update.clone()).await { if let Err(_) = self.tx.send(update.clone()) { println!("tokio broadcast receivers had an issue consuming the channel"); } }; } - } } fn init_connections(ds: Arc, tx: Sender) -> JoinHandle<()> { std::thread::spawn(move || { tokio::runtime::Runtime::new().unwrap().block_on(async { + // we rotate online and offline nodes, to constantly check new nodes let mut only_online_nodes = true; loop { let mut set = JoinSet::new(); diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 5d45618..f9dcdff 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -57,21 +57,26 @@ impl Update for MyServer { yield Ok(update); } - if tx.receiver_count() > 10 { - yield Err(Status::internal("Already have too many clients. Connect to another server.")); - return; - } - loop { tokio::select! { Some(msg) = inbound.next() => { match msg { - Ok(update) => { + Ok(mut update) => { + if update.ip == "localhost" { + update.ip = remote_ip.clone(); + // note that we don't set this node online, + // as it can be behind NAT + } if ds.process_grpc_update(update.clone()).await { if let Err(_) = tx.send(update.clone()) { println!("tokio broadcast receivers had an issue consuming the channel"); } }; + // disconnect client if too many connections are active + if tx.receiver_count() > 9 { + yield Err(Status::internal("Already have too many clients. Connect to another server.")); + return; + } } Err(e) => { yield Err(Status::internal(format!("Error receiving client stream: {}", e))); @@ -80,9 +85,7 @@ impl Update for MyServer { } } Ok(update) = rx.recv() => { - if update.ip != remote_ip { - yield Ok(update); - } + yield Ok(update); } }