diff --git a/.sample.env b/.sample.env index 7341fdb..71bcfb9 100644 --- a/.sample.env +++ b/.sample.env @@ -5,4 +5,5 @@ DB_NAMESPACE = "brain" DB_NAME = "migration" CERT_PATH = "./tmp/brain-crt.pem" CERT_KEY_PATH = "./tmp/brain-key.pem" +BRAIN_PUBLIC_ENDPOINT = "127.0.0.1:31337" # ADMIN_PUB_KEYS = "admin_key01, admin_key02, admin_key03" diff --git a/src/bin/brain.rs b/src/bin/brain.rs index 4798063..339301b 100644 --- a/src/bin/brain.rs +++ b/src/bin/brain.rs @@ -4,6 +4,7 @@ use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer; use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; use dotenv::dotenv; +use std::net::SocketAddr; use std::sync::Arc; use surreal_brain::constants::{BRAIN_GRPC_ADDR, CERT_KEY_PATH, CERT_PATH}; use surreal_brain::db; @@ -29,6 +30,12 @@ async fn main() { let db_ns = std::env::var("DB_NAMESPACE").expect("the env variable DB_NAMESPACE is not set"); let db_name = std::env::var("DB_NAME").expect("the environment variable DB_NAME is not set"); + // To make sure env is set correctly + std::env::var("BRAIN_PUBLIC_ENDPOINT") + .expect("the environment variable BRAIN_PUBLIC_ENDPOINT is not set") + .parse::() + .expect("BRAIN_PUBLIC_ENDPOINT is not a socket address"); + let db = db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name).await.unwrap(); let db_arc = Arc::new(db); diff --git a/src/db/mod.rs b/src/db/mod.rs index 7075287..cdecd5a 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -11,7 +11,7 @@ use prelude::*; use serde::{Deserialize, Serialize}; use surrealdb::engine::remote::ws::{Client, Ws}; use surrealdb::opt::auth::Root; -use surrealdb::{Notification, Surreal}; +use surrealdb::{Notification, RecordId, Surreal}; use tokio::sync::mpsc::Sender; use tokio_stream::StreamExt; @@ -196,7 +196,47 @@ pub async fn live_appnode_msgs< Ok(()) } +pub async fn set_pubsub_node(db: &Surreal, id: RecordId) { + let pub_endpoint = std::env::var("BRAIN_PUBLIC_ENDPOINT").unwrap_or_default(); + match db + .query(format!("UPDATE $id SET pub_sub_node = '{pub_endpoint}'",)) + .bind(("id", id)) + .await + { + Ok(res) => log::info!("Updated pub_sub_node {:?}", res), + Err(e) => log::error!("Could not update pub_sub_node {:?}", e), + } +} + +pub async fn check_pubsub_node( + db: &Surreal, + id: RecordId, +) -> Result, Error> { + let pub_endpoint = + std::env::var("BRAIN_PUBLIC_ENDPOINT").unwrap_or("127.0.0.1:31337".to_string()); + + let mut query_resp = db.query(format!("select pub_sub_node from {id}")).await?; + let node_endpoint = query_resp + .take::>(0)? + .ok_or(tonic::Status::internal("Could not get current brain endpoint"))? + .pub_sub_node; + + if pub_endpoint == node_endpoint { + return Ok(None); + } + + let mut status = tonic::Status::new(tonic::Code::Unavailable, "moved"); + status.metadata_mut().insert("location", node_endpoint.parse().unwrap()); + + Ok(Some(status)) +} + #[derive(Deserialize, Debug, Clone)] pub struct ErrorFromTable { pub error: String, } + +#[derive(Deserialize, Debug)] +struct PubSubNode { + pub pub_sub_node: String, +} diff --git a/src/db/vm.rs b/src/db/vm.rs index 8dabcac..5a45677 100644 --- a/src/db/vm.rs +++ b/src/db/vm.rs @@ -430,8 +430,11 @@ impl ActiveVm { } pub async fn get_by_uuid(db: &Surreal, uuid: &str) -> Result, Error> { - let contract: Option = - db.query(format!("select * from {ACTIVE_VM}:{uuid};")).await?.take(0)?; + let contract: Option = db + .query(format!("select * from $active_vm_id;")) + .bind(("active_vm_id", RecordId::from((ACTIVE_VM, uuid)))) + .await? + .take(0)?; Ok(contract) } diff --git a/src/grpc/mod.rs b/src/grpc/mod.rs index 39d103c..c78c346 100644 --- a/src/grpc/mod.rs +++ b/src/grpc/mod.rs @@ -4,55 +4,14 @@ pub mod types; pub mod vm; use crate::constants::ADMIN_ACCOUNTS; -use crate::db::prelude as db; use detee_shared::app_proto::*; use detee_shared::common_proto::{Empty, Pubkey}; use detee_shared::general_proto::{ AirdropReq, BanUserReq, KickReq, RegOperatorReq, ReportNodeReq, SlashReq, }; use detee_shared::vm_proto::*; -use serde::Deserialize; -use surrealdb::engine::remote::ws::Client; -use surrealdb::{RecordId, Surreal}; use tonic::{Request, Status}; -pub async fn set_pubsub_node(db: &Surreal, local_addr: &str, id: RecordId) { - dbg!(&local_addr); - match db.query(format!("UPDATE $id SET pub_sub_node = '{local_addr}'",)).bind(("id", id)).await - { - Ok(res) => log::info!("Updated pub_sub_node {:?}", res), - Err(e) => log::error!("Could not update pub_sub_node {:?}", e), - } -} - -pub async fn check_pubsub_node( - db: &Surreal, - local_addr: &str, - id: RecordId, -) -> Result, db::Error> { - dbg!(&local_addr); - #[derive(Deserialize, Debug)] - struct PubSubNode { - pub pub_sub_node: String, - } - let mut query_resp = db.query(format!("select pub_sub_node from {id}")).await?; - let node_endpoint = query_resp - .take::>(0)? - .ok_or(Status::internal("Could not get current brain endpoint"))? - .pub_sub_node; - - dbg!(&local_addr, &node_endpoint); - - if local_addr == node_endpoint { - return Ok(None); - } - - let mut status = Status::new(tonic::Code::Unavailable, "moved"); - status.metadata_mut().insert("location", node_endpoint.parse().unwrap()); - - Ok(Some(status)) -} - pub trait PubkeyGetter { fn get_pubkey(&self) -> Option; } diff --git a/src/grpc/vm.rs b/src/grpc/vm.rs index 2e2d594..7d62cac 100644 --- a/src/grpc/vm.rs +++ b/src/grpc/vm.rs @@ -1,7 +1,7 @@ #![allow(dead_code)] use crate::constants::{ACCOUNT, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE}; use crate::db::prelude as db; -use crate::grpc::{check_pubsub_node, check_sig_from_parts, check_sig_from_req, set_pubsub_node}; +use crate::grpc::{check_sig_from_parts, check_sig_from_req}; use detee_shared::common_proto::Empty; use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCli; use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemon; @@ -36,7 +36,6 @@ impl BrainVmDaemon for VmDaemonServer { &self, req: Request, ) -> Result, Status> { - let local_addr = req.local_addr().map(|addr| addr.to_string()).unwrap_or_default(); let req = check_sig_from_req(req)?; info!("Starting registration process for {:?}", req); let id = surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone())); @@ -60,7 +59,7 @@ impl BrainVmDaemon for VmDaemonServer { } .register(&self.db) .await?; - set_pubsub_node(&self.db, &local_addr, id).await; + db::set_pubsub_node(&self.db, id).await; info!("Sending deleted contracts to {}", req.node_pubkey); let deleted_vms = db::DeletedVm::list_by_node(&self.db, &req.node_pubkey).await?; @@ -224,10 +223,9 @@ impl BrainVmCli for VmCliServer { type ListVmNodesStream = Pin> + Send>>; async fn new_vm(&self, req: Request) -> Result, Status> { - let local_addr = req.local_addr().map(|addr| addr.to_string()).unwrap_or_default(); let req = check_sig_from_req(req)?; let id = surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone())); - if let Some(redirect) = check_pubsub_node(&self.db, &local_addr, id).await? { + if let Some(redirect) = db::check_pubsub_node(&self.db, id).await? { log::info!("redirect: {redirect}"); return Err(redirect); } @@ -271,6 +269,11 @@ impl BrainVmCli for VmCliServer { info!("Update VM requested via CLI: {req:?}"); let db_req: db::UpdateVmReq = req.clone().into(); + + if let Some(redirect) = db::check_pubsub_node(&self.db, db_req.vm_node.clone()).await? { + log::info!("redirect: {redirect}"); + return Err(redirect); + } let id = db_req.id.key().to_string(); let mut hostname_changed = false; @@ -352,6 +355,15 @@ impl BrainVmCli for VmCliServer { async fn delete_vm(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; + let vm_node = db::ActiveVm::get_by_uuid(&self.db, &req.uuid) + .await? + .ok_or(Status::permission_denied("Unauthorized"))? + .vm_node; + + if let Some(redirect) = db::check_pubsub_node(&self.db, vm_node).await? { + log::info!("redirect: {redirect}"); + return Err(redirect); + } match db::ActiveVm::delete(&self.db, &req.admin_pubkey, &req.uuid).await { Ok(()) => Ok(Response::new(Empty {})), Err(db::Error::AccessDenied) => Err(Status::permission_denied("Unauthorized")), diff --git a/tests/grpc_general_test.rs b/tests/grpc_general_test.rs index 7d42c59..1dd3401 100644 --- a/tests/grpc_general_test.rs +++ b/tests/grpc_general_test.rs @@ -20,7 +20,13 @@ mod common; #[tokio::test] async fn test_general_balance() { - // env_logger::builder().filter_level(log::LevelFilter::Trace).init(); + /* + env_logger::builder() + .filter_level(log::LevelFilter::Info) + .filter_module("tungstenite", log::LevelFilter::Debug) + .filter_module("tokio_tungstenite", log::LevelFilter::Debug) + .init(); + */ prepare_test_db().await.unwrap(); let addr = run_service_in_background().await.unwrap(); @@ -343,7 +349,6 @@ async fn test_ban_user() { let operator_banned_you = create_new_vm(&db_conn, &user_key, node_pubkey, &brain_channel).await.err().unwrap(); - dbg!(&operator_banned_you); assert!(operator_banned_you.to_string().contains("This operator banned you")); } diff --git a/tests/grpc_redirect_test.rs b/tests/grpc_redirect_test.rs new file mode 100644 index 0000000..73e5b68 --- /dev/null +++ b/tests/grpc_redirect_test.rs @@ -0,0 +1,46 @@ +use common::prepare_test_env::{prepare_test_db, run_service_for_stream}; +use common::test_utils::{airdrop, Key}; +use common::vm_daemon_utils::register_vm_node; +use detee_shared::vm_proto; +use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; +use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; + +mod common; + +#[tokio::test] +async fn test_pub_sub_redirect() { + let _ = prepare_test_db().await.unwrap(); + + let brain_channel = run_service_for_stream().await.unwrap(); + let mut vm_daemon_client = BrainVmDaemonClient::new(brain_channel.clone()); + + let node_key = Key::new(); + + let pubsub_brain_endpoint = "192.168.1.1:31337"; + std::env::set_var("BRAIN_PUBLIC_ENDPOINT", pubsub_brain_endpoint); + register_vm_node(&mut vm_daemon_client, &node_key, &Key::new().pubkey).await.unwrap(); + std::env::set_var("BRAIN_PUBLIC_ENDPOINT", "127.0.0.1:31337"); + + let client_key = Key::new(); + + let new_vm_req = vm_proto::NewVmReq { + admin_pubkey: client_key.pubkey.clone(), + node_pubkey: node_key.pubkey.clone(), + price_per_unit: 1200, + extra_ports: vec![8080, 8081], + locked_nano: 100, + ..Default::default() + }; + + airdrop(&brain_channel, &client_key.pubkey, 10).await.unwrap(); + + std::env::set_var("BRAIN_PUBLIC_ENDPOINT", "10.0.0.1:31337"); + let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone()); + let redirect = + client_vm_cli.new_vm(client_key.sign_request(new_vm_req).unwrap()).await.err().unwrap(); + std::env::set_var("BRAIN_PUBLIC_ENDPOINT", "127.0.0.1:31337"); + assert_eq!(redirect.code(), tonic::Code::Unavailable); + let redirect_url = + redirect.metadata().get("location").and_then(|v| v.to_str().ok()).unwrap_or_default(); + assert_eq!(redirect_url, pubsub_brain_endpoint); +} diff --git a/tests/grpc_vm_cli_test.rs b/tests/grpc_vm_cli_test.rs index 4530132..8615a68 100644 --- a/tests/grpc_vm_cli_test.rs +++ b/tests/grpc_vm_cli_test.rs @@ -2,17 +2,13 @@ use common::prepare_test_env::{prepare_test_db, run_service_for_stream}; use common::test_utils::{airdrop, Key}; use common::vm_cli_utils::{create_new_vm, user_list_vm_contracts}; use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node}; +use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; -use detee_shared::vm_proto::{self, DeleteVmReq, RegisterVmNodeReq}; -use detee_shared::vm_proto::{ExtendVmReq, ListVmContractsReq, NewVmReq}; +use detee_shared::vm_proto::{DeleteVmReq, ExtendVmReq, ListVmContractsReq, NewVmReq}; use futures::StreamExt; -use std::vec; -use surreal_brain::constants::{ - ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, TOKEN_DECIMAL, VM_NODE, -}; +use surreal_brain::constants::{ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, TOKEN_DECIMAL}; use surreal_brain::db::prelude as db; -use surreal_brain::grpc::check_pubsub_node; mod common; @@ -304,49 +300,3 @@ async fn test_extend_vm() { } // TODO: test register vm node, delete vm contract while node offline, kick, etc.. - -#[tokio::test] -async fn test_pub_sub_redirect() { - let db = prepare_test_db().await.unwrap(); - - let brain_channel = run_service_for_stream().await.unwrap(); - let mut vm_daemon_client = BrainVmDaemonClient::new(brain_channel.clone()); - - let node_key = Key::new(); - let operator_wallet = "Hv5q3enK249RUnLRLi9YNQMrPCRxvL2XnhznkzrtCmkG"; - - let req = RegisterVmNodeReq { - node_pubkey: node_key.pubkey.clone(), - operator_wallet: operator_wallet.to_string(), - main_ip: String::from("185.243.218.213"), - city: String::from("Oslo"), - country: String::from("Norway"), - region: String::from("EU"), - price: 1200, - }; - - vm_daemon_client - .register_vm_node(node_key.sign_request(req).unwrap()) - .await - .unwrap() - .into_inner(); - - // let client_key = Key::new(); - - // let new_vm_req = vm_proto::NewVmReq { - // admin_pubkey: client_key.pubkey.clone(), - // node_pubkey: node_key.pubkey.clone(), - // price_per_unit: 1200, - // extra_ports: vec![8080, 8081], - // locked_nano: 100, - // ..Default::default() - // }; - - let id = surrealdb::RecordId::from((VM_NODE, node_key.pubkey.clone())); - - let endpoint = check_pubsub_node(&db, "", id).await; - dbg!(&endpoint); - - // let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone()); - // let redirect = client_vm_cli.new_vm(client_key.sign_request(new_vm_req).unwrap()).await; -}