implemented redirect vm methods

Introduces new environment variable to set public endpoint of the brain
refactor redirect validation to db module
validating pubsub node on all vm
fixed some vm tests
This commit is contained in:
Noor 2025-06-12 18:27:26 +05:30
parent 5de6bd7cbf
commit 9363750d5b
Signed by: noormohammedb
GPG Key ID: D83EFB8B3B967146
9 changed files with 127 additions and 104 deletions

@ -5,4 +5,5 @@ DB_NAMESPACE = "brain"
DB_NAME = "migration"
CERT_PATH = "./tmp/brain-crt.pem"
CERT_KEY_PATH = "./tmp/brain-key.pem"
BRAIN_PUBLIC_ENDPOINT = "127.0.0.1:31337"
# ADMIN_PUB_KEYS = "admin_key01, admin_key02, admin_key03"

@ -4,6 +4,7 @@ use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer;
use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer;
use dotenv::dotenv;
use std::net::SocketAddr;
use std::sync::Arc;
use surreal_brain::constants::{BRAIN_GRPC_ADDR, CERT_KEY_PATH, CERT_PATH};
use surreal_brain::db;
@ -29,6 +30,12 @@ async fn main() {
let db_ns = std::env::var("DB_NAMESPACE").expect("the env variable DB_NAMESPACE is not set");
let db_name = std::env::var("DB_NAME").expect("the environment variable DB_NAME is not set");
// To make sure env is set correctly
std::env::var("BRAIN_PUBLIC_ENDPOINT")
.expect("the environment variable BRAIN_PUBLIC_ENDPOINT is not set")
.parse::<SocketAddr>()
.expect("BRAIN_PUBLIC_ENDPOINT is not a socket address");
let db = db::db_connection(&db_url, &db_user, &db_pass, &db_ns, &db_name).await.unwrap();
let db_arc = Arc::new(db);

@ -11,7 +11,7 @@ use prelude::*;
use serde::{Deserialize, Serialize};
use surrealdb::engine::remote::ws::{Client, Ws};
use surrealdb::opt::auth::Root;
use surrealdb::{Notification, Surreal};
use surrealdb::{Notification, RecordId, Surreal};
use tokio::sync::mpsc::Sender;
use tokio_stream::StreamExt;
@ -196,7 +196,47 @@ pub async fn live_appnode_msgs<
Ok(())
}
pub async fn set_pubsub_node(db: &Surreal<Client>, id: RecordId) {
let pub_endpoint = std::env::var("BRAIN_PUBLIC_ENDPOINT").unwrap_or_default();
match db
.query(format!("UPDATE $id SET pub_sub_node = '{pub_endpoint}'",))
.bind(("id", id))
.await
{
Ok(res) => log::info!("Updated pub_sub_node {:?}", res),
Err(e) => log::error!("Could not update pub_sub_node {:?}", e),
}
}
pub async fn check_pubsub_node(
db: &Surreal<Client>,
id: RecordId,
) -> Result<Option<tonic::Status>, Error> {
let pub_endpoint =
std::env::var("BRAIN_PUBLIC_ENDPOINT").unwrap_or("127.0.0.1:31337".to_string());
let mut query_resp = db.query(format!("select pub_sub_node from {id}")).await?;
let node_endpoint = query_resp
.take::<Option<PubSubNode>>(0)?
.ok_or(tonic::Status::internal("Could not get current brain endpoint"))?
.pub_sub_node;
if pub_endpoint == node_endpoint {
return Ok(None);
}
let mut status = tonic::Status::new(tonic::Code::Unavailable, "moved");
status.metadata_mut().insert("location", node_endpoint.parse().unwrap());
Ok(Some(status))
}
#[derive(Deserialize, Debug, Clone)]
pub struct ErrorFromTable {
pub error: String,
}
#[derive(Deserialize, Debug)]
struct PubSubNode {
pub pub_sub_node: String,
}

@ -430,8 +430,11 @@ impl ActiveVm {
}
pub async fn get_by_uuid(db: &Surreal<Client>, uuid: &str) -> Result<Option<Self>, Error> {
let contract: Option<Self> =
db.query(format!("select * from {ACTIVE_VM}:{uuid};")).await?.take(0)?;
let contract: Option<Self> = db
.query(format!("select * from $active_vm_id;"))
.bind(("active_vm_id", RecordId::from((ACTIVE_VM, uuid))))
.await?
.take(0)?;
Ok(contract)
}

