fix new_vm error

reverted stream query
checking err on db before listening error stream
refactor app grpc code
This commit is contained in:
Noor 2025-05-30 18:52:32 +05:30 committed by ghe0
parent 965ac90319
commit c8363b5e07
3 changed files with 34 additions and 39 deletions

@ -196,7 +196,7 @@ pub async fn live_appnode_msgs<
Ok(())
}
#[derive(Deserialize)]
#[derive(Deserialize, Debug, Clone)]
pub struct ErrorFromTable {
pub error: String,
}

@ -329,18 +329,22 @@ impl WrappedMeasurement {
UPDATE_VM_REQ => UPDATE_VM_REQ,
_ => NEW_VM_REQ,
};
let mut args_stream = db
let mut resp = db
.query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};"))
.query(format!(
"live select * from measurement_args where id = measurement_args:{vm_id};"
))
.await?
.stream::<Notification<vm_proto::MeasurementArgs>>(0)?;
.await?;
let mut error_stream = resp.stream::<Notification<ErrorFromTable>>(0)?;
let mut args_stream = resp.stream::<Notification<vm_proto::MeasurementArgs>>(1)?;
let mut error_stream = db
.query(format!("live select error from {table} where id = {NEW_VM_REQ}:{vm_id};"))
.await?
.stream::<Notification<ErrorFromTable>>(0)?;
let mut error =
db.query(format!("select error from {table} where id = {NEW_VM_REQ}:{vm_id}")).await?;
if let Some(error_on_newvm_req) = error.take::<Option<ErrorFromTable>>(0)? {
if !error_on_newvm_req.error.is_empty() {
return Ok(Self::Error(vm_id.to_string(), error_on_newvm_req.error));
}
}
let args: Option<vm_proto::MeasurementArgs> =
db.delete(("measurement_args", vm_id)).await?;
@ -358,10 +362,7 @@ impl WrappedMeasurement {
Ok(err_notif) => {
if err_notif.action == surrealdb::Action::Update
&& !err_notif.data.error.is_empty() {
return Ok::<WrappedMeasurement, Error>(
Self::Error(vm_id.to_string(),
err_notif.data.error)
);
return Ok(Self::Error(vm_id.to_string(), err_notif.data.error));
};
},
Err(e) => return Err(e.into()),

@ -15,7 +15,7 @@ use surrealdb::Surreal;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{Stream, StreamExt};
use tonic::{Response, Status, Streaming};
use tonic::{Request, Response, Status, Streaming};
pub struct AppDaemonServer {
pub db: Arc<Surreal<Client>>,
@ -34,8 +34,8 @@ impl BrainAppDaemon for AppDaemonServer {
async fn register_app_node(
&self,
req: tonic::Request<RegisterAppNodeReq>,
) -> Result<tonic::Response<<Self as BrainAppDaemon>::RegisterAppNodeStream>, Status> {
req: Request<RegisterAppNodeReq>,
) -> Result<Response<<Self as BrainAppDaemon>::RegisterAppNodeStream>, Status> {
let req = check_sig_from_req(req)?;
info!("Starting app_node registration process for {:?}", req);
@ -73,8 +73,8 @@ impl BrainAppDaemon for AppDaemonServer {
async fn brain_messages(
&self,
req: tonic::Request<DaemonAuth>,
) -> Result<tonic::Response<<Self as BrainAppDaemon>::BrainMessagesStream>, Status> {
req: Request<DaemonAuth>,
) -> Result<Response<<Self as BrainAppDaemon>::BrainMessagesStream>, Status> {
let auth = req.into_inner();
let pubkey = auth.pubkey.clone();
check_sig_from_parts(
@ -110,8 +110,8 @@ impl BrainAppDaemon for AppDaemonServer {
async fn daemon_messages(
&self,
req: tonic::Request<Streaming<DaemonMessageApp>>,
) -> Result<tonic::Response<Empty>, Status> {
req: Request<Streaming<DaemonMessageApp>>,
) -> Result<Response<Empty>, Status> {
let mut req_stream = req.into_inner();
let pubkey: String;
if let Some(Ok(msg)) = req_stream.next().await {
@ -180,18 +180,15 @@ impl BrainAppCli for AppCliServer {
type ListAppContractsStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
type ListAppNodesStream = Pin<Box<dyn Stream<Item = Result<AppNodeListResp, Status>> + Send>>;
async fn new_app(
&self,
req: tonic::Request<detee_shared::app_proto::NewAppReq>,
) -> Result<tonic::Response<detee_shared::app_proto::NewAppRes>, Status> {
async fn new_app(&self, req: Request<NewAppReq>) -> Result<Response<NewAppRes>, Status> {
let req = check_sig_from_req(req)?;
info!("deploy_app process starting for {:?}", req);
info!("new_app process starting for {:?}", req);
if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
return Err(Status::permission_denied("This operator banned you. What did you do?"));
}
let new_app_req: db::NewAppReq = req.into();
let id = new_app_req.id.to_string();
let db_req: db::NewAppReq = req.into();
let id = db_req.id.to_string();
let (tx, rx) = tokio::sync::oneshot::channel();
let db = self.db.clone();
@ -199,7 +196,7 @@ impl BrainAppCli for AppCliServer {
let _ = tx.send(db::ActiveApp::listen(&db, &id).await);
});
new_app_req.submit(&self.db).await?;
db_req.submit(&self.db).await?;
match rx.await {
Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
@ -216,10 +213,7 @@ impl BrainAppCli for AppCliServer {
}
}
async fn delete_app(
&self,
req: tonic::Request<DelAppReq>,
) -> Result<tonic::Response<Empty>, Status> {
async fn delete_app(&self, req: Request<DelAppReq>) -> Result<Response<Empty>, Status> {
let req = check_sig_from_req(req)?;
info!("delete_app process starting for {:?}", req);
match ActiveApp::delete(&self.db, &req.uuid).await? {
@ -230,8 +224,8 @@ impl BrainAppCli for AppCliServer {
async fn list_app_contracts(
&self,
req: tonic::Request<ListAppContractsReq>,
) -> Result<tonic::Response<<Self as BrainAppCli>::ListAppContractsStream>, Status> {
req: Request<ListAppContractsReq>,
) -> Result<Response<<Self as BrainAppCli>::ListAppContractsStream>, Status> {
let req = check_sig_from_req(req)?;
info!("list_app_contracts process starting for {:?}", req);
@ -270,8 +264,8 @@ impl BrainAppCli for AppCliServer {
async fn list_app_nodes(
&self,
req: tonic::Request<AppNodeFilters>,
) -> Result<tonic::Response<<Self as BrainAppCli>::ListAppNodesStream>, Status> {
req: Request<AppNodeFilters>,
) -> Result<Response<<Self as BrainAppCli>::ListAppNodesStream>, Status> {
let req = check_sig_from_req(req)?;
info!("list_app_nodes process starting for {:?}", req);
let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req, false).await?;
@ -287,8 +281,8 @@ impl BrainAppCli for AppCliServer {
async fn get_one_app_node(
&self,
req: tonic::Request<AppNodeFilters>,
) -> Result<tonic::Response<AppNodeListResp>, Status> {
req: Request<AppNodeFilters>,
) -> Result<Response<AppNodeListResp>, Status> {
let req = check_sig_from_req(req)?;
info!("get_one_app_node process starting for {:?}", req);
let app_node = db::AppNodeWithReports::find_by_filters(&self.db, req, true)