added http_serve support for persistence

This commit is contained in:
ghe0 2024-08-23 03:05:30 +03:00
parent c7976ec44b
commit a20d46bb49
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
7 changed files with 158 additions and 38 deletions

@ -14,7 +14,7 @@ echo -n "Checking if containers connected to each other... "
for i in {2..12} for i in {2..12}
do do
ip="172.17.0.${i}" ip="172.17.0.${i}"
curl -s "${ip}:31372" | grep -e true -e false -c | grep 12 > /dev/null || curl -s "${ip}:31372/memory" | grep -e true -e false -c | grep 12 > /dev/null ||
echo Container at ip ${ip} did not connect to all other containers. echo Container at ip ${ip} did not connect to all other containers.
done done
echo OK! echo OK!
@ -23,24 +23,24 @@ echo -n "Checking if containers can sign data... "
for i in {2..12} for i in {2..12}
do do
ip="172.17.0.${i}" ip="172.17.0.${i}"
random_key=$(curl -s "${ip}:31372" | grep true | tail -1 | awk '{ print $4 }') random_key=$(curl -s "${ip}:31372/memory" | grep true | tail -1 | awk '{ print $4 }')
message="ValyDoesNotLikeMyCodeSoHeIsSilentAboutIt" message="ValyDoesNotLikeMyCodeSoHeIsSilentAboutIt"
mkdir -p .tmp mkdir -p .tmp
status=$(curl -sG \ status=$(curl -sG \
-o .tmp/output -w "%{http_code}\n" \ -o .tmp/output -w "%{http_code}\n" \
--data-urlencode "pubkey=${random_key}" \ --data-urlencode "pubkey=${random_key}" \
--data-urlencode "something=${message}" \ --data-urlencode "something=${message}" \
"172.17.0.${i}:31372/sign") "172.17.0.${i}:31372/memory/sign")
if (( "$status" != "200" )); then if (( "$status" != "200" )); then
echo Container at ip ${ip} could not sign string with key ${random_key} echo Container at ip ${ip} could not sign string with key ${random_key}
echo The status was $status echo The status was $status
echo The error was $(cat .tmp/output) echo The error was $(cat .tmp/output)
echo Output of keys on 172.17.0.${i}: echo Output of keys on 172.17.0.${i}:
curl 172.17.0.${i}:31372 curl "172.17.0.${i}:31372/memory"
father_of_key=$(curl 172.17.0.${i}:31372 | grep ${random_key} | awk '{ print $2 }') father_of_key=$(curl "172.17.0.${i}:31372/memory" | grep ${random_key} | awk '{ print $2 }')
echo Output of keys on ${father_of_key}: echo Output of keys on ${father_of_key}:
curl ${father_of_key}:31372 curl "${father_of_key}:31372/memory"
rm -rf .tmp rm -rf .tmp
exit 1 exit 1
fi fi

@ -4,5 +4,6 @@
# It's only purpose is to help bootstrap a test network. # It's only purpose is to help bootstrap a test network.
echo $INIT_NODES | tr ' ' '\n' > /detee_challenge_nodes echo $INIT_NODES | tr ' ' '\n' > /detee_challenge_nodes
touch /detee_challenge_node_history
/hacker-challenge /hacker-challenge

