// Tonic service implementations for app nodes:
//   * `AppDaemonServer` implements `BrainAppDaemon`.
//   * `AppCliServer` implements `BrainAppCli`.
//
// NOTE(review): generic arguments look like they were stripped from this file
// (e.g. `Arc>`, `Pin> + Send>>`, `tonic::Request,`, `Result, tonic::Status>`,
// `db::live_appnode_msgs::(`). As written these do not parse; the type
// parameters presumably need to be restored from the original source —
// TODO confirm before building.
use crate::constants::{ACCOUNT, APP_NODE};
use crate::db::prelude as db;
use crate::grpc::{check_sig_from_parts, check_sig_from_req};
use detee_shared::app_proto::brain_app_cli_server::BrainAppCli;
use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemon;
use detee_shared::app_proto::{
    daemon_message_app, AppContract, AppNodeFilters, AppNodeListResp, BrainMessageApp, DaemonAuth,
    DaemonMessageApp, DelAppReq, ListAppContractsReq, RegisterAppNodeReq,
};
use detee_shared::common_proto::Empty;
use log::info;
use std::pin::Pin;
use std::sync::Arc;
use surrealdb::engine::remote::ws::Client;
use surrealdb::RecordId;
use surrealdb::Surreal;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{Stream, StreamExt};
use tonic::{Response, Status, Streaming};

/// State for the `BrainAppDaemon` service.
pub struct AppDaemonServer {
    // Shared database handle; presumably `Arc<Surreal<Client>>` given the
    // imports above — TODO confirm (generic arguments are missing here).
    pub db: Arc>,
}

impl AppDaemonServer {
    /// Creates a server that uses `db` for all database access.
    pub fn new(db: Arc>) -> Self {
        Self { db }
    }
}

// `BrainAppDaemon` implementation backed by the database handle in `self.db`.
#[tonic::async_trait]
impl BrainAppDaemon for AppDaemonServer {
    // NOTE(review): the stream item types were stripped from both aliases;
    // only the `Pin<Box<dyn Stream<..> + Send>>` shape is still recognisable.
    type RegisterAppNodeStream = Pin> + Send>>;
    type BrainMessagesStream = Pin> + Send>>;

    /// Registers an app node and streams back the contracts listed for it.
    ///
    /// The request is first passed through `check_sig_from_req`; any error it
    /// returns is propagated. A `db::AppNode` record is then built from the
    /// request and registered, after which the contracts returned by
    /// `db::ActiveAppWithNode::list_by_node` are sent through the response
    /// stream.
    ///
    /// # Errors
    ///
    /// Propagates errors from `check_sig_from_req`, `AppNode::register` and
    /// `ActiveAppWithNode::list_by_node`.
    async fn register_app_node(
        &self,
        req: tonic::Request,
    ) -> Result::RegisterAppNodeStream>, tonic::Status> {
        // Presumably verifies the request signature and yields the inner
        // message — the helper is defined in `crate::grpc`; verify there.
        let req = check_sig_from_req(req)?;
        info!("Starting app_node registration process for {:?}", req);

        let app_node = db::AppNode {
            // `node_pubkey` is cloned because it is used again below for
            // logging and for the contract lookup.
            id: RecordId::from((APP_NODE, req.node_pubkey.clone())),
            operator: RecordId::from((ACCOUNT, req.operator_wallet)),
            country: req.country,
            region: req.region,
            city: req.city,
            ip: req.main_ip,
            price: req.price,
            // Resource and uptime counters are all written as zero at
            // registration time.
            avail_mem_mb: 0,
            avail_vcpus: 0,
            avail_storage_gbs: 0,
            avail_ports: 0,
            max_ports_per_app: 0,
            offline_minutes: 0,
        };
        app_node.register(&self.db).await?;

        info!("Sending existing contracts to {}", req.node_pubkey);
        let contracts = db::ActiveAppWithNode::list_by_node(&self.db, &req.node_pubkey).await?;

        // Channel of capacity 6 feeding the response stream.
        let (tx, rx) = mpsc::channel(6);
        tokio::spawn(async move {
            for contract in contracts {
                // Send failures are deliberately ignored; the loop keeps
                // going through the remaining contracts either way.
                let _ = tx.send(Ok(contract.into())).await;
            }
        });

        let resp_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(resp_stream)))
    }

    /// Opens the stream of brain messages addressed to an app daemon.
    ///
    /// The auth message is checked with `check_sig_from_parts` using the
    /// pubkey, the timestamp, the `Debug` rendering of `auth.contracts`, and
    /// the signature. Two background tasks are then spawned, each calling
    /// `db::live_appnode_msgs` with its own clone of the channel sender; the
    /// receiving end becomes the response stream.
    ///
    /// # Errors
    ///
    /// Propagates the error from `check_sig_from_parts`.
    async fn brain_messages(
        &self,
        req: tonic::Request,
    ) -> Result::BrainMessagesStream>, tonic::Status> {
        let auth = req.into_inner();
        let pubkey = auth.pubkey.clone();
        check_sig_from_parts(
            &pubkey,
            &auth.timestamp,
            &format!("{:?}", auth.contracts),
            &auth.signature,
        )?;

        info!("App Daemon {} connected to receive brain messages", pubkey);

        // Channel of capacity 6 shared by both producer tasks below.
        let (tx, rx) = mpsc::channel(6);
        // NOTE(review): the two blocks below are textually identical here.
        // Presumably each call carried a different type argument after `::`
        // that was stripped along with the other generics — TODO confirm.
        {
            let db = self.db.clone();
            let pubkey = pubkey.clone();
            let tx = tx.clone();
            tokio::spawn(async move {
                // The result of the live query task is discarded.
                let _ = db::live_appnode_msgs::(&db, &pubkey, tx).await;
            });
        }
        {
            let db = self.db.clone();
            let pubkey = pubkey.clone();
            let tx = tx.clone();
            tokio::spawn(async move {
                // The result of the live query task is discarded.
                let _ = db::live_appnode_msgs::(&db, &pubkey, tx).await;
            });
        }

        // Every received message is converted with `into()` and wrapped in `Ok`.
        let resp_stream = ReceiverStream::new(rx).map(|msg| Ok(msg.into()));
        Ok(Response::new(Box::pin(resp_stream)))
    }

    /// Receives the message stream sent by an app daemon.
    ///
    /// Only the authentication step is implemented: the first stream item
    /// must be an `Auth` message that passes `check_sig_from_parts`. After
    /// that the function reaches `todo!()`.
    ///
    /// # Errors
    ///
    /// Returns `Status::unauthenticated` when the first stream item is missing
    /// or is an error, or when it is not an `Auth` message. Propagates the
    /// error from `check_sig_from_parts`.
    ///
    /// # Panics
    ///
    /// Panics via `todo!()` once authentication succeeds.
    async fn daemon_messages(
        &self,
        req: tonic::Request>,
    ) -> Result, tonic::Status> {
        let mut req_stream = req.into_inner();
        // Assigned in the successful auth branch; every other branch returns.
        let pubkey: String;
        if let Some(Ok(msg)) = req_stream.next().await {
            log::debug!("App daemon_messages received auth message: {:?}", msg);
            if let Some(daemon_message_app::Msg::Auth(auth)) = msg.msg {
                pubkey = auth.pubkey.clone();
                check_sig_from_parts(
                    &pubkey,
                    &auth.timestamp,
                    &format!("{:?}", &auth.contracts),
                    &auth.signature,
                )?;
            } else {
                // First message was present but was not an `Auth` variant.
                return Err(Status::unauthenticated(
                    "Could not authenticate the app daemon: could not extract auth signature",
                ));
            }
        } else {
            // Stream ended immediately or yielded an error as its first item.
            return Err(Status::unauthenticated("Could not authenticate the app daemon"));
        }

        // Handling of the remaining stream is not implemented yet.
        todo!()
    }
}

