From c8363b5e07d94fc02bedb3867bc15e8d47512434 Mon Sep 17 00:00:00 2001 From: Noor Date: Fri, 30 May 2025 18:52:32 +0530 Subject: [PATCH] fix new_vm error reverted stream query checking err on db before listening error stream refactor app grpc code --- src/db/mod.rs | 2 +- src/db/vm.rs | 25 +++++++++++++------------ src/grpc/app.rs | 46 ++++++++++++++++++++-------------------------- 3 files changed, 34 insertions(+), 39 deletions(-) diff --git a/src/db/mod.rs b/src/db/mod.rs index 8cd5ce3..7f22fdb 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -196,7 +196,7 @@ pub async fn live_appnode_msgs< Ok(()) } -#[derive(Deserialize)] +#[derive(Deserialize, Debug, Clone)] pub struct ErrorFromTable { pub error: String, } diff --git a/src/db/vm.rs b/src/db/vm.rs index 9841aba..b216206 100644 --- a/src/db/vm.rs +++ b/src/db/vm.rs @@ -329,18 +329,22 @@ impl WrappedMeasurement { UPDATE_VM_REQ => UPDATE_VM_REQ, _ => NEW_VM_REQ, }; - - let mut args_stream = db + let mut resp = db + .query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};")) .query(format!( "live select * from measurement_args where id = measurement_args:{vm_id};" )) - .await? - .stream::>(0)?; + .await?; + let mut error_stream = resp.stream::>(0)?; + let mut args_stream = resp.stream::>(1)?; - let mut error_stream = db - .query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};")) - .await? - .stream::>(0)?; + let mut error = + db.query(format!("select error from {table} where id = {NEW_VM_REQ}:{vm_id}")).await?; + if let Some(error_on_newvm_req) = error.take::>(0)? { + if !error_on_newvm_req.error.is_empty() { + return Ok(Self::Error(vm_id.to_string(), error_on_newvm_req.error)); + } + } let args: Option = db.delete(("measurement_args", vm_id)).await?; @@ -358,10 +362,7 @@ impl WrappedMeasurement { Ok(err_notif) => { if err_notif.action == surrealdb::Action::Update && !err_notif.data.error.is_empty() { - return Ok::( - Self::Error(vm_id.to_string(), - err_notif.data.error) - ); + return Ok(Self::Error(vm_id.to_string(), err_notif.data.error)); }; }, Err(e) => return Err(e.into()), diff --git a/src/grpc/app.rs b/src/grpc/app.rs index 78be5af..ec742d7 100644 --- a/src/grpc/app.rs +++ b/src/grpc/app.rs @@ -15,7 +15,7 @@ use surrealdb::Surreal; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::{Stream, StreamExt}; -use tonic::{Response, Status, Streaming}; +use tonic::{Request, Response, Status, Streaming}; pub struct AppDaemonServer { pub db: Arc>, @@ -34,8 +34,8 @@ impl BrainAppDaemon for AppDaemonServer { async fn register_app_node( &self, - req: tonic::Request, - ) -> Result::RegisterAppNodeStream>, Status> { + req: Request, + ) -> Result::RegisterAppNodeStream>, Status> { let req = check_sig_from_req(req)?; info!("Starting app_node registration process for {:?}", req); @@ -73,8 +73,8 @@ impl BrainAppDaemon for AppDaemonServer { async fn brain_messages( &self, - req: tonic::Request, - ) -> Result::BrainMessagesStream>, Status> { + req: Request, + ) -> Result::BrainMessagesStream>, Status> { let auth = req.into_inner(); let pubkey = auth.pubkey.clone(); check_sig_from_parts( @@ -110,8 +110,8 @@ impl BrainAppDaemon for AppDaemonServer { async fn daemon_messages( &self, - req: tonic::Request>, - ) -> Result, Status> { + req: Request>, + ) -> Result, Status> { let mut req_stream = req.into_inner(); let pubkey: String; if let Some(Ok(msg)) = req_stream.next().await { @@ -180,18 +180,15 @@ impl BrainAppCli for AppCliServer { type ListAppContractsStream = Pin> + Send>>; type ListAppNodesStream = Pin> + Send>>; - async fn new_app( - &self, - req: tonic::Request, - ) -> Result, Status> { + async fn new_app(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; - info!("deploy_app process starting for {:?}", req); - + info!("new_app process starting for {:?}", req); if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { return Err(Status::permission_denied("This operator banned you. What did you do?")); } - let new_app_req: db::NewAppReq = req.into(); - let id = new_app_req.id.to_string(); + + let db_req: db::NewAppReq = req.into(); + let id = db_req.id.to_string(); let (tx, rx) = tokio::sync::oneshot::channel(); let db = self.db.clone(); @@ -199,7 +196,7 @@ impl BrainAppCli for AppCliServer { let _ = tx.send(db::ActiveApp::listen(&db, &id).await); }); - new_app_req.submit(&self.db).await?; + db_req.submit(&self.db).await?; match rx.await { Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded( @@ -216,10 +213,7 @@ impl BrainAppCli for AppCliServer { } } - async fn delete_app( - &self, - req: tonic::Request, - ) -> Result, Status> { + async fn delete_app(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; info!("delete_app process starting for {:?}", req); match ActiveApp::delete(&self.db, &req.uuid).await? { @@ -230,8 +224,8 @@ impl BrainAppCli for AppCliServer { async fn list_app_contracts( &self, - req: tonic::Request, - ) -> Result::ListAppContractsStream>, Status> { + req: Request, + ) -> Result::ListAppContractsStream>, Status> { let req = check_sig_from_req(req)?; info!("list_app_contracts process starting for {:?}", req); @@ -270,8 +264,8 @@ impl BrainAppCli for AppCliServer { async fn list_app_nodes( &self, - req: tonic::Request, - ) -> Result::ListAppNodesStream>, Status> { + req: Request, + ) -> Result::ListAppNodesStream>, Status> { let req = check_sig_from_req(req)?; info!("list_app_nodes process starting for {:?}", req); let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req, false).await?; @@ -287,8 +281,8 @@ impl BrainAppCli for AppCliServer { async fn get_one_app_node( &self, - req: tonic::Request, - ) -> Result, Status> { + req: Request, + ) -> Result, Status> { let req = check_sig_from_req(req)?; info!("get_one_app_node process starting for {:?}", req); let app_node = db::AppNodeWithReports::find_by_filters(&self.db, req, true)