@ -4,55 +4,14 @@ pub mod types;
pub mod vm;
use crate::constants::ADMIN_ACCOUNTS;
use crate::db::prelude as db;
use detee_shared::app_proto::*;
use detee_shared::common_proto::{Empty, Pubkey};
use detee_shared::general_proto::{
AirdropReq, BanUserReq, KickReq, RegOperatorReq, ReportNodeReq, SlashReq,
};
use detee_shared::vm_proto::*;
use serde::Deserialize;
use surrealdb::engine::remote::ws::Client;
use surrealdb::{RecordId, Surreal};
use tonic::{Request, Status};
pub async fn set_pubsub_node(db: &Surreal<Client>, local_addr: &str, id: RecordId) {
dbg!(&local_addr);
match db.query(format!("UPDATE $id SET pub_sub_node = '{local_addr}'",)).bind(("id", id)).await
{
Ok(res) => log::info!("Updated pub_sub_node {:?}", res),
Err(e) => log::error!("Could not update pub_sub_node {:?}", e),
}
}
pub async fn check_pubsub_node(
db: &Surreal<Client>,
local_addr: &str,
id: RecordId,
) -> Result<Option<Status>, db::Error> {
dbg!(&local_addr);
#[derive(Deserialize, Debug)]
struct PubSubNode {
pub pub_sub_node: String,
}
let mut query_resp = db.query(format!("select pub_sub_node from {id}")).await?;
let node_endpoint = query_resp
.take::<Option<PubSubNode>>(0)?
.ok_or(Status::internal("Could not get current brain endpoint"))?
.pub_sub_node;
dbg!(&local_addr, &node_endpoint);
if local_addr == node_endpoint {
return Ok(None);
}
let mut status = Status::new(tonic::Code::Unavailable, "moved");
status.metadata_mut().insert("location", node_endpoint.parse().unwrap());
Ok(Some(status))
}
pub trait PubkeyGetter {
fn get_pubkey(&self) -> Option<String>;
}

