diff --git a/src/db/app.rs b/src/db/app.rs index 3753d58..4561270 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -2,7 +2,7 @@ use std::time::Duration; use super::Error; use crate::constants::{ - ACCOUNT, ACTIVE_APP, APP_DAEMON_TIMEOUT, APP_NODE, DELETED_APP, NEW_APP_REQ, + ACCOUNT, ACTIVE_APP, APP_DAEMON_TIMEOUT, APP_NODE, DEFAULT_ENDPOINT, DELETED_APP, NEW_APP_REQ, }; use crate::db; use crate::db::general::Report; @@ -18,6 +18,7 @@ use tokio_stream::StreamExt; pub struct AppNode { pub id: RecordId, pub operator: RecordId, + pub pub_sub_node: String, pub country: String, pub region: String, pub city: String, @@ -35,7 +36,7 @@ impl AppNode { pub async fn register(self, db: &Surreal) -> Result { db::Account::get_or_create(db, &self.operator.key().to_string()).await?; let app_node_id = self.id.clone(); - let app_node: Option = db.upsert(app_node_id.clone()).content(self).await?; + let app_node: Option = db.upsert(app_node_id.clone()).content(self).await.unwrap(); app_node.ok_or(Error::FailedToCreateDBEntry(format!("{APP_NODE}:{app_node_id}"))) } } @@ -312,6 +313,15 @@ impl ActiveApp { (self.vcpus as f64 * 5f64) + (self.memory_mb as f64 / 200f64) + (self.disk_size_gb as f64) } + pub async fn get_by_uuid(db: &Surreal, uuid: &str) -> Result, Error> { + let contract: Option = db + .query("select * from $active_app_id;".to_string()) + .bind(("active_app_id", RecordId::from((ACTIVE_APP, uuid)))) + .await? + .take(0)?; + Ok(contract) + } + pub async fn activate( db: &Surreal, new_app_res: app_proto::NewAppRes, @@ -606,6 +616,7 @@ impl From<&old_brain::BrainData> for Vec { nodes.push(AppNode { id: RecordId::from((APP_NODE, old_node.node_pubkey.clone())), operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())), + pub_sub_node: DEFAULT_ENDPOINT.to_string(), country: old_node.country.clone(), region: old_node.region.clone(), city: old_node.city.clone(), diff --git a/src/db/mod.rs b/src/db/mod.rs index ef55c45..9a4046a 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -203,7 +203,8 @@ pub async fn check_pubsub_node( let pub_endpoint = std::env::var("BRAIN_PUBLIC_ENDPOINT").unwrap_or(DEFAULT_ENDPOINT.to_string()); - let mut query_resp = db.query(format!("select pub_sub_node from {id}")).await?; + let mut query_resp = + db.query("select pub_sub_node from $record_id").bind(("record_id", id)).await?; let node_endpoint = query_resp .take::>(0)? .ok_or(tonic::Status::internal("Could not get current brain endpoint"))? diff --git a/src/db/vm.rs b/src/db/vm.rs index 4f1583a..a64e56b 100644 --- a/src/db/vm.rs +++ b/src/db/vm.rs @@ -432,7 +432,7 @@ impl ActiveVm { pub async fn get_by_uuid(db: &Surreal, uuid: &str) -> Result, Error> { let contract: Option = db - .query(format!("select * from $active_vm_id;")) + .query("select * from $active_vm_id;".to_string()) .bind(("active_vm_id", RecordId::from((ACTIVE_VM, uuid)))) .await? .take(0)?; diff --git a/src/grpc/app.rs b/src/grpc/app.rs index 0f5094c..2464813 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -1,4 +1,4 @@ -use crate::constants::{ACCOUNT, APP_NODE}; +use crate::constants::{ACCOUNT, APP_NODE, DEFAULT_ENDPOINT}; use crate::db::app::ActiveApp; use crate::db::prelude as db; use crate::grpc::{check_sig_from_parts, check_sig_from_req}; @@ -42,6 +42,8 @@ impl BrainAppDaemon for AppDaemonServer { let app_node = db::AppNode { id: RecordId::from((APP_NODE, req.node_pubkey.clone())), operator: RecordId::from((ACCOUNT, req.operator_wallet)), + pub_sub_node: std::env::var("BRAIN_PUBLIC_ENDPOINT") + .unwrap_or(DEFAULT_ENDPOINT.to_string()), country: req.country, region: req.region, city: req.city, @@ -182,6 +184,13 @@ impl BrainAppCli for AppCliServer { async fn new_app(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; + + let id = surrealdb::RecordId::from((APP_NODE, req.node_pubkey.clone())); + if let Some(redirect) = db::check_pubsub_node(&self.db, id).await? { + log::info!("redirect: {redirect}"); + return Err(redirect); + } + // TODO: make it atleast 1 hour if req.locked_nano < 100 { log::error!("locking lessthan 100 nano lps: {}", req.locked_nano); @@ -225,6 +234,15 @@ impl BrainAppCli for AppCliServer { async fn delete_app(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; + let app_node = db::ActiveApp::get_by_uuid(&self.db, &req.uuid) + .await? + .ok_or(Status::permission_denied("Unauthorized"))? + .app_node; + + if let Some(redirect) = db::check_pubsub_node(&self.db, app_node).await? { + log::info!("redirect: {redirect}"); + return Err(redirect); + } info!("delete_app process starting for {:?}", req); match ActiveApp::delete(&self.db, &req.admin_pubkey, &req.uuid).await { Ok(()) => Ok(Response::new(Empty {})), diff --git a/surql/tables.sql b/surql/tables.sql index 90012ff..5d0ba98 100644 --- a/surql/tables.sql +++ b/surql/tables.sql @@ -81,7 +81,7 @@ DEFINE FIELD price_per_unit ON TABLE deleted_vm TYPE int; DEFINE TABLE app_node SCHEMAFULL; DEFINE FIELD operator ON TABLE app_node TYPE record; -DEFINE FIELD pub_sub_node ON TABLE vm_node TYPE string default "127.0.0.1:31337"; +DEFINE FIELD pub_sub_node ON TABLE app_node TYPE string default "127.0.0.1:31337"; DEFINE FIELD country ON TABLE app_node TYPE string; DEFINE FIELD region ON TABLE app_node TYPE string; DEFINE FIELD city ON TABLE app_node TYPE string; diff --git a/tests/grpc_redirect_test.rs b/tests/grpc_redirect_test.rs index 73e5b68..8cb807a 100644 --- a/tests/grpc_redirect_test.rs +++ b/tests/grpc_redirect_test.rs @@ -1,14 +1,19 @@ use common::prepare_test_env::{prepare_test_db, run_service_for_stream}; use common::test_utils::{airdrop, Key}; use common::vm_daemon_utils::register_vm_node; +use detee_shared::app_proto; +use detee_shared::app_proto::brain_app_cli_client::BrainAppCliClient; +use detee_shared::app_proto::brain_app_daemon_client::BrainAppDaemonClient; use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use detee_shared::vm_proto::brain_vm_daemon_client::BrainVmDaemonClient; +use crate::common::app_daemon_utils::register_app_node; + mod common; #[tokio::test] -async fn test_pub_sub_redirect() { +async fn test_vm_pub_sub_redirect() { let _ = prepare_test_db().await.unwrap(); let brain_channel = run_service_for_stream().await.unwrap(); @@ -44,3 +49,45 @@ async fn test_pub_sub_redirect() { redirect.metadata().get("location").and_then(|v| v.to_str().ok()).unwrap_or_default(); assert_eq!(redirect_url, pubsub_brain_endpoint); } + +#[tokio::test] +async fn test_app_pub_sub_redirect() { + let _ = prepare_test_db().await.unwrap(); + + let brain_channel = run_service_for_stream().await.unwrap(); + let mut app_daemon_client = BrainAppDaemonClient::new(brain_channel.clone()); + + let node_key = Key::new(); + + let pubsub_brain_endpoint = "192.168.1.1:31337"; + std::env::set_var("BRAIN_PUBLIC_ENDPOINT", pubsub_brain_endpoint); + register_app_node(&mut app_daemon_client, &node_key, &Key::new().pubkey).await.unwrap(); + std::env::set_var("BRAIN_PUBLIC_ENDPOINT", "127.0.0.1:31337"); + + let client_key = Key::new(); + + let new_app_req = app_proto::NewAppReq { + admin_pubkey: client_key.pubkey.clone(), + node_pubkey: node_key.pubkey.clone(), + price_per_unit: 1200, + resource: Some(app_proto::AppResource { ports: vec![8080, 8081], ..Default::default() }), + locked_nano: 100, + ..Default::default() + }; + + airdrop(&brain_channel, &client_key.pubkey, 10).await.unwrap(); + + std::env::set_var("BRAIN_PUBLIC_ENDPOINT", "10.0.0.1:31337"); + + let mut client_app_cli = BrainAppCliClient::new(brain_channel.clone()); + let redirect = client_app_cli + .new_app(client_key.sign_request(new_app_req.clone()).unwrap()) + .await + .err() + .unwrap(); + std::env::set_var("BRAIN_PUBLIC_ENDPOINT", "127.0.0.1:31337"); + assert_eq!(redirect.code(), tonic::Code::Unavailable); + let redirect_url = + redirect.metadata().get("location").and_then(|v| v.to_str().ok()).unwrap_or_default(); + assert_eq!(redirect_url, pubsub_brain_endpoint); +}