use crate::constants::{ACCOUNT, APP_NODE, DEFAULT_ENDPOINT}; use crate::db::app::ActiveApp; use crate::db::prelude as db; use crate::grpc::{check_sig_from_parts, check_sig_from_req}; use detee_shared::app_proto::brain_app_cli_server::BrainAppCli; use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemon; use detee_shared::app_proto::*; use detee_shared::common_proto::Empty; use log::info; use std::pin::Pin; use std::sync::Arc; use surrealdb::engine::remote::ws::Client; use surrealdb::RecordId; use surrealdb::Surreal; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::{Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; pub struct AppDaemonServer { pub db: Arc>, } impl AppDaemonServer { pub fn new(db: Arc>) -> Self { Self { db } } } #[tonic::async_trait] impl BrainAppDaemon for AppDaemonServer { type RegisterAppNodeStream = Pin> + Send>>; type BrainMessagesStream = Pin> + Send>>; async fn register_app_node( &self, req: Request, ) -> Result::RegisterAppNodeStream>, Status> { let req = check_sig_from_req(req)?; info!("Starting app_node registration process for {:?}", req); let app_node = db::AppNode { id: RecordId::from((APP_NODE, req.node_pubkey.clone())), operator: RecordId::from((ACCOUNT, req.operator_wallet)), pub_sub_node: std::env::var("BRAIN_PUBLIC_ENDPOINT") .unwrap_or(DEFAULT_ENDPOINT.to_string()), country: req.country, region: req.region, city: req.city, ip: req.main_ip, price: req.price, avail_mem_mb: 0, avail_vcpus: 0, avail_storage_gbs: 0, avail_ports: 0, max_ports_per_app: 0, offline_minutes: 0, }; app_node.register(&self.db).await?; info!("Sending existing contracts to {}", req.node_pubkey); let deleted_apps = db::DeletedApp::list_by_node(&self.db, &req.node_pubkey).await?; let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for deleted_app in deleted_apps { let _ = tx.send(Ok(deleted_app.into())).await; } }); let resp_stream = ReceiverStream::new(rx); Ok(Response::new(Box::pin(resp_stream))) } async fn brain_messages( &self, req: Request, ) -> Result::BrainMessagesStream>, Status> { let auth = req.into_inner(); let pubkey = auth.pubkey.clone(); check_sig_from_parts( &pubkey, &auth.timestamp, &format!("{:?}", auth.contracts), &auth.signature, )?; info!("App Daemon {} connected to receive brain messages", pubkey); let (tx, rx) = mpsc::channel(6); { let db = self.db.clone(); let pubkey = pubkey.clone(); let tx = tx.clone(); tokio::spawn(async move { let _ = db::live_appnode_msgs::(&db, &pubkey, tx).await; }); } { let db = self.db.clone(); let pubkey = pubkey.clone(); let tx = tx.clone(); tokio::spawn(async move { let _ = db::live_appnode_msgs::(&db, &pubkey, tx).await; }); } let resp_stream = ReceiverStream::new(rx).map(|msg| Ok(msg.into())); Ok(Response::new(Box::pin(resp_stream))) } async fn daemon_messages( &self, req: Request>, ) -> Result, Status> { let mut req_stream = req.into_inner(); let pubkey: String; if let Some(Ok(msg)) = req_stream.next().await { log::debug!("App daemon_messages received auth message: {msg:?}"); if let Some(daemon_message_app::Msg::Auth(auth)) = msg.msg { pubkey = auth.pubkey.clone(); check_sig_from_parts( &pubkey, &auth.timestamp, &format!("{:?}", &auth.contracts), &auth.signature, )?; } else { return Err(Status::unauthenticated( "Could not authenticate the app daemon: could not extract auth signature", )); } } else { return Err(Status::unauthenticated("Could not authenticate the app daemon")); } while let Some(daemon_message) = req_stream.next().await { match daemon_message { Ok(msg) => match msg.msg { Some(daemon_message_app::Msg::NewAppRes(new_app_resp)) => { if !new_app_resp.error.is_empty() { db::NewAppReq::submit_error( &self.db, &new_app_resp.uuid, new_app_resp.error, ) .await?; } else { db::ActiveApp::activate(&self.db, new_app_resp).await?; } } Some(daemon_message_app::Msg::AppNodeResources(app_node_resources)) => { let node_resource: db::AppNodeResources = app_node_resources.into(); node_resource.merge(&self.db, &pubkey).await?; } _ => {} }, Err(e) => { log::warn!("App Daemon Disconnected: {e:?}") } } } Ok(Response::new(Empty {})) } } pub struct AppCliServer { pub db: Arc>, } impl AppCliServer { pub fn new(db: Arc>) -> Self { Self { db } } } #[tonic::async_trait] impl BrainAppCli for AppCliServer { type ListAppContractsStream = Pin> + Send>>; type ListAppNodesStream = Pin> + Send>>; async fn new_app(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; let id = surrealdb::RecordId::from((APP_NODE, req.node_pubkey.clone())); if let Some(redirect) = db::check_pubsub_node(&self.db, id).await? { log::info!("redirect: {redirect}"); return Err(redirect); } // TODO: make it atleast 1 hour if req.locked_nano < 100 { log::error!("locking lessthan 100 nano lps: {}", req.locked_nano); return Err(Status::unknown("lock atleaset 100 nano lps")); } info!("new_app process starting for {:?}", req); if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? { return Err(Status::permission_denied("This operator banned you. What did you do?")); } let db_req: db::NewAppReq = req.into(); let id = db_req.id.key().to_string(); let (tx, rx) = tokio::sync::oneshot::channel(); let db = self.db.clone(); tokio::spawn(async move { let _ = tx.send(db::WrappedAppResp::listen(&db, &id).await); }); db_req.submit(&self.db).await?; match rx.await { Ok(Ok(db::WrappedAppResp::NewAppRes(new_app_resp))) => Ok(Response::new(new_app_resp)), Ok(Ok(db::WrappedAppResp::Error(err))) => Ok(Response::new(err)), Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded( "Network timeout. Please try again later or contact the DeTEE devs team.", )), Ok(Err(e)) => { log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}"); Err(Status::unknown( "Unknown error. Please try again or contact the DeTEE devs team.", )) } Err(e) => { log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}"); Err(Status::unknown( "Unknown error. Please try again or contact the DeTEE devs team.", )) } } } async fn delete_app(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; let app_node = db::ActiveApp::get_by_uuid(&self.db, &req.uuid) .await? .ok_or(Status::permission_denied("Unauthorized"))? .app_node; if let Some(redirect) = db::check_pubsub_node(&self.db, app_node).await? { log::info!("redirect: {redirect}"); return Err(redirect); } info!("delete_app process starting for {:?}", req); match ActiveApp::delete(&self.db, &req.admin_pubkey, &req.uuid).await { Ok(()) => Ok(Response::new(Empty {})), Err(db::Error::AccessDenied) => Err(Status::permission_denied("Unauthorized")), Err(e) => { log::error!("Error deleting app contract {}: {e}", &req.uuid); Err(Status::unknown( "Unknown error. Please try again or contact the DeTEE devs team.", )) } } } async fn list_app_contracts( &self, req: Request, ) -> Result::ListAppContractsStream>, Status> { let req = check_sig_from_req(req)?; info!("list_app_contracts process starting for {:?}", req); let mut app_contracts = Vec::new(); if !req.uuid.is_empty() { if let Some(app_contract) = db::ActiveAppWithNode::get_by_uuid(&self.db, &req.uuid).await? { if app_contract.admin.key().to_string() == req.admin_pubkey || app_contract.app_node.operator.key().to_string() == req.admin_pubkey { app_contracts.push(app_contract); } } } else if req.as_operator { app_contracts.append( &mut db::ActiveAppWithNode::list_by_operator(&self.db, &req.admin_pubkey).await?, ); } else { app_contracts.append( &mut db::ActiveAppWithNode::list_by_admin(&self.db, &req.admin_pubkey).await?, ); } let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for app_contract in app_contracts { let _ = tx.send(Ok(app_contract.into())).await; } }); let resp_stream = ReceiverStream::new(rx); Ok(Response::new(Box::pin(resp_stream))) } async fn list_app_nodes( &self, req: Request, ) -> Result::ListAppNodesStream>, Status> { let req = check_sig_from_req(req)?; info!("list_app_nodes process starting for {:?}", req); let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req, false).await?; let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for app_node in app_nodes { let _ = tx.send(Ok(app_node.into())).await; } }); let resp_stream = ReceiverStream::new(rx); Ok(Response::new(Box::pin(resp_stream))) } async fn get_one_app_node( &self, req: Request, ) -> Result, Status> { let req = check_sig_from_req(req)?; info!("get_one_app_node process starting for {:?}", req); let app_node = db::AppNodeWithReports::find_by_filters(&self.db, req, true) .await? .first() .ok_or(Status::not_found("No app node found"))? .clone(); Ok(Response::new(app_node.into())) } }