added auth

This commit is contained in:
ghe0 2025-02-03 20:29:20 +02:00
parent f74d2887fb
commit 0ed21fdf98
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
8 changed files with 179 additions and 40 deletions

71
Cargo.lock generated

@ -26,6 +26,21 @@ dependencies = [
"memchr",
]
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anstream"
version = "0.6.18"
@ -257,6 +272,20 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-targets",
]
[[package]]
name = "colorchoice"
version = "1.0.3"
@ -347,6 +376,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"bs58",
"chrono",
"ed25519-dalek",
"env_logger",
"lazy_static",
@ -766,6 +796,29 @@ dependencies = [
"tracing",
]
[[package]]
name = "iana-time-zone"
version = "0.1.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "icu_collections"
version = "1.5.0"
@ -1053,6 +1106,15 @@ dependencies = [
"tempfile",
]
[[package]]
name = "num-traits"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
]
[[package]]
name = "object"
version = "0.36.5"
@ -2083,6 +2145,15 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "windows-core"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-registry"
version = "0.2.0"

@ -22,6 +22,7 @@ tokio-stream = "0.1.17"
tonic = "0.12"
serde_json = "1.0.135"
bs58 = "0.5.1"
chrono = "0.4.39"
[build-dependencies]
tonic-build = "0.12"

@ -1,6 +1,7 @@
owner_wallet: "ThisIsNotARealWallet000000000000000000000000"
brain_url: "http://164.92.249.180:31337"
max_cores_per_vm: 4
max_vcpu_reservation: 8
max_cores_per_vm: 8
max_vcpu_reservation: 24
max_mem_reservation_mb: 25000
network_interfaces:
- driver: "MACVTAP"
@ -26,3 +27,4 @@ public_port_range:
start: 30000
end: 50000
max_ports_per_vm: 5
price: 20000