@ -1,4 +1,5 @@
use crate::grpc::challenge::NodeUpdate; use crate::grpc::challenge::NodeUpdate;
use crate::persistence::FileManager;
use dashmap::{DashMap, DashSet}; use dashmap::{DashMap, DashSet};
use ed25519_dalek::{Signer, SigningKey, VerifyingKey, PUBLIC_KEY_LENGTH}; use ed25519_dalek::{Signer, SigningKey, VerifyingKey, PUBLIC_KEY_LENGTH};
use rand::rngs::OsRng; use rand::rngs::OsRng;
@ -19,6 +20,7 @@ pub struct Store {
nodes: DashMap<IP, NodeInfo>, nodes: DashMap<IP, NodeInfo>,
conns: DashSet<IP>, conns: DashSet<IP>,
keys: DashMap<VerifyingKey, SigningKey>, keys: DashMap<VerifyingKey, SigningKey>,
persistence: FileManager,
} }
pub enum SigningError { pub enum SigningError {
CorruptedKey, CorruptedKey,
@ -57,11 +59,12 @@ impl std::fmt::Display for SigningError {
impl Store { impl Store {
// app should exit if any error happens here so unwrap() is good // app should exit if any error happens here so unwrap() is good
pub fn init() -> Self { pub async fn init(path: &str) -> Self {
Self { Self {
nodes: DashMap::new(), nodes: DashMap::new(),
keys: DashMap::new(), keys: DashMap::new(),
conns: DashSet::new(), conns: DashSet::new(),
persistence: FileManager::init(path).await.unwrap(),
} }
} }
@ -73,7 +76,7 @@ impl Store {
self.conns.remove(ip); self.conns.remove(ip);
} }
pub async fn tabled_node_list(&self) -> String { pub async fn tabled_memory_list(&self) -> String {
#[derive(Tabled)] #[derive(Tabled)]
struct OutputRow { struct OutputRow {
ip: String, ip: String,
@ -103,6 +106,47 @@ impl Store {
Table::new(output).to_string() Table::new(output).to_string()
} }
pub async fn tabled_disk_list(&self, page: u64) -> String {
let mut offset = page.wrapping_mul(10);
#[derive(Tabled)]
struct OutputRow {
id: u64,
ip: String,
pubkey: String,
timestamp: String,
}
let mut output = vec![];
for (ip, keypair, timestamp) in self
.persistence
.get_page_of_20(offset)
.await
.unwrap()
.iter()
.map(|n| {
(
n.ip.to_string(),
n.keypair.clone(),
n.joined_at
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
.to_string(),
)
})
{
let id = offset;
let pubkey = hex::encode(keypair.verifying_key().as_bytes());
output.push(OutputRow {
id,
ip,
pubkey,
timestamp,
});
offset += 1;
}
Table::new(output).to_string()
}
pub async fn sign_message_with_key( pub async fn sign_message_with_key(
&self, &self,
message: &str, message: &str,
@ -166,13 +210,14 @@ impl Store {
None => SystemTime::now(), None => SystemTime::now(),
}; };
self.add_key(pubkey, privkey).await; self.add_key(pubkey, privkey.clone()).await;
let node_info = NodeInfo { let node_info = NodeInfo {
pubkey, pubkey,
updated_at: updated_at_std, updated_at: updated_at_std,
public: node.public, public: node.public,
}; };
if let Some(mut old_node_info) = self.update_node(node.ip, node_info.clone()).await { if let Some(mut old_node_info) = self.update_node(node.ip.clone(), node_info.clone()).await
{
if !node_info.public { if !node_info.public {
old_node_info.public = node_info.public; old_node_info.public = node_info.public;
} }
@ -184,6 +229,11 @@ impl Store {
false => false, false => false,
} }
} else { } else {
if let Ok(persistence_node) = (node.ip.as_str(), privkey, updated_at_std).try_into() {
if let Err(e) = self.persistence.append_node(persistence_node).await {
println!("Could not save data to disk: {e}.");
}
}
true true
} }
} }
@ -288,6 +338,17 @@ mod tests {
use ed25519_dalek::SigningKey; use ed25519_dalek::SigningKey;
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
async fn setup_file_manager(function: &str) -> std::io::Result<FileManager> {
let _ = tokio::fs::create_dir_all(".tmp").await;
let path = ".tmp/ds_".to_string() + function;
let mut file = File::create(path.clone()).await?;
file.flush().await?;
drop(file);
FileManager::init(&path).await
}
#[test] #[test]
fn node_info_creation() { fn node_info_creation() {
@ -303,12 +364,13 @@ mod tests {
assert!(node_info.public); assert!(node_info.public);
} }
#[test] #[tokio::test]
fn store_creation() { async fn store_creation() {
let store = Store { let store = Store {
nodes: DashMap::new(), nodes: DashMap::new(),
conns: DashSet::new(), conns: DashSet::new(),
keys: DashMap::new(), keys: DashMap::new(),
persistence: setup_file_manager("store_creation").await.unwrap(),
}; };
assert!(store.nodes.is_empty()); assert!(store.nodes.is_empty());
@ -336,6 +398,7 @@ mod tests {
nodes: DashMap::new(), nodes: DashMap::new(),
conns: DashSet::new(), conns: DashSet::new(),
keys: DashMap::new(), keys: DashMap::new(),
persistence: setup_file_manager("sign_message_with_key").await.unwrap(),
}; };
store.keys.insert(keypair.verifying_key(), keypair); store.keys.insert(keypair.verifying_key(), keypair);
@ -368,6 +431,7 @@ mod tests {
nodes: DashMap::new(), nodes: DashMap::new(),
conns: DashSet::new(), conns: DashSet::new(),
keys: DashMap::new(), keys: DashMap::new(),
persistence: setup_file_manager("process_node_update").await.unwrap(),
}; };
let result = store.process_node_update(node_update).await; let result = store.process_node_update(node_update).await;
@ -389,6 +453,7 @@ mod tests {
nodes: DashMap::new(), nodes: DashMap::new(),
conns: DashSet::new(), conns: DashSet::new(),
keys: DashMap::new(), keys: DashMap::new(),
persistence: setup_file_manager("get_full_node_list").await.unwrap(),
}; };
store.nodes.insert("127.0.0.1".to_string(), node_info); store.nodes.insert("127.0.0.1".to_string(), node_info);

@ -51,7 +51,6 @@ impl ConnManager {
let response = client.get_updates(rx_stream).await?; let response = client.get_updates(rx_stream).await?;
let mut resp_stream = response.into_inner(); let mut resp_stream = response.into_inner();
// TODO: this is a hack to send the localhost node to the peer
let _ = self.tx.send(self.ds.get_localhost().await); let _ = self.tx.send(self.ds.get_localhost().await);
while let Some(mut update) = resp_stream.message().await? { while let Some(mut update) = resp_stream.message().await? {

@ -4,54 +4,105 @@ use std::sync::Arc;
use salvo::affix; use salvo::affix;
use salvo::prelude::*; use salvo::prelude::*;
enum SignError { const HOMEPAGE: &str = r#"Welcome, beloved hacker!
I am a node that is part of the DeTEE Hacker Challenge network.
I will allow you to sign messages using private ed25519 keys that I have in memory and on disk.
If you want to run your own instance of this enclave, go to https://detee.cloud/hacker-challenge
To access keys that are saved in memory, navigate to /memory. To sign something using a key that
is in memory, curl /memory/sign with the params "pubkey" and "something". Example:
curl -G \
--data-urlencode "pubkey=THE_PUBKEY_IN_HEX_FORMAT_HERE" \
--data-urlencode "something=YOUR_MESSAGE_HERE" \
'IP_OF_THE_NODE:31372/memory/sign'
Each node publishes a new key to the cluster every 60 seconds. Old keys are deleted.
The first key that each node publiches when joining the network is permanently saved to disk.
To access keys that are saved on disk, navigate to /disk. Disk entries are paginated.
You can navigate to a specific page by using get params. Example: https://{ip}/disk?page={number}.
To sign a random message using a key from disk, use /disk/sign and send the key id as a get param:
curl -G \
--data-urlencode "pubkey_id=1337" \
--data-urlencode "something=YOUR_MESSAGE_HERE" \
'IP_OF_THE_NODE:31372/disk/sign'
Your goal is to obtain a public key by any means necessary.
If you manage to steal a key, contact us at https://detee.cloud
Good luck!
"#;
enum HTTPError {
NoPubkey, NoPubkey,
NoMessage, NoMessage,
Store(SigningError), Store(SigningError),
} }
#[async_trait] #[async_trait]
impl Writer for SignError { impl Writer for HTTPError {
async fn write(self, _req: &mut Request, _depot: &mut Depot, res: &mut Response) { async fn write(self, _req: &mut Request, _depot: &mut Depot, res: &mut Response) {
res.status_code(StatusCode::BAD_REQUEST); res.status_code(StatusCode::BAD_REQUEST);
match self { match self {
SignError::NoPubkey => res.render("pubkey must be specified as GET param"), HTTPError::NoPubkey => res.render("pubkey must be specified as GET param"),
SignError::NoMessage => res.render("something must be specified as GET param"), HTTPError::NoMessage => res.render("something must be specified as GET param"),
SignError::Store(e) => res.render(format!("{e}")), HTTPError::Store(e) => res.render(format!("{e}")),
}; };
} }
} }
#[handler] #[handler]
async fn homepage(depot: &mut Depot) -> String { async fn homepage() -> String {
let ds = depot.obtain::<Arc<Store>>().unwrap(); HOMEPAGE.to_string()
ds.tabled_node_list().await // TODO: make this paginated
} }
#[handler] #[handler]
async fn sign(req: &mut Request, depot: &mut Depot) -> Result<String, SignError> { async fn memory_list(depot: &mut Depot) -> String {
let ds = depot.obtain::<Arc<Store>>().unwrap();
ds.tabled_memory_list().await // TODO: make this paginated
}
#[handler]
async fn memory_sign(req: &mut Request, depot: &mut Depot) -> Result<String, HTTPError> {
let ds = depot.obtain::<Arc<Store>>().unwrap(); let ds = depot.obtain::<Arc<Store>>().unwrap();
let pubkey = match req.query::<String>("pubkey") { let pubkey = match req.query::<String>("pubkey") {
Some(k) => k, Some(k) => k,
None => return Err(SignError::NoPubkey), None => return Err(HTTPError::NoPubkey),
}; };
let something = match req.query::<String>("something") { let something = match req.query::<String>("something") {
Some(k) => k, Some(k) => k,
None => return Err(SignError::NoMessage), None => return Err(HTTPError::NoMessage),
}; };
match ds.sign_message_with_key(&something, &pubkey).await { match ds.sign_message_with_key(&something, &pubkey).await {
Ok(s) => Ok(s), Ok(s) => Ok(s),
Err(e) => Err(SignError::Store(e)), Err(e) => Err(HTTPError::Store(e)),
} }
} }
#[handler]
async fn disk_list(req: &mut Request, depot: &mut Depot) -> Result<String, HTTPError> {
let ds = depot.obtain::<Arc<Store>>().unwrap();
let page = match req.query::<u64>("page") {
Some(n) => n,
None => 0,
};
Ok(ds.tabled_disk_list(page).await)
}
pub async fn init(ds: Arc<Store>) { pub async fn init(ds: Arc<Store>) {
let acceptor = TcpListener::new("0.0.0.0:31372").bind().await; let acceptor = TcpListener::new("0.0.0.0:31372").bind().await;
let router = Router::new() let router = Router::new()
.hoop(affix::inject(ds)) .hoop(affix::inject(ds))
.get(homepage) .get(homepage)
.push(Router::with_path("sign").get(sign)); .push(
Router::with_path("memory")
.get(memory_list)
.push(Router::with_path("sign").get(memory_sign)),
)
.push(Router::with_path("disk").get(disk_list));
println!("{:?}", router); println!("{:?}", router);
Server::new(acceptor).serve(router).await; Server::new(acceptor).serve(router).await;
} }

@ -12,6 +12,9 @@ use std::sync::Arc;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
const INIT_NODES: &str = "detee_challenge_nodes";
const DISK_PERSISTENCE: &str = "detee_challenge_node_history";
async fn cycle_keys(ds: Arc<Store>, tx: Sender<NodeUpdate>) { async fn cycle_keys(ds: Arc<Store>, tx: Sender<NodeUpdate>) {
loop { loop {
sleep(Duration::from_secs(60)).await; sleep(Duration::from_secs(60)).await;
@ -21,7 +24,7 @@ async fn cycle_keys(ds: Arc<Store>, tx: Sender<NodeUpdate>) {
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let ds: Arc<Store> = Arc::new(Store::init()); let ds: Arc<Store> = Arc::new(Store::init(DISK_PERSISTENCE).await);
ds.reset_localhost_keys().await; ds.reset_localhost_keys().await;
let (tx, mut _rx) = broadcast::channel(500); let (tx, mut _rx) = broadcast::channel(500);
@ -32,7 +35,7 @@ async fn main() {
long_term_tasks.spawn(http_server::init(ds.clone())); long_term_tasks.spawn(http_server::init(ds.clone()));
long_term_tasks.spawn(grpc::server::MyServer::init(ds.clone(), tx.clone()).start()); long_term_tasks.spawn(grpc::server::MyServer::init(ds.clone(), tx.clone()).start());
let input = File::open("detee_challenge_nodes").unwrap(); let input = File::open(INIT_NODES).unwrap();
let buffered = BufReader::new(input); let buffered = BufReader::new(input);
for line in buffered.lines() { for line in buffered.lines() {
init_tasks.spawn( init_tasks.spawn(

@ -10,7 +10,7 @@ use tokio::sync::Mutex;
const DATA_SIZE: usize = 76; const DATA_SIZE: usize = 76;
enum Error { pub enum Error {
CorruptedIP, CorruptedIP,
} }
@ -21,10 +21,10 @@ impl From<AddrParseError> for Error {
} }
#[derive(Clone)] #[derive(Clone)]
struct Node { pub struct Node {
ip: Ipv4Addr, pub ip: Ipv4Addr,
keypair: SigningKey, pub keypair: SigningKey,
joined_at: SystemTime, pub joined_at: SystemTime,
} }
impl TryFrom<(&str, SigningKey, SystemTime)> for Node { impl TryFrom<(&str, SigningKey, SystemTime)> for Node {
@ -78,19 +78,19 @@ impl Node {
} }
} }
struct FileManager { pub struct FileManager {
file: Mutex<File>, file: Mutex<File>,
} }
impl FileManager { impl FileManager {
async fn init(path: &str) -> std::io::Result<Self> { pub async fn init(path: &str) -> std::io::Result<Self> {
let file = File::options().read(true).append(true).open(path).await?; let file = File::options().read(true).append(true).open(path).await?;
Ok(Self { Ok(Self {
file: Mutex::new(file), file: Mutex::new(file),
}) })
} }
async fn append_node(&self, node: Node) -> std::io::Result<()> { pub async fn append_node(&self, node: Node) -> std::io::Result<()> {
let mut file = self.file.lock().await; let mut file = self.file.lock().await;
file.seek(SeekFrom::End(0)).await?; file.seek(SeekFrom::End(0)).await?;
file.write_all(&node.to_bytes()).await?; file.write_all(&node.to_bytes()).await?;
@ -98,7 +98,7 @@ impl FileManager {
Ok(()) Ok(())
} }
async fn get_node_by_id(&self, id: u64) -> std::io::Result<Node> { pub async fn get_node_by_id(&self, id: u64) -> std::io::Result<Node> {
let mut file = self.file.lock().await; let mut file = self.file.lock().await;
file.seek(SeekFrom::Start( file.seek(SeekFrom::Start(
id.wrapping_mul(DATA_SIZE.try_into().unwrap_or(0)), id.wrapping_mul(DATA_SIZE.try_into().unwrap_or(0)),
@ -111,7 +111,7 @@ impl FileManager {
/// Returns 20 nodes from the disk. /// Returns 20 nodes from the disk.
/// Specify offset (the number of nodes to skip). /// Specify offset (the number of nodes to skip).
async fn get_page_of_20(&self, offset: u64) -> std::io::Result<Vec<Node>> { pub async fn get_page_of_20(&self, offset: u64) -> std::io::Result<Vec<Node>> {
let mut file = self.file.lock().await; let mut file = self.file.lock().await;
file.seek(SeekFrom::Start( file.seek(SeekFrom::Start(
offset offset
@ -150,6 +150,7 @@ mod tests {
fn get_test_file_name(function: &str) -> String { fn get_test_file_name(function: &str) -> String {
TEST_FILE_PREFIX.to_string() + function TEST_FILE_PREFIX.to_string() + function
} }
async fn setup_test_file(function: &str) -> Result<FileManager> { async fn setup_test_file(function: &str) -> Result<FileManager> {
let _ = tokio::fs::create_dir_all(".tmp").await; let _ = tokio::fs::create_dir_all(".tmp").await;
let path = get_test_file_name(function); let path = get_test_file_name(function);