/// State for the `BrainAppCli` service.
pub struct AppCliServer {
    // Shared database handle; presumably `Arc<Surreal<Client>>` given the
    // imports above — TODO confirm (generic arguments are missing here).
    pub db: Arc>,
}

impl AppCliServer {
    /// Creates a server that uses `db` for all database access.
    pub fn new(db: Arc>) -> Self {
        Self { db }
    }
}

// `BrainAppCli` implementation. Every handler below currently passes the
// request through `check_sig_from_req`, logs it, and then hits `todo!()`.
#[tonic::async_trait]
impl BrainAppCli for AppCliServer {
    // NOTE(review): the stream item types were stripped from both aliases.
    type ListAppContractsStream = Pin> + Send>>;
    type ListAppNodesStream = Pin> + Send>>;

    /// Not implemented: checks the request with `check_sig_from_req`, logs it,
    /// then panics via `todo!()`.
    async fn deploy_app(
        &self,
        req: tonic::Request,
    ) -> Result, tonic::Status> {
        let req = check_sig_from_req(req)?;
        info!("deploy_app process starting for {:?}", req);
        todo!()
    }

    /// Not implemented: checks the request with `check_sig_from_req`, logs it,
    /// then panics via `todo!()`.
    async fn delete_app(
        &self,
        req: tonic::Request,
    ) -> Result, tonic::Status> {
        let req = check_sig_from_req(req)?;
        info!("delete_app process starting for {:?}", req);
        todo!()
    }

    /// Not implemented: checks the request with `check_sig_from_req`, logs it,
    /// then panics via `todo!()`.
    async fn list_app_contracts(
        &self,
        req: tonic::Request,
    ) -> Result::ListAppContractsStream>, tonic::Status> {
        let req = check_sig_from_req(req)?;
        info!("list_app_contracts process starting for {:?}", req);
        todo!()
    }

    /// Not implemented: checks the request with `check_sig_from_req`, logs it,
    /// then panics via `todo!()`.
    async fn list_app_nodes(
        &self,
        req: tonic::Request,
    ) -> Result::ListAppNodesStream>, tonic::Status> {
        let req = check_sig_from_req(req)?;
        info!("list_app_nodes process starting for {:?}", req);
        todo!()
    }

    /// Not implemented: checks the request with `check_sig_from_req`, logs it,
    /// then panics via `todo!()`.
    async fn get_one_app_node(
        &self,
        req: tonic::Request,
    ) -> Result, tonic::Status> {
        let req = check_sig_from_req(req)?;
        info!("get_one_app_node process starting for {:?}", req);
        todo!()
    }
}