improved integrations
communication between server and datstore improved - added full upgrades - propagate updated only when data is new
This commit is contained in:
		
							parent
							
								
									b21a9e0a10
								
							
						
					
					
						commit
						fe3d2e70ab
					
				| @ -1,9 +1,11 @@ | ||||
| #![allow(dead_code)] | ||||
| use crate::grpc::challenge::NodeUpdate; | ||||
| use ed25519_dalek::{Signer, SigningKey, VerifyingKey}; | ||||
| use rand::rngs::OsRng; | ||||
| use std::collections::HashMap; | ||||
| use std::time::Duration; | ||||
| use std::time::SystemTime; | ||||
| use std::time::UNIX_EPOCH; | ||||
| use tabled::{Table, Tabled}; | ||||
| use tokio::sync::Mutex; | ||||
| 
 | ||||
| @ -136,11 +138,54 @@ impl Store { | ||||
|     ///
 | ||||
|     /// On a side note, there are two types of people in this worlds:
 | ||||
|     ///     1. Those that can extrapolate...
 | ||||
|     pub async fn update_node(&self, ip: String, info: NodeInfo) -> bool { | ||||
|     pub async fn process_grpc_update(&self, node: NodeUpdate) -> bool { | ||||
|         let key_bytes = match hex::decode(node.keypair.clone()) { | ||||
|             Ok(k) => k, | ||||
|             Err(_) => return false, | ||||
|         }; | ||||
|         let privkey = SigningKey::from_bytes(match &key_bytes.as_slice().try_into() { | ||||
|             Ok(p) => p, | ||||
|             Err(_) => return false, | ||||
|         }); | ||||
|         let pubkey = privkey.verifying_key(); | ||||
|         let updated_at_std: std::time::SystemTime = match node.updated_at { | ||||
|             Some(ts) => { | ||||
|                 let duration = Duration::new(ts.seconds as u64, ts.nanos as u32); | ||||
|                 UNIX_EPOCH | ||||
|                     .checked_add(duration) | ||||
|                     .unwrap_or(SystemTime::now()) | ||||
|             } | ||||
|             None => SystemTime::now(), | ||||
|         }; | ||||
| 
 | ||||
|         if let Some(old_pubkey) = self | ||||
|             .update_node( | ||||
|                 node.ip, | ||||
|                 NodeInfo { | ||||
|                     pubkey, | ||||
|                     updated_at: updated_at_std, | ||||
|                     online: node.online, | ||||
|                 }, | ||||
|             ) | ||||
|             .await | ||||
|         { | ||||
|             self.remove_key(&old_pubkey).await; | ||||
|             self.add_key(pubkey, privkey).await; | ||||
|             true | ||||
|         } else { | ||||
|             false | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     // returns old pubkey if node got updated
 | ||||
|     async fn update_node(&self, ip: String, info: NodeInfo) -> Option<VerifyingKey> { | ||||
|         let mut nodes = self.nodes.lock().await; | ||||
|         match nodes.insert(ip, info.clone()) { | ||||
|             Some(old_info) => old_info.ne(&info), | ||||
|             None => false, | ||||
|             Some(old_info) => match old_info.pubkey.ne(&info.pubkey) { | ||||
|                 true => Some(old_info.pubkey), | ||||
|                 false => None, | ||||
|             }, | ||||
|             None => None, | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
| @ -169,4 +214,20 @@ impl Store { | ||||
|         .await; | ||||
|         self.add_key(privkey.verifying_key(), privkey).await; | ||||
|     } | ||||
| 
 | ||||
|     pub async fn get_full_node_list(&self) -> Vec<NodeUpdate> { | ||||
|         let nodes = self.nodes.lock().await; | ||||
|         let keys = self.keys.lock().await; | ||||
|         nodes | ||||
|             .iter() | ||||
|             .filter_map(|(ip, node_info)| { | ||||
|                 keys.get(&node_info.pubkey).map(|signing_key| NodeUpdate { | ||||
|                     ip: ip.to_string(), | ||||
|                     keypair: hex::encode(signing_key.as_bytes()), | ||||
|                     updated_at: Some(prost_types::Timestamp::from(node_info.updated_at)), | ||||
|                     online: node_info.online, | ||||
|                 }) | ||||
|             }) | ||||
|             .collect() | ||||
|     } | ||||
| } | ||||
|  | ||||
							
								
								
									
										0
									
								
								src/grpc/client.rs
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										0
									
								
								src/grpc/client.rs
									
									
									
									
									
										Normal file
									
								
							
							
								
								
									
										6
									
								
								src/grpc/mod.rs
									
									
									
									
									
										Normal file
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										6
									
								
								src/grpc/mod.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,6 @@ | ||||
| mod server; | ||||
| mod client; | ||||
| 
 | ||||
| pub mod challenge { | ||||
|     tonic::include_proto!("challenge"); | ||||
| } | ||||
| @ -1,7 +1,9 @@ | ||||
| #![allow(dead_code)] | ||||
| 
 | ||||
| use super::challenge::update_server::Update; | ||||
| use super::challenge::update_server::UpdateServer; | ||||
| use super::challenge::NodeUpdate; | ||||
| use crate::datastore::Store; | ||||
| use challenge::NodeUpdate; | ||||
| use std::pin::Pin; | ||||
| use std::sync::Arc; | ||||
| use std::thread::JoinHandle; | ||||
| @ -9,10 +11,6 @@ use tokio::sync::broadcast::Sender; | ||||
| use tokio_stream::{Stream, StreamExt}; | ||||
| use tonic::{transport::Server, Request, Response, Status, Streaming}; | ||||
| 
 | ||||
| pub mod challenge { | ||||
|     tonic::include_proto!("challenge"); | ||||
| } | ||||
| 
 | ||||
| pub struct MyServer { | ||||
|     ds: Arc<Store>, | ||||
|     tx: Sender<NodeUpdate>, | ||||
| @ -28,7 +26,7 @@ impl MyServer { | ||||
|             tokio::runtime::Runtime::new().unwrap().block_on(async { | ||||
|                 let addr = "0.0.0.0:31373".parse().unwrap(); | ||||
|                 if let Err(e) = Server::builder() | ||||
|                     .add_service(challenge::update_server::UpdateServer::new(self)) | ||||
|                     .add_service(UpdateServer::new(self)) | ||||
|                     .serve(addr) | ||||
|                     .await | ||||
|                 { | ||||
| @ -40,7 +38,7 @@ impl MyServer { | ||||
| } | ||||
| 
 | ||||
| #[tonic::async_trait] | ||||
| impl challenge::update_server::Update for MyServer { | ||||
| impl Update for MyServer { | ||||
|     type GetUpdatesStream = Pin<Box<dyn Stream<Item = Result<NodeUpdate, Status>> + Send>>; | ||||
| 
 | ||||
|     async fn get_updates( | ||||
| @ -51,16 +49,29 @@ impl challenge::update_server::Update for MyServer { | ||||
|         let tx = self.tx.clone(); | ||||
|         let mut rx = self.tx.subscribe(); | ||||
|         let mut inbound = req.into_inner(); | ||||
|         let ds = self.ds.clone(); | ||||
| 
 | ||||
|         let stream = async_stream::stream! { | ||||
|             let full_update_list = ds.get_full_node_list().await; | ||||
|             for update in full_update_list { | ||||
|                 yield Ok(update); | ||||
|             } | ||||
| 
 | ||||
|             if tx.receiver_count() > 10 { | ||||
|                 yield Err(Status::internal("Already have too many clients. Connect to another server.")); | ||||
|                 return; | ||||
|             } | ||||
| 
 | ||||
|             loop { | ||||
|                 tokio::select! { | ||||
|                     Some(msg) = inbound.next() => { | ||||
|                         match msg { | ||||
|                             Ok(update) => { | ||||
|                                 if let Err(_) = tx.send(update.clone()) { | ||||
|                                     yield Err(Status::internal("Failed to broadcast update")); | ||||
|                                 } | ||||
|                                 if ds.process_grpc_update(update.clone()).await { | ||||
|                                     if let Err(_) = tx.send(update.clone()) { | ||||
|                                         println!("tokio broadcast receivers had an issue consuming the channel"); | ||||
|                                     } | ||||
|                                 }; | ||||
|                             } | ||||
|                             Err(e) => { | ||||
|                                 yield Err(Status::internal(format!("Error receiving client stream: {}", e))); | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user