improved logic for online/offline status

This commit is contained in:
ghe0 2024-08-19 16:29:16 +03:00
parent 438c34de3a
commit d47f71c941
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
3 changed files with 52 additions and 19 deletions

@ -182,6 +182,7 @@ impl Store {
let mut nodes = self.nodes.lock().await; let mut nodes = self.nodes.lock().await;
match nodes.insert(ip, info.clone()) { match nodes.insert(ip, info.clone()) {
Some(old_info) => match old_info.pubkey.ne(&info.pubkey) { Some(old_info) => match old_info.pubkey.ne(&info.pubkey) {
// TODO: save old private key to disk using SGX Sealing
true => Some(old_info.pubkey), true => Some(old_info.pubkey),
false => None, false => None,
}, },
@ -199,20 +200,32 @@ impl Store {
nodes.get(ip).cloned() nodes.get(ip).cloned()
} }
pub async fn cycle_self_key(&self) { /// freshes the keys of the node and returns a protobuf for the network
pub async fn reset_localhost_keys(&self) -> NodeUpdate {
let mut csprng = OsRng; let mut csprng = OsRng;
// TODO: save old private key to disk using SGX Sealing let keypair_raw = ed25519_dalek::SigningKey::generate(&mut csprng);
let privkey = ed25519_dalek::SigningKey::generate(&mut csprng); let keypair = hex::encode(keypair_raw.as_bytes());
let pubkey = keypair_raw.verifying_key();
let ip = "localhost".to_string();
let updated_at = std::time::SystemTime::now();
let online = false;
self.update_node( self.update_node(
"localhost".to_string(), ip.clone(),
NodeInfo { NodeInfo {
pubkey: privkey.verifying_key(), pubkey,
updated_at: std::time::SystemTime::now(), updated_at,
online: true, online,
}, },
) )
.await; .await;
self.add_key(privkey.verifying_key(), privkey).await; self.add_key(pubkey, keypair_raw.clone()).await;
let updated_at = Some(prost_types::Timestamp::from(updated_at));
NodeUpdate {
ip,
keypair,
updated_at,
online,
}
} }
pub async fn get_full_node_list(&self) -> Vec<NodeUpdate> { pub async fn get_full_node_list(&self) -> Vec<NodeUpdate> {
@ -253,4 +266,11 @@ impl Store {
} }
random_nodes random_nodes
} }
pub async fn set_online(&self, ip: &str, online: bool) {
let mut nodes = self.nodes.lock().await;
if let Some(node) = nodes.get_mut(ip) {
node.online = online;
}
}
} }

@ -35,20 +35,30 @@ impl ConnManager {
let response = client.get_updates(rx_stream).await.unwrap(); let response = client.get_updates(rx_stream).await.unwrap();
let mut resp_stream = response.into_inner(); let mut resp_stream = response.into_inner();
while let Some(update) = resp_stream.message().await.unwrap() { while let Some(mut update) = resp_stream.message().await.unwrap() {
// "localhost" IPs need to be changed to the real IP of the counterpart
if update.ip == "localhost" {
update.ip = node_ip.clone();
// since we are connecting TO this server, we have a guarantee that this
// server is not behind NAT, so we can set it online
update.online = true;
}
// update the entire network in case the information is new
if self.ds.process_grpc_update(update.clone()).await { if self.ds.process_grpc_update(update.clone()).await {
if let Err(_) = self.tx.send(update.clone()) { if let Err(_) = self.tx.send(update.clone()) {
println!("tokio broadcast receivers had an issue consuming the channel"); println!("tokio broadcast receivers had an issue consuming the channel");
} }
}; };
} }
} }
} }
fn init_connections(ds: Arc<Store>, tx: Sender<NodeUpdate>) -> JoinHandle<()> { fn init_connections(ds: Arc<Store>, tx: Sender<NodeUpdate>) -> JoinHandle<()> {
std::thread::spawn(move || { std::thread::spawn(move || {
tokio::runtime::Runtime::new().unwrap().block_on(async { tokio::runtime::Runtime::new().unwrap().block_on(async {
// we rotate online and offline nodes, to constantly check new nodes
let mut only_online_nodes = true; let mut only_online_nodes = true;
loop { loop {
let mut set = JoinSet::new(); let mut set = JoinSet::new();

@ -57,21 +57,26 @@ impl Update for MyServer {
yield Ok(update); yield Ok(update);
} }
if tx.receiver_count() > 10 {
yield Err(Status::internal("Already have too many clients. Connect to another server."));
return;
}
loop { loop {
tokio::select! { tokio::select! {
Some(msg) = inbound.next() => { Some(msg) = inbound.next() => {
match msg { match msg {
Ok(update) => { Ok(mut update) => {
if update.ip == "localhost" {
update.ip = remote_ip.clone();
// note that we don't set this node online,
// as it can be behind NAT
}
if ds.process_grpc_update(update.clone()).await { if ds.process_grpc_update(update.clone()).await {
if let Err(_) = tx.send(update.clone()) { if let Err(_) = tx.send(update.clone()) {
println!("tokio broadcast receivers had an issue consuming the channel"); println!("tokio broadcast receivers had an issue consuming the channel");
} }
}; };
// disconnect client if too many connections are active
if tx.receiver_count() > 9 {
yield Err(Status::internal("Already have too many clients. Connect to another server."));
return;
}
} }
Err(e) => { Err(e) => {
yield Err(Status::internal(format!("Error receiving client stream: {}", e))); yield Err(Status::internal(format!("Error receiving client stream: {}", e)));
@ -80,10 +85,8 @@ impl Update for MyServer {
} }
} }
Ok(update) = rx.recv() => { Ok(update) = rx.recv() => {
if update.ip != remote_ip {
yield Ok(update); yield Ok(update);
} }
}
} }
} }