brain/src/grpc/general.rs
Noor b6f7f641f6
kick contract implemented
app pricing calculation
add node in kick schema and type
improved handling
Clone on all app types
handle expected error on kick contract
validate both app and vm contracts
2025-05-15 19:12:18 +05:30

241 lines
9.2 KiB
Rust

use crate::db::prelude as db;
use crate::grpc::{check_admin_key, check_sig_from_req};
use detee_shared::app_proto::AppContract;
use detee_shared::common_proto::{Empty, Pubkey};
use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCli;
use detee_shared::general_proto::{
Account, AccountBalance, AirdropReq, BanUserReq, InspectOperatorResp, KickReq, KickResp,
ListOperatorsResp, RegOperatorReq, ReportNodeReq, SlashReq,
};
use detee_shared::vm_proto::VmContract;
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::Stream;
use tonic::{Request, Response, Status};
/// State shared by the handlers of the general CLI gRPC service.
///
/// The `BrainGeneralCli` trait is implemented on this type further down in this file.
pub struct GeneralCliServer {
/// Shared SurrealDB handle (remote WebSocket client) that the handlers pass to the `db` layer.
pub db: Arc<Surreal<Client>>,
}
impl GeneralCliServer {
pub fn new(db: Arc<Surreal<Client>>) -> Self {
Self { db }
}
}
#[tonic::async_trait]
impl BrainGeneralCli for GeneralCliServer {
type ListAccountsStream = Pin<Box<dyn Stream<Item = Result<Account, Status>> + Send>>;
type ListAllAppContractsStream =
Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
type ListAllVmContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
type ListOperatorsStream =
Pin<Box<dyn Stream<Item = Result<ListOperatorsResp, Status>> + Send>>;
/// Looks up the account named by the request's public key and returns it converted
/// into an `AccountBalance`.
///
/// # Errors
/// Propagates the `Status` produced by `check_sig_from_req` when the request is
/// rejected, and any error returned by `db::Account::get`.
async fn get_balance(&self, req: Request<Pubkey>) -> Result<Response<AccountBalance>, Status> {
    let signed = check_sig_from_req(req)?;
    let account = db::Account::get(&self.db, &signed.pubkey).await?;
    Ok(Response::new(account.into()))
}
/// Records a report against a node for the contract referenced in the request.
///
/// The contract UUID is first looked up among active VM contracts and, if no VM
/// contract matches, among active app contracts. A contract only counts as a match
/// when both its admin key and its node key equal the `admin_pubkey` and
/// `node_pubkey` supplied in the request.
///
/// # Errors
/// * the `Status` from `check_sig_from_req` when the request is rejected;
/// * `Status::unauthenticated` when neither lookup yields a matching contract;
/// * any error returned by the database calls.
async fn report_node(&self, req: Request<ReportNodeReq>) -> Result<Response<Empty>, Status> {
    let req = check_sig_from_req(req)?;

    // A VM contract is only accepted when it belongs to the caller and runs on the named node.
    let vm_match = db::ActiveVmWithNode::get_by_uuid(&self.db, &req.contract).await?.filter(|vm| {
        vm.admin.key().to_string() == req.admin_pubkey
            && vm.vm_node.id.key().to_string() == req.node_pubkey
    });

    let (account, node, contract_id) = if let Some(vm) = vm_match {
        (vm.admin, vm.vm_node.id, vm.id.to_string())
    } else {
        // Fall back to app contracts, applying the same ownership and node checks.
        let app_match = db::app::ActiveAppWithNode::get_by_uuid(&self.db, &req.contract)
            .await?
            .filter(|app| {
                app.admin.key().to_string() == req.admin_pubkey
                    && app.app_node.id.key().to_string() == req.node_pubkey
            });
        match app_match {
            Some(app) => (app.admin, app.app_node.id, app.id.to_string()),
            None => return Err(Status::unauthenticated("No contract found by this ID.")),
        }
    };

    db::Report::create(&self.db, account, node, req.reason, contract_id).await?;
    Ok(Response::new(Empty {}))
}
/// Streams the operators returned by `db::Operator::list` to the caller.
///
/// The full list is loaded before the response is created; a spawned task then
/// forwards the entries one by one through a bounded channel (capacity 6) that
/// backs the returned stream.
///
/// # Errors
/// Propagates the `Status` from `check_sig_from_req` when the request is rejected,
/// and any error returned by `db::Operator::list`.
async fn list_operators(
    &self,
    req: Request<Empty>,
) -> Result<Response<Self::ListOperatorsStream>, Status> {
    // The request has no payload; it is only passed through the signature check.
    let _ = check_sig_from_req(req)?;
    let operators = db::Operator::list(&self.db).await?;
    let (tx, rx) = mpsc::channel(6);
    tokio::spawn(async move {
        for op in operators {
            // `send` fails only after the receiving half has been dropped or closed,
            // i.e. nobody is reading the stream anymore, so stop instead of
            // converting and sending the remaining operators into a dead channel.
            if tx.send(Ok(op.into())).await.is_err() {
                break;
            }
        }
    });
    let output_stream = ReceiverStream::new(rx);
    Ok(Response::new(Box::pin(output_stream) as Self::ListOperatorsStream))
}
/// Returns the operator record for the given wallet together with its VM nodes
/// and app nodes, as reported by `db::Operator::inspect_nodes`.
///
/// Note that this handler reads the request payload directly and does not call
/// `check_sig_from_req`.
///
/// # Errors
/// * `Status::not_found` when the lookup yields no operator for the wallet;
/// * any error returned by `db::Operator::inspect_nodes`.
async fn inspect_operator(
    &self,
    req: Request<Pubkey>,
) -> Result<Response<InspectOperatorResp>, Status> {
    let wallet = req.into_inner().pubkey;
    let (operator, vm_nodes, app_nodes) =
        db::Operator::inspect_nodes(&self.db, &wallet).await?;

    match operator {
        None => Err(Status::not_found("The wallet you specified is not an operator")),
        Some(op) => {
            let resp = InspectOperatorResp {
                operator: Some(op.into()),
                vm_nodes: vm_nodes.into_iter().map(Into::into).collect(),
                app_nodes: app_nodes.into_iter().map(Into::into).collect(),
            };
            Ok(Response::new(resp))
        }
    }
}
async fn register_operator(
&self,
req: Request<RegOperatorReq>,
) -> Result<Response<Empty>, Status> {
let req = check_sig_from_req(req)?;
log::info!("Regitering new operator: {req:?}");
match db::Account::operator_reg(&self.db, &req.pubkey, &req.email, req.escrow).await {
Ok(()) => Ok(Response::new(Empty {})),
Err(e) if matches!(e, db::Error::InsufficientFunds | db::Error::MinimalEscrow) => {
Err(Status::failed_precondition(e.to_string()))
}
Err(e) => {
log::info!("Failed to register operator: {e:?}");
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
}
async fn kick_contract(&self, req: Request<KickReq>) -> Result<Response<KickResp>, Status> {
let req = check_sig_from_req(req)?;
match db::WrapperContract::kick_contract(
&self.db,
&req.operator_wallet,
&req.contract_uuid,
&req.reason,
)
.await
{
Ok(nano_lp) => Ok(Response::new(KickResp { nano_lp })),
Err(e)
if matches!(
e,
db::Error::ContractNotFound
| db::Error::AccessDenied
| db::Error::FailedToDeleteContract(_)
) =>
{
Err(Status::failed_precondition(e.to_string()))
}
Err(e) => {
log::info!("Failed to kick contract: {e:?}");
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
// let req = check_sig_from_req(req)?;
// match self.data.kick_contract(&req.operator_wallet, &req.contract_uuid, &req.reason).await {
// Ok(nano_lp) => Ok(Response::new(KickResp { nano_lp })),
// Err(e) => Err(Status::permission_denied(e.to_string())),
// }
}
async fn ban_user(&self, _req: Request<BanUserReq>) -> Result<Response<Empty>, Status> {
todo!();
// let req = check_sig_from_req(req)?;
// self.data.ban_user(&req.operator_wallet, &req.user_wallet);
// Ok(Response::new(Empty {}))
}
// admin commands
/// Admin command: forwards the requested pubkey and token amount to
/// `db::Account::airdrop`.
///
/// The admin-key check runs before the signature check, matching the order in
/// which the request is consumed.
///
/// # Errors
/// Propagates the `Status` from `check_admin_key` or `check_sig_from_req` when the
/// request is rejected, and any error returned by `db::Account::airdrop`.
async fn airdrop(&self, req: Request<AirdropReq>) -> Result<Response<Empty>, Status> {
    check_admin_key(&req)?;
    let airdrop_req = check_sig_from_req(req)?;
    db::Account::airdrop(&self.db, &airdrop_req.pubkey, airdrop_req.tokens).await?;
    Ok(Response::new(Empty {}))
}
async fn slash(&self, _req: Request<SlashReq>) -> Result<Response<Empty>, Status> {
todo!();
// check_admin_key(&req)?;
// let req = check_sig_from_req(req)?;
// self.data.slash_account(&req.pubkey, req.tokens);
// Ok(Response::new(Empty {}))
}
async fn list_accounts(
&self,
_req: Request<Empty>,
) -> Result<Response<Self::ListAccountsStream>, Status> {
todo!();
// check_admin_key(&req)?;
// let _ = check_sig_from_req(req)?;
// let accounts = self.data.list_accounts();
// let (tx, rx) = mpsc::channel(6);
// tokio::spawn(async move {
// for account in accounts {
// let _ = tx.send(Ok(account.into())).await;
// }
// });
// let output_stream = ReceiverStream::new(rx);
// Ok(Response::new(Box::pin(output_stream) as Self::ListAccountsStream))
}
async fn list_all_vm_contracts(
&self,
_req: Request<Empty>,
) -> Result<Response<Self::ListAllVmContractsStream>, Status> {
todo!();
// check_admin_key(&req)?;
// let _ = check_sig_from_req(req)?;
// let contracts = self.data.list_all_contracts();
// let (tx, rx) = mpsc::channel(6);
// tokio::spawn(async move {
// for contract in contracts {
// let _ = tx.send(Ok(contract.into())).await;
// }
// });
// let output_stream = ReceiverStream::new(rx);
// Ok(Response::new(Box::pin(output_stream) as Self::ListAllVmContractsStream))
}
async fn list_all_app_contracts(
&self,
_req: tonic::Request<Empty>,
) -> Result<tonic::Response<Self::ListAllAppContractsStream>, Status> {
todo!();
// check_admin_key(&req)?;
// let _ = check_sig_from_req(req)?;
// let contracts = self.data.list_all_app_contracts();
// let (tx, rx) = mpsc::channel(6);
// tokio::spawn(async move {
// for contract in contracts {
// let _ = tx.send(Ok(contract.into())).await;
// }
// });
// let output_stream = ReceiverStream::new(rx);
// Ok(Response::new(Box::pin(output_stream)))
}
}