brain/src/grpc/app.rs
Noor d1e85ec03e
Kick Contract (#2)
Production grade kick contract code for financial transaction tested for both vm and app
modified schema to include node in kick, default time for deleted_app and removed unwanted fields
app data migration for testing kick contract for active_app
fixed some tests
register operator

Reviewed-on: #2
Co-authored-by: Noor <noormohammedb@protonmail.com>
Co-committed-by: Noor <noormohammedb@protonmail.com>
Signed-off-by: Noor <noormohammedb@protonmail.com>
2025-05-26 03:17:00 +05:30

303 lines
11 KiB
Rust

use crate::constants::{ACCOUNT, APP_NODE};
use crate::db::app::ActiveApp;
use crate::db::prelude as db;
use crate::grpc::{check_sig_from_parts, check_sig_from_req};
use detee_shared::app_proto::brain_app_cli_server::BrainAppCli;
use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemon;
use detee_shared::app_proto::*;
use detee_shared::common_proto::Empty;
use log::info;
use std::pin::Pin;
use std::sync::Arc;
use surrealdb::engine::remote::ws::Client;
use surrealdb::RecordId;
use surrealdb::Surreal;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{Stream, StreamExt};
use tonic::{Response, Status, Streaming};
pub struct AppDaemonServer {
pub db: Arc<Surreal<Client>>,
}
impl AppDaemonServer {
pub fn new(db: Arc<Surreal<Client>>) -> Self {
Self { db }
}
}
#[tonic::async_trait]
impl BrainAppDaemon for AppDaemonServer {
type RegisterAppNodeStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainMessageApp, Status>> + Send>>;
async fn register_app_node(
&self,
req: tonic::Request<RegisterAppNodeReq>,
) -> Result<tonic::Response<<Self as BrainAppDaemon>::RegisterAppNodeStream>, Status> {
let req = check_sig_from_req(req)?;
info!("Starting app_node registration process for {:?}", req);
let app_node = db::AppNode {
id: RecordId::from((APP_NODE, req.node_pubkey.clone())),
operator: RecordId::from((ACCOUNT, req.operator_wallet)),
country: req.country,
region: req.region,
city: req.city,
ip: req.main_ip,
price: req.price,
avail_mem_mb: 0,
avail_vcpus: 0,
avail_storage_gbs: 0,
avail_ports: 0,
max_ports_per_app: 0,
offline_minutes: 0,
};
app_node.register(&self.db).await?;
info!("Sending existing contracts to {}", req.node_pubkey);
let contracts = db::ActiveAppWithNode::list_by_node(&self.db, &req.node_pubkey).await?;
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for contract in contracts {
let _ = tx.send(Ok(contract.into())).await;
}
});
let resp_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(resp_stream)))
}
async fn brain_messages(
&self,
req: tonic::Request<DaemonAuth>,
) -> Result<tonic::Response<<Self as BrainAppDaemon>::BrainMessagesStream>, Status> {
let auth = req.into_inner();
let pubkey = auth.pubkey.clone();
check_sig_from_parts(
&pubkey,
&auth.timestamp,
&format!("{:?}", auth.contracts),
&auth.signature,
)?;
info!("App Daemon {} connected to receive brain messages", pubkey);
let (tx, rx) = mpsc::channel(6);
{
let db = self.db.clone();
let pubkey = pubkey.clone();
let tx = tx.clone();
tokio::spawn(async move {
let _ = db::live_appnode_msgs::<db::NewAppReq>(&db, &pubkey, tx).await;
});
}
{
let db = self.db.clone();
let pubkey = pubkey.clone();
let tx = tx.clone();
tokio::spawn(async move {
let _ = db::live_appnode_msgs::<db::DeletedApp>(&db, &pubkey, tx).await;
});
}
let resp_stream = ReceiverStream::new(rx).map(|msg| Ok(msg.into()));
Ok(Response::new(Box::pin(resp_stream)))
}
async fn daemon_messages(
&self,
req: tonic::Request<Streaming<DaemonMessageApp>>,
) -> Result<tonic::Response<Empty>, Status> {
let mut req_stream = req.into_inner();
let pubkey: String;
if let Some(Ok(msg)) = req_stream.next().await {
log::debug!("App daemon_messages received auth message: {msg:?}");
if let Some(daemon_message_app::Msg::Auth(auth)) = msg.msg {
pubkey = auth.pubkey.clone();
check_sig_from_parts(
&pubkey,
&auth.timestamp,
&format!("{:?}", &auth.contracts),
&auth.signature,
)?;
} else {
return Err(Status::unauthenticated(
"Could not authenticate the app daemon: could not extract auth signature",
));
}
} else {
return Err(Status::unauthenticated("Could not authenticate the app daemon"));
}
while let Some(daemon_message) = req_stream.next().await {
match daemon_message {
Ok(msg) => match msg.msg {
Some(daemon_message_app::Msg::NewAppRes(new_app_resp)) => {
if !new_app_resp.error.is_empty() {
db::NewAppReq::submit_error(
&self.db,
&new_app_resp.uuid,
new_app_resp.error,
)
.await?;
} else {
db::ActiveApp::activate(&self.db, &new_app_resp.uuid).await?;
}
}
Some(daemon_message_app::Msg::AppNodeResources(app_node_resources)) => {
let node_resource: db::AppNodeResources = app_node_resources.into();
node_resource.merge(&self.db, &pubkey).await?;
}
_ => {}
},
Err(e) => {
log::warn!("App Daemon Disconnected: {e:?}")
}
}
}
Ok(Response::new(Empty {}))
}
}
pub struct AppCliServer {
pub db: Arc<Surreal<Client>>,
}
impl AppCliServer {
pub fn new(db: Arc<Surreal<Client>>) -> Self {
Self { db }
}
}
#[tonic::async_trait]
impl BrainAppCli for AppCliServer {
type ListAppContractsStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
type ListAppNodesStream = Pin<Box<dyn Stream<Item = Result<AppNodeListResp, Status>> + Send>>;
async fn new_app(
&self,
req: tonic::Request<detee_shared::app_proto::NewAppReq>,
) -> Result<tonic::Response<detee_shared::app_proto::NewAppRes>, Status> {
let req = check_sig_from_req(req)?;
info!("deploy_app process starting for {:?}", req);
if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
return Err(Status::permission_denied("This operator banned you. What did you do?"));
}
let new_app_req: db::NewAppReq = req.into();
let id = new_app_req.id.to_string();
let (tx, rx) = tokio::sync::oneshot::channel();
let db = self.db.clone();
tokio::spawn(async move {
let _ = tx.send(db::ActiveApp::listen(&db, &id).await);
});
new_app_req.submit(&self.db).await?;
match rx.await {
Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
"Network timeout. Please try again later or contact the DeTEE devs team.",
)),
Ok(Err(db::Error::NewAppDaemonResp(err))) => Err(Status::internal(err)),
Ok(new_app_resp) => Ok(Response::new(new_app_resp?.into())),
Err(e) => {
log::error!("Something weird happened during CLI NewAppReq. Reached error {e:?}");
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
}
async fn delete_app(
&self,
req: tonic::Request<DelAppReq>,
) -> Result<tonic::Response<Empty>, Status> {
let req = check_sig_from_req(req)?;
info!("delete_app process starting for {:?}", req);
match ActiveApp::delete(&self.db, &req.uuid).await? {
true => Ok(Response::new(Empty {})),
false => Err(Status::not_found(format!("Could not find App contract {}", &req.uuid))),
}
}
async fn list_app_contracts(
&self,
req: tonic::Request<ListAppContractsReq>,
) -> Result<tonic::Response<<Self as BrainAppCli>::ListAppContractsStream>, Status> {
let req = check_sig_from_req(req)?;
info!("list_app_contracts process starting for {:?}", req);
let mut app_contracts = Vec::new();
if !req.uuid.is_empty() {
if let Some(app_contract) =
db::ActiveAppWithNode::get_by_uuid(&self.db, &req.uuid).await?
{
if app_contract.admin.key().to_string() == req.admin_pubkey
|| app_contract.app_node.operator.key().to_string() == req.admin_pubkey
{
app_contracts.push(app_contract);
}
}
} else if req.as_operator {
app_contracts.append(
&mut db::ActiveAppWithNode::list_by_operator(&self.db, &req.admin_pubkey).await?,
);
} else {
app_contracts.append(
&mut db::ActiveAppWithNode::list_by_admin(&self.db, &req.admin_pubkey).await?,
);
}
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for app_contract in app_contracts {
let _ = tx.send(Ok(app_contract.into())).await;
}
});
let resp_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(resp_stream)))
}
async fn list_app_nodes(
&self,
req: tonic::Request<AppNodeFilters>,
) -> Result<tonic::Response<<Self as BrainAppCli>::ListAppNodesStream>, Status> {
let req = check_sig_from_req(req)?;
info!("list_app_nodes process starting for {:?}", req);
let app_nodes = db::AppNodeWithReports::find_by_filters(&self.db, req, false).await?;
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for app_node in app_nodes {
let _ = tx.send(Ok(app_node.into())).await;
}
});
let resp_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(resp_stream)))
}
async fn get_one_app_node(
&self,
req: tonic::Request<AppNodeFilters>,
) -> Result<tonic::Response<AppNodeListResp>, Status> {
let req = check_sig_from_req(req)?;
info!("get_one_app_node process starting for {:?}", req);
let app_node = db::AppNodeWithReports::find_by_filters(&self.db, req, true)
.await?
.first()
.ok_or(Status::not_found("No app node found"))?
.clone();
Ok(Response::new(app_node.into()))
}
}