diff --git a/Cargo.lock b/Cargo.lock index 6cb831b..eb58192 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.6.18" @@ -257,6 +272,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets", +] + [[package]] name = "colorchoice" version = "1.0.3" @@ -347,6 +376,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bs58", + "chrono", "ed25519-dalek", "env_logger", "lazy_static", @@ -766,6 +796,29 @@ dependencies = [ "tracing", ] +[[package]] +name = "iana-time-zone" +version = "0.1.61" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "icu_collections" version = "1.5.0" @@ -1053,6 +1106,15 @@ dependencies = [ "tempfile", ] +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "object" version = "0.36.5" @@ -2083,6 +2145,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-registry" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index cf07e2b..14a366e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ tokio-stream = "0.1.17" tonic = "0.12" serde_json = "1.0.135" bs58 = "0.5.1" +chrono = "0.4.39" [build-dependencies] tonic-build = "0.12" diff --git a/prod_setting/config1.yaml b/prod_setting/config1.yaml index 338b36c..e2048ec 100644 --- a/prod_setting/config1.yaml +++ b/prod_setting/config1.yaml @@ -1,6 +1,7 @@ +owner_wallet: "ThisIsNotARealWallet000000000000000000000000" brain_url: "http://164.92.249.180:31337" -max_cores_per_vm: 4 -max_vcpu_reservation: 8 +max_cores_per_vm: 8 +max_vcpu_reservation: 24 max_mem_reservation_mb: 25000 network_interfaces: - driver: "MACVTAP" @@ -26,3 +27,4 @@ public_port_range: start: 30000 end: 50000 max_ports_per_vm: 5 +price: 20000 diff --git a/snp.proto b/snp.proto index 528573f..fa9654e 100644 --- a/snp.proto +++ b/snp.proto @@ -8,6 +8,11 @@ message Pubkey { string pubkey = 1; } +message AccountBalance { + uint64 balance = 1; + uint64 tmp_locked = 2; +} + message Contract { string uuid = 1; string hostname = 2; @@ -23,7 +28,7 @@ message Contract { string dtrfs_sha = 12; string created_at = 13; string updated_at = 14; - // total nanotoken cost per minute (for all units) + // total nanoLP cost per minute (for all units) uint64 nano_per_minute = 15; uint64 locked_nano = 16; string collected_at = 17; @@ -47,6 +52,7 @@ message MeasurementIP { string gateway = 4; } +// This should also include a block hash or similar, for auth message RegisterNodeReq { string node_pubkey = 1; string owner_pubkey = 2; @@ -54,7 +60,7 @@ message RegisterNodeReq { string country = 4; string region = 5; string city = 6; - // nanotokens per unit per minute + // nanoLP per unit per minute uint64 price = 7; } @@ -96,13 +102,14 @@ message NewVmResp { message UpdateVmReq { string uuid = 1; - uint32 disk_size_gb = 2; - uint32 vcpus = 3; - uint32 memory_mb = 4; - string kernel_url = 5; - string kernel_sha = 6; - string dtrfs_url = 7; - string dtrfs_sha = 8; + string admin_pubkey = 2; + uint32 disk_size_gb = 3; + uint32 vcpus = 4; + uint32 memory_mb = 5; + string kernel_url = 6; + string kernel_sha = 7; + string dtrfs_url = 8; + string dtrfs_sha = 9; } message UpdateVmResp { @@ -113,6 +120,7 @@ message UpdateVmResp { message DeleteVmReq { string uuid = 1; + string admin_pubkey = 2; } message BrainMessage { @@ -123,9 +131,16 @@ message BrainMessage { } } +message DaemonStreamAuth { + string timestamp = 1; + string pubkey = 2; + repeated string contracts = 3; + string signature = 4; +} + message DaemonMessage { oneof Msg { - Pubkey pubkey = 1; + DaemonStreamAuth auth = 1; NewVmResp new_vm_resp = 2; UpdateVmResp update_vm_resp = 3; NodeResources node_resources = 4; @@ -134,7 +149,7 @@ message DaemonMessage { service BrainDaemon { rpc RegisterNode (RegisterNodeReq) returns (stream Contract); - rpc BrainMessages (Pubkey) returns (stream BrainMessage); + rpc BrainMessages (DaemonStreamAuth) returns (stream BrainMessage); rpc DaemonMessages (stream DaemonMessage) returns (Empty); } @@ -155,6 +170,7 @@ message NodeFilters { string region = 8; string city = 9; string ip = 10; + string node_pubkey = 11; } message NodeListResp { @@ -165,15 +181,24 @@ message NodeListResp { string ip = 5; // required for latency test uint32 server_rating = 6; uint32 provider_rating = 7; - // nanotokens per unit per minute + // nanoLP per unit per minute uint64 price = 8; } +message ExtendVmReq { + string uuid = 1; + string admin_pubkey = 2; + uint64 locked_nano = 3; +} + service BrainCli { + rpc GetAirdrop (Pubkey) returns (Empty); + rpc GetBalance (Pubkey) returns (AccountBalance); rpc NewVm (NewVmReq) returns (NewVmResp); rpc ListContracts (ListContractsReq) returns (stream Contract); rpc ListNodes (NodeFilters) returns (stream NodeListResp); rpc GetOneNode (NodeFilters) returns (NodeListResp); rpc DeleteVm (DeleteVmReq) returns (Empty); rpc UpdateVm (UpdateVmReq) returns (UpdateVmResp); + rpc ExtendVm (ExtendVmReq) returns (Empty); } diff --git a/src/config.rs b/src/config.rs index 04527a6..17cf5da 100644 --- a/src/config.rs +++ b/src/config.rs @@ -44,6 +44,7 @@ pub enum InterfaceType { #[derive(Deserialize, Debug)] pub struct Config { + pub owner_wallet: String, pub brain_url: String, pub max_cores_per_vm: usize, pub max_vcpu_reservation: usize, diff --git a/src/global.rs b/src/global.rs index 511144d..67e0994 100644 --- a/src/global.rs +++ b/src/global.rs @@ -49,6 +49,12 @@ fn load_secret_key() -> Result { )) } +pub fn sign_message(msg: &str) -> Result { + use ed25519_dalek::Signer; + let key = load_secret_key()?; + Ok(bs58::encode(key.sign(msg.as_bytes()).to_bytes()).into_string()) +} + pub fn get_public_key() -> String { let pubkey = bs58::encode(load_secret_key().unwrap().verifying_key().to_bytes()).into_string(); log::info!("Loaded the following public key: {pubkey}"); diff --git a/src/grpc.rs b/src/grpc.rs index 32cad9a..ccd1880 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -1,16 +1,14 @@ +use crate::global::*; use crate::snp_proto::DaemonMessage; use anyhow::Result; use log::{debug, info, warn}; -use snp_proto::{ - brain_daemon_client::BrainDaemonClient, BrainMessage, Contract, Pubkey, RegisterNodeReq, -}; +use snp_proto::{brain_daemon_client::BrainDaemonClient, BrainMessage, Contract, RegisterNodeReq}; use tokio::{ sync::mpsc::{Receiver, Sender}, task::JoinSet, }; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tonic::transport::Channel; -use crate::global::*; pub mod snp_proto { tonic::include_proto!("snp_proto"); @@ -35,18 +33,32 @@ impl From for snp_proto::DaemonMessage { } pub async fn register_node(config: &crate::config::Config) -> Result> { + use tonic::metadata::AsciiMetadataValue; + use tonic::Request; let mut client = BrainDaemonClient::connect(config.brain_url.clone()).await?; debug!("Starting node registration..."); let ip_info = IP_INFO.clone(); let req = RegisterNodeReq { node_pubkey: PUBLIC_KEY.clone(), - owner_pubkey: "IamTheOwnerOf".to_string() + &PUBLIC_KEY, + owner_pubkey: config.owner_wallet.clone(), main_ip: ip_info.ip, country: ip_info.country, region: ip_info.region, city: ip_info.city, price: config.price, }; + + let pubkey = PUBLIC_KEY.clone(); + let timestamp = chrono::Utc::now().to_rfc3339(); + let signature = crate::global::sign_message(&format!("{timestamp}{req:?}"))?; + let timestamp: AsciiMetadataValue = timestamp.parse()?; + let pubkey: AsciiMetadataValue = pubkey.parse()?; + let signature: AsciiMetadataValue = signature.parse()?; + let mut req = Request::new(req); + req.metadata_mut().insert("timestamp", timestamp); + req.metadata_mut().insert("pubkey", pubkey); + req.metadata_mut().insert("request-signature", signature); + let mut contracts = Vec::new(); let mut grpc_stream = client.register_node(req).await?.into_inner(); while let Some(stream_update) = grpc_stream.next().await { @@ -64,13 +76,21 @@ pub async fn register_node(config: &crate::config::Config) -> Result) -> Result { + let pubkey = PUBLIC_KEY.clone(); + let timestamp = chrono::Utc::now().to_rfc3339(); + let signature = + crate::global::sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?; + Ok(snp_proto::DaemonStreamAuth { timestamp, pubkey, contracts, signature }) +} + async fn receive_messages( mut client: BrainDaemonClient, + contracts: Vec, tx: Sender, ) -> Result<()> { debug!("starting to listen for messages from brain"); - let pubkey = PUBLIC_KEY.clone(); - let mut grpc_stream = client.brain_messages(Pubkey { pubkey }).await?.into_inner(); + let mut grpc_stream = client.brain_messages(sign_stream_auth(contracts)?).await?.into_inner(); while let Some(stream_update) = grpc_stream.next().await { match stream_update { Ok(msg) => { @@ -88,20 +108,23 @@ async fn receive_messages( async fn send_messages( mut client: BrainDaemonClient, + contracts: Vec, rx: Receiver, tx: Sender, ) -> Result<()> { debug!("starting daemon message stream to brain"); - let pubkey = PUBLIC_KEY.clone(); let rx_stream = ReceiverStream::new(rx); - tx.send(DaemonMessage { msg: Some(snp_proto::daemon_message::Msg::Pubkey(Pubkey { pubkey })) }) - .await?; + tx.send(DaemonMessage { + msg: Some(snp_proto::daemon_message::Msg::Auth(sign_stream_auth(contracts)?)), + }) + .await?; client.daemon_messages(rx_stream).await?; debug!("send_newvm_resp is about to exit"); Ok(()) } pub struct ConnectionData { + pub contracts: Vec, pub brain_url: String, pub brain_msg_tx: Sender, pub daemon_msg_rx: Receiver, @@ -112,8 +135,13 @@ pub async fn connect_and_run(cd: ConnectionData) -> Result<()> { let client = BrainDaemonClient::connect(cd.brain_url).await?; let mut streaming_tasks = JoinSet::new(); - streaming_tasks.spawn(receive_messages(client.clone(), cd.brain_msg_tx)); - streaming_tasks.spawn(send_messages(client.clone(), cd.daemon_msg_rx, cd.daemon_msg_tx)); + streaming_tasks.spawn(receive_messages(client.clone(), cd.contracts.clone(), cd.brain_msg_tx)); + streaming_tasks.spawn(send_messages( + client.clone(), + cd.contracts, + cd.daemon_msg_rx, + cd.daemon_msg_tx, + )); let task_output = streaming_tasks.join_next().await; warn!("One stream exited: {task_output:?}"); diff --git a/src/main.rs b/src/main.rs index 8b91716..5f468f4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -140,8 +140,7 @@ impl VMHandler { async fn handle_update_vm_req(&mut self, update_vm_req: snp_proto::UpdateVmReq) -> Result<()> { debug!("Processing update vm request: {update_vm_req:?}"); let vm_id = update_vm_req.uuid.clone(); - let content = - std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?; + let content = std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?; let mut vm: state::VM = serde_yaml::from_str(&content)?; match vm.update(update_vm_req.into(), &self.config, &mut self.res) { Ok(_) => { @@ -170,14 +169,14 @@ impl VMHandler { fn handle_delete_vm(&mut self, delete_vm_req: snp_proto::DeleteVmReq) -> Result<()> { let vm_id = delete_vm_req.uuid; - let content = - std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?; + let content = std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &vm_id + ".yaml")?; let vm: state::VM = serde_yaml::from_str(&content)?; vm.delete(&mut self.res)?; Ok(()) } async fn run(mut self) { + sleep(Duration::from_millis(100)).await; self.send_node_resources().await; while let Some(brain_msg) = self.receiver.recv().await { match brain_msg.msg { @@ -206,15 +205,16 @@ impl VMHandler { for uuid in self.res.existing_vms.clone() { if contracts.iter().find(|c| c.uuid == uuid).is_none() { info!("VM {uuid} exists locally but not found in brain. Deleting..."); - let content = match std::fs::read_to_string( - VM_CONFIG_DIR.to_string() + &uuid + ".yaml", - ) { - Ok(content) => content, - Err(e) => { - log::error!("Could not find VM config for {uuid}. Cannot delete VM: {e:?}"); - continue; - } - }; + let content = + match std::fs::read_to_string(VM_CONFIG_DIR.to_string() + &uuid + ".yaml") { + Ok(content) => content, + Err(e) => { + log::error!( + "Could not find VM config for {uuid}. Cannot delete VM: {e:?}" + ); + continue; + } + }; let vm: crate::state::VM = match serde_yaml::from_str(&content) { Ok(vm) => vm, Err(e) => { @@ -243,8 +243,12 @@ async fn main() { let brain_url = vm_handler.config.brain_url.clone(); info!("Registering with the brain and getting back VM Contracts (if they exist)."); + let mut contracts: Vec = Vec::new(); match grpc::register_node(&vm_handler.config).await { - Ok(contracts) => vm_handler.clear_deleted_contracts(contracts), + Ok(c) => { + contracts.append(&mut c.iter().map(|c| c.uuid.clone()).collect()); + vm_handler.clear_deleted_contracts(c) + } Err(e) => log::error!("Could not get contracts from brain: {e:?}"), }; @@ -254,6 +258,7 @@ async fn main() { info!("Connecting to brain..."); if let Err(e) = grpc::connect_and_run(grpc::ConnectionData { + contracts, brain_url, brain_msg_tx, daemon_msg_rx,