@ -8,6 +8,11 @@ message Pubkey {
string pubkey = 1;
}
message AccountBalance {
uint64 balance = 1;
uint64 tmp_locked = 2;
}
message Contract {
string uuid = 1;
string hostname = 2;
@ -23,7 +28,7 @@ message Contract {
string dtrfs_sha = 12;
string created_at = 13;
string updated_at = 14;
// total nanotoken cost per minute (for all units)
// total nanoLP cost per minute (for all units)
uint64 nano_per_minute = 15;
uint64 locked_nano = 16;
string collected_at = 17;
@ -47,6 +52,7 @@ message MeasurementIP {
string gateway = 4;
}
// This should also include a block hash or similar, for auth
message RegisterNodeReq {
string node_pubkey = 1;
string owner_pubkey = 2;
@ -54,7 +60,7 @@ message RegisterNodeReq {
string country = 4;
string region = 5;
string city = 6;
// nanotokens per unit per minute
// nanoLP per unit per minute
uint64 price = 7;
}
@ -96,13 +102,14 @@ message NewVmResp {
message UpdateVmReq {
string uuid = 1;
uint32 disk_size_gb = 2;
uint32 vcpus = 3;
uint32 memory_mb = 4;
string kernel_url = 5;
string kernel_sha = 6;
string dtrfs_url = 7;
string dtrfs_sha = 8;
string admin_pubkey = 2;
uint32 disk_size_gb = 3;
uint32 vcpus = 4;
uint32 memory_mb = 5;
string kernel_url = 6;
string kernel_sha = 7;
string dtrfs_url = 8;
string dtrfs_sha = 9;
}
message UpdateVmResp {
@ -113,6 +120,7 @@ message UpdateVmResp {
message DeleteVmReq {
string uuid = 1;
string admin_pubkey = 2;
}
message BrainMessage {
@ -123,9 +131,16 @@ message BrainMessage {
}
}
message DaemonStreamAuth {
string timestamp = 1;
string pubkey = 2;
repeated string contracts = 3;
string signature = 4;
}
message DaemonMessage {
oneof Msg {
Pubkey pubkey = 1;
DaemonStreamAuth auth = 1;
NewVmResp new_vm_resp = 2;
UpdateVmResp update_vm_resp = 3;
NodeResources node_resources = 4;
@ -134,7 +149,7 @@ message DaemonMessage {
service BrainDaemon {
rpc RegisterNode (RegisterNodeReq) returns (stream Contract);
rpc BrainMessages (Pubkey) returns (stream BrainMessage);
rpc BrainMessages (DaemonStreamAuth) returns (stream BrainMessage);
rpc DaemonMessages (stream DaemonMessage) returns (Empty);
}
@ -155,6 +170,7 @@ message NodeFilters {
string region = 8;
string city = 9;
string ip = 10;
string node_pubkey = 11;
}
message NodeListResp {
@ -165,15 +181,24 @@ message NodeListResp {
string ip = 5; // required for latency test
uint32 server_rating = 6;
uint32 provider_rating = 7;
// nanotokens per unit per minute
// nanoLP per unit per minute
uint64 price = 8;
}
message ExtendVmReq {
string uuid = 1;
string admin_pubkey = 2;
uint64 locked_nano = 3;
}
service BrainCli {
rpc GetAirdrop (Pubkey) returns (Empty);
rpc GetBalance (Pubkey) returns (AccountBalance);
rpc NewVm (NewVmReq) returns (NewVmResp);
rpc ListContracts (ListContractsReq) returns (stream Contract);
rpc ListNodes (NodeFilters) returns (stream NodeListResp);
rpc GetOneNode (NodeFilters) returns (NodeListResp);
rpc DeleteVm (DeleteVmReq) returns (Empty);
rpc UpdateVm (UpdateVmReq) returns (UpdateVmResp);
rpc ExtendVm (ExtendVmReq) returns (Empty);
}

@ -44,6 +44,7 @@ pub enum InterfaceType {
#[derive(Deserialize, Debug)]
pub struct Config {
pub owner_wallet: String,
pub brain_url: String,
pub max_cores_per_vm: usize,
pub max_vcpu_reservation: usize,

@ -49,6 +49,12 @@ fn load_secret_key() -> Result<ed25519_dalek::SigningKey> {
))
}
pub fn sign_message(msg: &str) -> Result<String> {
use ed25519_dalek::Signer;
let key = load_secret_key()?;
Ok(bs58::encode(key.sign(msg.as_bytes()).to_bytes()).into_string())
}
pub fn get_public_key() -> String {
let pubkey = bs58::encode(load_secret_key().unwrap().verifying_key().to_bytes()).into_string();
log::info!("Loaded the following public key: {pubkey}");

@ -1,16 +1,14 @@
use crate::global::*;
use crate::snp_proto::DaemonMessage;
use anyhow::Result;
use log::{debug, info, warn};
use snp_proto::{
brain_daemon_client::BrainDaemonClient, BrainMessage, Contract, Pubkey, RegisterNodeReq,
};
use snp_proto::{brain_daemon_client::BrainDaemonClient, BrainMessage, Contract, RegisterNodeReq};
use tokio::{
sync::mpsc::{Receiver, Sender},
task::JoinSet,
};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tonic::transport::Channel;
use crate::global::*;
pub mod snp_proto {
tonic::include_proto!("snp_proto");
@ -35,18 +33,32 @@ impl From<snp_proto::NodeResources> for snp_proto::DaemonMessage {
}
pub async fn register_node(config: &crate::config::Config) -> Result<Vec<Contract>> {
use tonic::metadata::AsciiMetadataValue;
use tonic::Request;
let mut client = BrainDaemonClient::connect(config.brain_url.clone()).await?;
debug!("Starting node registration...");
let ip_info = IP_INFO.clone();
let req = RegisterNodeReq {
node_pubkey: PUBLIC_KEY.clone(),
owner_pubkey: "IamTheOwnerOf".to_string() + &PUBLIC_KEY,
owner_pubkey: config.owner_wallet.clone(),
main_ip: ip_info.ip,
country: ip_info.country,
region: ip_info.region,
city: ip_info.city,
price: config.price,
};
let pubkey = PUBLIC_KEY.clone();
let timestamp = chrono::Utc::now().to_rfc3339();
let signature = crate::global::sign_message(&format!("{timestamp}{req:?}"))?;
let timestamp: AsciiMetadataValue = timestamp.parse()?;
let pubkey: AsciiMetadataValue = pubkey.parse()?;
let signature: AsciiMetadataValue = signature.parse()?;
let mut req = Request::new(req);
req.metadata_mut().insert("timestamp", timestamp);
req.metadata_mut().insert("pubkey", pubkey);
req.metadata_mut().insert("request-signature", signature);
let mut contracts = Vec::new();
let mut grpc_stream = client.register_node(req).await?.into_inner();
while let Some(stream_update) = grpc_stream.next().await {
@ -64,13 +76,21 @@ pub async fn register_node(config: &crate::config::Config) -> Result<Vec<Contrac
Ok(contracts)
}
fn sign_stream_auth(contracts: Vec<String>) -> Result<snp_proto::DaemonStreamAuth> {
let pubkey = PUBLIC_KEY.clone();
let timestamp = chrono::Utc::now().to_rfc3339();
let signature =
crate::global::sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?;
Ok(snp_proto::DaemonStreamAuth { timestamp, pubkey, contracts, signature })
}
async fn receive_messages(
mut client: BrainDaemonClient<Channel>,
contracts: Vec<String>,
tx: Sender<BrainMessage>,
) -> Result<()> {
debug!("starting to listen for messages from brain");
let pubkey = PUBLIC_KEY.clone();
let mut grpc_stream = client.brain_messages(Pubkey { pubkey }).await?.into_inner();
let mut grpc_stream = client.brain_messages(sign_stream_auth(contracts)?).await?.into_inner();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(msg) => {
@ -88,13 +108,15 @@ async fn receive_messages(
async fn send_messages(
mut client: BrainDaemonClient<Channel>,
contracts: Vec<String>,
rx: Receiver<DaemonMessage>,
tx: Sender<DaemonMessage>,
) -> Result<()> {
debug!("starting daemon message stream to brain");
let pubkey = PUBLIC_KEY.clone();
let rx_stream = ReceiverStream::new(rx);
tx.send(DaemonMessage { msg: Some(snp_proto::daemon_message::Msg::Pubkey(Pubkey { pubkey })) })
tx.send(DaemonMessage {
msg: Some(snp_proto::daemon_message::Msg::Auth(sign_stream_auth(contracts)?)),
})
.await?;
client.daemon_messages(rx_stream).await?;
debug!("send_newvm_resp is about to exit");
@ -102,6 +124,7 @@ async fn send_messages(
}
pub struct ConnectionData {
pub contracts: Vec<String>,
pub brain_url: String,
pub brain_msg_tx: Sender<BrainMessage>,
pub daemon_msg_rx: Receiver<DaemonMessage>,
@ -112,8 +135,13 @@ pub async fn connect_and_run(cd: ConnectionData) -> Result<()> {
let client = BrainDaemonClient::connect(cd.brain_url).await?;
let mut streaming_tasks = JoinSet::new();
streaming_tasks.spawn(receive_messages(client.clone(), cd.brain_msg_tx));
streaming_tasks.spawn(send_messages(client.clone(), cd.daemon_msg_rx, cd.daemon_msg_tx));
streaming_tasks.spawn(receive_messages(client.clone(), cd.contracts.clone(), cd.brain_msg_tx));
streaming_tasks.spawn(send_messages(
client.clone(),
cd.contracts,
cd.daemon_msg_rx,
cd.daemon_msg_tx,
));
let task_output = streaming_tasks.join_next().await;
warn!("One stream exited: {task_output:?}");

@ -140,8 +140,7 @@ impl VMHandler {
async fn handle_update_vm_req(&mut self, update_vm_req: snp_proto::UpdateVmReq) -> Result<()> {
debug!("Processing update vm request: {update_vm_req:?}");
let vm_id = update_vm_req.uuid.clone();
let content =
std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?;
let content = std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?;
let mut vm: state::VM = serde_yaml::from_str(&content)?;
match vm.update(update_vm_req.into(), &self.config, &mut self.res) {
Ok(_) => {
@ -170,14 +169,14 @@ impl VMHandler {
fn handle_delete_vm(&mut self, delete_vm_req: snp_proto::DeleteVmReq) -> Result<()> {
let vm_id = delete_vm_req.uuid;
let content =
std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?;
let content = std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?;
let vm: state::VM = serde_yaml::from_str(&content)?;
vm.delete(&mut self.res)?;
Ok(())
}
async fn run(mut self) {
sleep(Duration::from_millis(100)).await;
self.send_node_resources().await;
while let Some(brain_msg) = self.receiver.recv().await {
match brain_msg.msg {
@ -206,12 +205,13 @@ impl VMHandler {
for uuid in self.res.existing_vms.clone() {
if contracts.iter().find(|c| c.uuid == uuid).is_none() {
info!("VM {uuid} exists locally but not found in brain. Deleting...");
let content = match std::fs::read_to_string(
VM_CONFIG_DIR.to_string() + &uuid + ".yaml",
) {
let content =
match std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &uuid + ".yaml") {
Ok(content) => content,
Err(e) => {
log::error!("Could not find VM config for {uuid}. Cannot delete VM: {e:?}");
log::error!(
"Could not find VM config for {uuid}. Cannot delete VM: {e:?}"
);
continue;
}
};
@ -243,8 +243,12 @@ async fn main() {
let brain_url = vm_handler.config.brain_url.clone();
info!("Registering with the brain and getting back VM Contracts (if they exist).");
let mut contracts: Vec<String> = Vec::new();
match grpc::register_node(&vm_handler.config).await {
Ok(contracts) => vm_handler.clear_deleted_contracts(contracts),
Ok(c) => {
contracts.append(&mut c.iter().map(|c| c.uuid.clone()).collect());
vm_handler.clear_deleted_contracts(c)
}
Err(e) => log::error!("Could not get contracts from brain: {e:?}"),
};
@ -254,6 +258,7 @@ async fn main() {
info!("Connecting to brain...");
if let Err(e) = grpc::connect_and_run(grpc::ConnectionData {
contracts,
brain_url,
brain_msg_tx,
daemon_msg_rx,