network connecting

This commit is contained in:
ghe0 2024-08-20 02:10:41 +03:00
parent 5da4ec3785
commit efa36737a0
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
5 changed files with 59 additions and 52 deletions

@ -3,6 +3,6 @@
# This script start the hacker challenge from within the docker container.
# It's only purpose is to help bootstrap a test network.
echo $INIT_NODES > /detee_challenge_nodes
echo $INIT_NODES | tr ' ' '\n' > /detee_challenge_nodes
/hacker-challenge

@ -9,7 +9,7 @@ use std::time::UNIX_EPOCH;
use tabled::{Table, Tabled};
use tokio::sync::Mutex;
#[derive(Clone, PartialEq)]
#[derive(Clone, PartialEq, Debug)]
pub struct NodeInfo {
pub pubkey: VerifyingKey,
pub updated_at: SystemTime,
@ -64,27 +64,13 @@ impl Store {
}
}
pub async fn add_mock_node(&self, ip: IP) {
let mut csprng = OsRng;
let privkey = ed25519_dalek::SigningKey::generate(&mut csprng);
self.update_node(
ip,
NodeInfo {
pubkey: privkey.verifying_key(),
updated_at: std::time::SystemTime::now(),
online: true,
},
)
.await;
self.add_key(privkey.verifying_key(), privkey).await;
}
pub async fn tabled_node_list(&self) -> String {
#[derive(Tabled)]
struct OutputRow {
ip: String,
pubkey: String,
age: u64,
online: bool,
}
let mut output = vec![];
for (ip, node_info) in self.nodes.lock().await.iter() {
@ -94,7 +80,13 @@ impl Store {
.duration_since(node_info.updated_at)
.unwrap_or(Duration::ZERO)
.as_secs();
output.push(OutputRow { ip, pubkey, age });
let online = node_info.online;
output.push(OutputRow {
ip,
pubkey,
age,
online,
});
}
Table::new(output).to_string()
}
@ -158,36 +150,32 @@ impl Store {
None => SystemTime::now(),
};
if let Some(old_pubkey) = self
.update_node(
node.ip,
NodeInfo {
pubkey,
updated_at: updated_at_std,
online: node.online,
},
)
.await
{
self.remove_key(&old_pubkey).await;
self.add_key(pubkey, privkey).await;
true
self.add_key(pubkey, privkey).await;
let node_info = NodeInfo {
pubkey,
updated_at: updated_at_std,
online: node.online,
};
if let Some(mut old_node_info) = self.update_node(node.ip, node_info.clone()).await {
if !node_info.online {
old_node_info.online = false;
}
match old_node_info.ne(&node_info) {
true => {
self.remove_key(&old_node_info.pubkey).await;
true
}
false => false,
}
} else {
false
true
}
}
/// returns old pubkey if node got updated
async fn update_node(&self, ip: String, info: NodeInfo) -> Option<VerifyingKey> {
async fn update_node(&self, ip: String, info: NodeInfo) -> Option<NodeInfo> {
let mut nodes = self.nodes.lock().await;
match nodes.insert(ip, info.clone()) {
Some(old_info) => match old_info.pubkey.ne(&info.pubkey) {
// TODO: save old private key to disk using SGX Sealing
true => Some(old_info.pubkey),
false => None,
},
None => None,
}
nodes.insert(ip, info.clone())
}
pub async fn remove_node(&self, ip: &str) {
@ -200,6 +188,22 @@ impl Store {
nodes.get(ip).cloned()
}
pub async fn get_localhost(&self) -> NodeUpdate {
// these unwrap never fail
// you could trigger reset_localhost_keys on error, though
// TODO: do that so that code looks clean
let nodes = self.nodes.lock().await;
let keys = self.keys.lock().await;
let node_info = nodes.get("localhost").unwrap();
let keypair = keys.get(&node_info.pubkey).unwrap();
NodeUpdate {
ip: "localhost".to_string(),
keypair: hex::encode(keypair.as_bytes()),
updated_at: Some(prost_types::Timestamp::from(node_info.updated_at)),
online: false,
}
}
/// freshes the keys of the node and returns a protobuf for the network
pub async fn reset_localhost_keys(&self) -> NodeUpdate {
let mut csprng = OsRng;

@ -27,7 +27,8 @@ impl ConnManager {
}
async fn connect(self, node_ip: String) {
let mut client = UpdateClient::connect(format!("{node_ip}:31373"))
println!("Connecting to {node_ip}...");
let mut client = UpdateClient::connect(format!("http://{node_ip}:31373"))
.await
.unwrap();
@ -36,7 +37,10 @@ impl ConnManager {
let response = client.get_updates(rx_stream).await.unwrap();
let mut resp_stream = response.into_inner();
let _ = self.tx.send(self.ds.get_localhost().await);
while let Some(mut update) = resp_stream.message().await.unwrap() {
println!("Received message");
// "localhost" IPs need to be changed to the real IP of the counterpart
if update.ip == "localhost" {
update.ip = node_ip.clone();

@ -62,16 +62,11 @@ impl Update for MyServer {
// note that we don't set this node online,
// as it can be behind NAT
}
if ds.process_grpc_update(update.clone()).await {
if update.ip != "127.0.0.1" && ds.process_grpc_update(update.clone()).await {
if let Err(_) = tx.send(update.clone()) {
println!("tokio broadcast receivers had an issue consuming the channel");
}
};
// disconnect client if too many connections are active
if tx.receiver_count() > 9 {
yield Err(Status::internal("Already have too many clients. Connect to another server."));
return;
}
}
Err(e) => {
yield Err(Status::internal(format!("Error receiving client stream: {}", e)));
@ -80,7 +75,13 @@ impl Update for MyServer {
}
}
Ok(update) = rx.recv() => {
println!("Sending message.");
yield Ok(update);
// disconnect client if too many connections are active
if tx.receiver_count() > 9 {
yield Err(Status::internal("Already have too many clients. Connect to another server."));
return;
}
}
}

@ -11,9 +11,7 @@ async fn main() {
let ds: Arc<Store> = Arc::new(Store::init());
let (tx, mut _rx) = broadcast::channel(500);
// ds.add_mock_node("1.1.1.1".to_string()).await;
// ds.add_mock_node("1.2.3.4".to_string()).await;
// ds.add_mock_node("1.2.2.2".to_string()).await;
ds.reset_localhost_keys().await;
let mut join_set = JoinSet::new();