From efa36737a0d1036919bc898a3ecff2ca51a4b896 Mon Sep 17 00:00:00 2001 From: ghe0 Date: Tue, 20 Aug 2024 02:10:41 +0300 Subject: [PATCH] network connecting --- scripts/start.sh | 2 +- src/datastore.rs | 86 ++++++++++++++++++++++++---------------------- src/grpc/client.rs | 6 +++- src/grpc/server.rs | 13 +++---- src/main.rs | 4 +-- 5 files changed, 59 insertions(+), 52 deletions(-) diff --git a/scripts/start.sh b/scripts/start.sh index 64de8fc..6ca118c 100755 --- a/scripts/start.sh +++ b/scripts/start.sh @@ -3,6 +3,6 @@ # This script start the hacker challenge from within the docker container. # It's only purpose is to help bootstrap a test network. -echo $INIT_NODES > /detee_challenge_nodes +echo $INIT_NODES | tr ' ' '\n' > /detee_challenge_nodes /hacker-challenge diff --git a/src/datastore.rs b/src/datastore.rs index 5742170..6dc37b9 100644 --- a/src/datastore.rs +++ b/src/datastore.rs @@ -9,7 +9,7 @@ use std::time::UNIX_EPOCH; use tabled::{Table, Tabled}; use tokio::sync::Mutex; -#[derive(Clone, PartialEq)] +#[derive(Clone, PartialEq, Debug)] pub struct NodeInfo { pub pubkey: VerifyingKey, pub updated_at: SystemTime, @@ -64,27 +64,13 @@ impl Store { } } - pub async fn add_mock_node(&self, ip: IP) { - let mut csprng = OsRng; - let privkey = ed25519_dalek::SigningKey::generate(&mut csprng); - self.update_node( - ip, - NodeInfo { - pubkey: privkey.verifying_key(), - updated_at: std::time::SystemTime::now(), - online: true, - }, - ) - .await; - self.add_key(privkey.verifying_key(), privkey).await; - } - pub async fn tabled_node_list(&self) -> String { #[derive(Tabled)] struct OutputRow { ip: String, pubkey: String, age: u64, + online: bool, } let mut output = vec![]; for (ip, node_info) in self.nodes.lock().await.iter() { @@ -94,7 +80,13 @@ impl Store { .duration_since(node_info.updated_at) .unwrap_or(Duration::ZERO) .as_secs(); - output.push(OutputRow { ip, pubkey, age }); + let online = node_info.online; + output.push(OutputRow { + ip, + pubkey, + age, + online, + }); } Table::new(output).to_string() } @@ -158,36 +150,32 @@ impl Store { None => SystemTime::now(), }; - if let Some(old_pubkey) = self - .update_node( - node.ip, - NodeInfo { - pubkey, - updated_at: updated_at_std, - online: node.online, - }, - ) - .await - { - self.remove_key(&old_pubkey).await; - self.add_key(pubkey, privkey).await; - true + self.add_key(pubkey, privkey).await; + let node_info = NodeInfo { + pubkey, + updated_at: updated_at_std, + online: node.online, + }; + if let Some(mut old_node_info) = self.update_node(node.ip, node_info.clone()).await { + if !node_info.online { + old_node_info.online = false; + } + match old_node_info.ne(&node_info) { + true => { + self.remove_key(&old_node_info.pubkey).await; + true + } + false => false, + } } else { - false + true } } /// returns old pubkey if node got updated - async fn update_node(&self, ip: String, info: NodeInfo) -> Option { + async fn update_node(&self, ip: String, info: NodeInfo) -> Option { let mut nodes = self.nodes.lock().await; - match nodes.insert(ip, info.clone()) { - Some(old_info) => match old_info.pubkey.ne(&info.pubkey) { - // TODO: save old private key to disk using SGX Sealing - true => Some(old_info.pubkey), - false => None, - }, - None => None, - } + nodes.insert(ip, info.clone()) } pub async fn remove_node(&self, ip: &str) { @@ -200,6 +188,22 @@ impl Store { nodes.get(ip).cloned() } + pub async fn get_localhost(&self) -> NodeUpdate { + // these unwrap never fail + // you could trigger reset_localhost_keys on error, though + // TODO: do that so that code looks clean + let nodes = self.nodes.lock().await; + let keys = self.keys.lock().await; + let node_info = nodes.get("localhost").unwrap(); + let keypair = keys.get(&node_info.pubkey).unwrap(); + NodeUpdate { + ip: "localhost".to_string(), + keypair: hex::encode(keypair.as_bytes()), + updated_at: Some(prost_types::Timestamp::from(node_info.updated_at)), + online: false, + } + } + /// freshes the keys of the node and returns a protobuf for the network pub async fn reset_localhost_keys(&self) -> NodeUpdate { let mut csprng = OsRng; diff --git a/src/grpc/client.rs b/src/grpc/client.rs index d50abf2..83d58ae 100644 --- a/src/grpc/client.rs +++ b/src/grpc/client.rs @@ -27,7 +27,8 @@ impl ConnManager { } async fn connect(self, node_ip: String) { - let mut client = UpdateClient::connect(format!("{node_ip}:31373")) + println!("Connecting to {node_ip}..."); + let mut client = UpdateClient::connect(format!("http://{node_ip}:31373")) .await .unwrap(); @@ -36,7 +37,10 @@ impl ConnManager { let response = client.get_updates(rx_stream).await.unwrap(); let mut resp_stream = response.into_inner(); + let _ = self.tx.send(self.ds.get_localhost().await); + while let Some(mut update) = resp_stream.message().await.unwrap() { + println!("Received message"); // "localhost" IPs need to be changed to the real IP of the counterpart if update.ip == "localhost" { update.ip = node_ip.clone(); diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 8d0c486..99d95e0 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -62,16 +62,11 @@ impl Update for MyServer { // note that we don't set this node online, // as it can be behind NAT } - if ds.process_grpc_update(update.clone()).await { + if update.ip != "127.0.0.1" && ds.process_grpc_update(update.clone()).await { if let Err(_) = tx.send(update.clone()) { println!("tokio broadcast receivers had an issue consuming the channel"); } }; - // disconnect client if too many connections are active - if tx.receiver_count() > 9 { - yield Err(Status::internal("Already have too many clients. Connect to another server.")); - return; - } } Err(e) => { yield Err(Status::internal(format!("Error receiving client stream: {}", e))); @@ -80,7 +75,13 @@ impl Update for MyServer { } } Ok(update) = rx.recv() => { + println!("Sending message."); yield Ok(update); + // disconnect client if too many connections are active + if tx.receiver_count() > 9 { + yield Err(Status::internal("Already have too many clients. Connect to another server.")); + return; + } } } diff --git a/src/main.rs b/src/main.rs index e8527db..94a5d47 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,9 +11,7 @@ async fn main() { let ds: Arc = Arc::new(Store::init()); let (tx, mut _rx) = broadcast::channel(500); - // ds.add_mock_node("1.1.1.1".to_string()).await; - // ds.add_mock_node("1.2.3.4".to_string()).await; - // ds.add_mock_node("1.2.2.2".to_string()).await; + ds.reset_localhost_keys().await; let mut join_set = JoinSet::new();