@ -1,7 +1,7 @@
#![allow(dead_code)]
use crate::constants::{ACCOUNT, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE};
use crate::db::prelude as db;
use crate::grpc::{check_pubsub_node, check_sig_from_parts, check_sig_from_req, set_pubsub_node};
use crate::grpc::{check_sig_from_parts, check_sig_from_req};
use detee_shared::common_proto::Empty;
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCli;
use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemon;
@ -36,7 +36,6 @@ impl BrainVmDaemon for VmDaemonServer {
&self,
req: Request<RegisterVmNodeReq>,
) -> Result<Response<Self::RegisterVmNodeStream>, Status> {
let local_addr = req.local_addr().map(|addr| addr.to_string()).unwrap_or_default();
let req = check_sig_from_req(req)?;
info!("Starting registration process for {:?}", req);
let id = surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone()));
@ -60,7 +59,7 @@ impl BrainVmDaemon for VmDaemonServer {
}
.register(&self.db)
.await?;
set_pubsub_node(&self.db, &local_addr, id).await;
db::set_pubsub_node(&self.db, id).await;
info!("Sending deleted contracts to {}", req.node_pubkey);
let deleted_vms = db::DeletedVm::list_by_node(&self.db, &req.node_pubkey).await?;
@ -224,10 +223,9 @@ impl BrainVmCli for VmCliServer {
type ListVmNodesStream = Pin<Box<dyn Stream<Item = Result<VmNodeListResp, Status>> + Send>>;
async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> {
let local_addr = req.local_addr().map(|addr| addr.to_string()).unwrap_or_default();
let req = check_sig_from_req(req)?;
let id = surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone()));
if let Some(redirect) = check_pubsub_node(&self.db, &local_addr, id).await? {
if let Some(redirect) = db::check_pubsub_node(&self.db, id).await? {
log::info!("redirect: {redirect}");
return Err(redirect);
}
@ -271,6 +269,11 @@ impl BrainVmCli for VmCliServer {
info!("Update VM requested via CLI: {req:?}");
let db_req: db::UpdateVmReq = req.clone().into();
if let Some(redirect) = db::check_pubsub_node(&self.db, db_req.vm_node.clone()).await? {
log::info!("redirect: {redirect}");
return Err(redirect);
}
let id = db_req.id.key().to_string();
let mut hostname_changed = false;
@ -352,6 +355,15 @@ impl BrainVmCli for VmCliServer {
async fn delete_vm(&self, req: Request<DeleteVmReq>) -> Result<Response<Empty>, Status> {
let req = check_sig_from_req(req)?;
let vm_node = db::ActiveVm::get_by_uuid(&self.db, &req.uuid)
.await?
.ok_or(Status::permission_denied("Unauthorized"))?
.vm_node;
if let Some(redirect) = db::check_pubsub_node(&self.db, vm_node).await? {
log::info!("redirect: {redirect}");
return Err(redirect);
}
match db::ActiveVm::delete(&self.db, &req.admin_pubkey, &req.uuid).await {
Ok(()) => Ok(Response::new(Empty {})),
Err(db::Error::AccessDenied) => Err(Status::permission_denied("Unauthorized")),

@ -20,7 +20,13 @@ mod common;
#[tokio::test]
async fn test_general_balance() {
// env_logger::builder().filter_level(log::LevelFilter::Trace).init();
/*
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.filter_module("tungstenite", log::LevelFilter::Debug)
.filter_module("tokio_tungstenite", log::LevelFilter::Debug)
.init();
*/
prepare_test_db().await.unwrap();
let addr = run_service_in_background().await.unwrap();
@ -343,7 +349,6 @@ async fn test_ban_user() {
let operator_banned_you =
create_new_vm(&db_conn, &user_key, node_pubkey, &brain_channel).await.err().unwrap();
dbg!(&operator_banned_you);
assert!(operator_banned_you.to_string().contains("This operator banned you"));
}

@ -0,0 +1,46 @@
use common::prepare_test_env::{prepare_test_db, run_service_for_stream};
use common::test_utils::{airdrop, Key};
use common::vm_daemon_utils::register_vm_node;
use detee_shared::vm_proto;
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
mod common;
#[tokio::test]
async fn test_pub_sub_redirect() {
let _ = prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await.unwrap();
let mut vm_daemon_client = BrainVmDaemonClient::new(brain_channel.clone());
let node_key = Key::new();
let pubsub_brain_endpoint = "192.168.1.1:31337";
std::env::set_var("BRAIN_PUBLIC_ENDPOINT", pubsub_brain_endpoint);
register_vm_node(&mut vm_daemon_client, &node_key, &Key::new().pubkey).await.unwrap();
std::env::set_var("BRAIN_PUBLIC_ENDPOINT", "127.0.0.1:31337");
let client_key = Key::new();
let new_vm_req = vm_proto::NewVmReq {
admin_pubkey: client_key.pubkey.clone(),
node_pubkey: node_key.pubkey.clone(),
price_per_unit: 1200,
extra_ports: vec![8080, 8081],
locked_nano: 100,
..Default::default()
};
airdrop(&brain_channel, &client_key.pubkey, 10).await.unwrap();
std::env::set_var("BRAIN_PUBLIC_ENDPOINT", "10.0.0.1:31337");
let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone());
let redirect =
client_vm_cli.new_vm(client_key.sign_request(new_vm_req).unwrap()).await.err().unwrap();
std::env::set_var("BRAIN_PUBLIC_ENDPOINT", "127.0.0.1:31337");
assert_eq!(redirect.code(), tonic::Code::Unavailable);
let redirect_url =
redirect.metadata().get("location").and_then(|v| v.to_str().ok()).unwrap_or_default();
assert_eq!(redirect_url, pubsub_brain_endpoint);
}

@ -2,17 +2,13 @@ use common::prepare_test_env::{prepare_test_db, run_service_for_stream};
use common::test_utils::{airdrop, Key};
use common::vm_cli_utils::{create_new_vm, user_list_vm_contracts};
use common::vm_daemon_utils::{mock_vm_daemon, register_vm_node};
use detee_shared::vm_proto;
use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient;
use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient;
use detee_shared::vm_proto::{self, DeleteVmReq, RegisterVmNodeReq};
use detee_shared::vm_proto::{ExtendVmReq, ListVmContractsReq, NewVmReq};
use detee_shared::vm_proto::{DeleteVmReq, ExtendVmReq, ListVmContractsReq, NewVmReq};
use futures::StreamExt;
use std::vec;
use surreal_brain::constants::{
ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, TOKEN_DECIMAL, VM_NODE,
};
use surreal_brain::constants::{ACCOUNT, ACTIVE_VM, DELETED_VM, NEW_VM_REQ, TOKEN_DECIMAL};
use surreal_brain::db::prelude as db;
use surreal_brain::grpc::check_pubsub_node;
mod common;
@ -304,49 +300,3 @@ async fn test_extend_vm() {
}
// TODO: test register vm node, delete vm contract while node offline, kick, etc..
#[tokio::test]
async fn test_pub_sub_redirect() {
let db = prepare_test_db().await.unwrap();
let brain_channel = run_service_for_stream().await.unwrap();
let mut vm_daemon_client = BrainVmDaemonClient::new(brain_channel.clone());
let node_key = Key::new();
let operator_wallet = "Hv5q3enK249RUnLRLi9YNQMrPCRxvL2XnhznkzrtCmkG";
let req = RegisterVmNodeReq {
node_pubkey: node_key.pubkey.clone(),
operator_wallet: operator_wallet.to_string(),
main_ip: String::from("185.243.218.213"),
city: String::from("Oslo"),
country: String::from("Norway"),
region: String::from("EU"),
price: 1200,
};
vm_daemon_client
.register_vm_node(node_key.sign_request(req).unwrap())
.await
.unwrap()
.into_inner();
// let client_key = Key::new();
// let new_vm_req = vm_proto::NewVmReq {
// admin_pubkey: client_key.pubkey.clone(),
// node_pubkey: node_key.pubkey.clone(),
// price_per_unit: 1200,
// extra_ports: vec![8080, 8081],
// locked_nano: 100,
// ..Default::default()
// };
let id = surrealdb::RecordId::from((VM_NODE, node_key.pubkey.clone()));
let endpoint = check_pubsub_node(&db, "", id).await;
dbg!(&endpoint);
// let mut client_vm_cli = BrainVmCliClient::new(brain_channel.clone());
// let redirect = client_vm_cli.new_vm(client_key.sign_request(new_vm_req).unwrap()).await;
}