brain/src/grpc/general.rs

245 lines
9.4 KiB
Rust

// SPDX-License-Identifier: Apache-2.0
// SPDX-License-Identifier: Unlicense
use crate::db::prelude as db;
use crate::grpc::{check_admin_key, check_sig_from_req};
use detee_shared::app_proto::AppContract;
use detee_shared::common_proto::{Empty, Pubkey};
use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCli;
use detee_shared::general_proto::{
Account, AccountBalance, AirdropReq, BanUserReq, InspectOperatorResp, KickReq, KickResp,
ListOperatorsResp, RecommendedVersions, RegOperatorReq, ReportNodeReq, SlashReq,
};
use detee_shared::vm_proto::VmContract;
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::Stream;
use tonic::{Request, Response, Status};
pub struct GeneralCliServer {
pub db: Arc<Surreal<Client>>,
}
impl GeneralCliServer {
pub fn new(db: Arc<Surreal<Client>>) -> Self {
Self { db }
}
}
#[tonic::async_trait]
impl BrainGeneralCli for GeneralCliServer {
type ListAccountsStream = Pin<Box<dyn Stream<Item = Result<Account, Status>> + Send>>;
type ListAllAppContractsStream =
Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
type ListAllVmContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
type ListOperatorsStream =
Pin<Box<dyn Stream<Item = Result<ListOperatorsResp, Status>> + Send>>;
async fn get_balance(&self, req: Request<Pubkey>) -> Result<Response<AccountBalance>, Status> {
let req = check_sig_from_req(req)?;
Ok(Response::new(db::Account::get(&self.db, &req.pubkey).await?.into()))
}
async fn report_node(&self, req: Request<ReportNodeReq>) -> Result<Response<Empty>, Status> {
let req = check_sig_from_req(req)?;
let (account, node, contract_id) =
match db::ActiveVmWithNode::get_by_id(&self.db, &req.contract).await? {
Some(vm_contract)
if vm_contract.admin.key().to_string() == req.admin_pubkey
&& vm_contract.vm_node.id.key().to_string() == req.node_pubkey =>
{
(vm_contract.admin, vm_contract.vm_node.id, vm_contract.id.to_string())
}
_ => {
match db::app::ActiveAppWithNode::get_by_app_id(&self.db, &req.contract).await?
{
Some(app_contract)
if app_contract.admin.key().to_string() == req.admin_pubkey
&& app_contract.app_node.id.key().to_string()
== req.node_pubkey =>
{
(
app_contract.admin,
app_contract.app_node.id,
app_contract.id.to_string(),
)
}
_ => {
return Err(Status::unauthenticated("No contract found by this ID."));
}
}
}
};
db::Report::create(&self.db, account, node, req.reason, contract_id).await?;
Ok(Response::new(Empty {}))
}
async fn list_operators(
&self,
req: Request<Empty>,
) -> Result<Response<Self::ListOperatorsStream>, Status> {
let _ = check_sig_from_req(req)?;
let operators = db::Operator::list(&self.db).await?;
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for op in operators {
let _ = tx.send(Ok(op.into())).await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream) as Self::ListOperatorsStream))
}
async fn inspect_operator(
&self,
req: Request<Pubkey>,
) -> Result<Response<InspectOperatorResp>, Status> {
match db::Operator::inspect_nodes(&self.db, &req.into_inner().pubkey).await? {
(Some(op), vm_nodes, app_nodes) => Ok(Response::new(InspectOperatorResp {
operator: Some(op.into()),
vm_nodes: vm_nodes.into_iter().map(|n| n.into()).collect(),
app_nodes: app_nodes.into_iter().map(|n| n.into()).collect(),
})),
(None, _, _) => Err(Status::not_found("The wallet you specified is not an operator")),
}
}
async fn register_operator(
&self,
req: Request<RegOperatorReq>,
) -> Result<Response<Empty>, Status> {
let req = check_sig_from_req(req)?;
log::info!("Regitering new operator: {req:?}");
match db::Account::operator_reg(&self.db, &req.pubkey, &req.email, req.escrow).await {
Ok(()) => Ok(Response::new(Empty {})),
Err(e) if matches!(e, db::Error::InsufficientFunds) => {
Err(Status::failed_precondition(e.to_string()))
}
Err(e) => {
log::info!("Failed to register operator: {e:?}");
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
}
async fn kick_contract(&self, req: Request<KickReq>) -> Result<Response<KickResp>, Status> {
let req = check_sig_from_req(req)?;
log::info!("Kicking contract: {}, by: {}", req.contract_id, req.operator_wallet);
match db::kick_contract(&self.db, &req.operator_wallet, &req.contract_id, &req.reason).await
{
Ok(nano_credits) => Ok(Response::new(KickResp { nano_credits })),
Err(e)
if matches!(
e,
db::Error::ContractNotFound
| db::Error::AccessDenied
| db::Error::FailedToDeleteContract(_)
) =>
{
log::warn!("Failed to kick contract: {e:?}");
Err(Status::failed_precondition(e.to_string()))
}
Err(e) => {
log::error!("Failed to kick contract: {e:?}");
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
}
async fn ban_user(&self, req: Request<BanUserReq>) -> Result<Response<Empty>, Status> {
let req = check_sig_from_req(req)?;
log::info!("Banning user: {}, by: {}", req.user_wallet, req.operator_wallet);
db::Ban::create(&self.db, &req.operator_wallet, &req.user_wallet).await?;
Ok(Response::new(Empty {}))
}
// admin commands
async fn airdrop(&self, req: Request<AirdropReq>) -> Result<Response<Empty>, Status> {
check_admin_key(&req)?;
let req = check_sig_from_req(req)?;
log::info!("Airdropping {} tokens to {}", req.tokens, req.pubkey);
db::Account::airdrop(&self.db, &req.pubkey, req.tokens).await?;
Ok(Response::new(Empty {}))
}
async fn slash(&self, req: Request<SlashReq>) -> Result<Response<Empty>, Status> {
check_admin_key(&req)?;
let req = check_sig_from_req(req)?;
db::Account::slash_account(&self.db, &req.pubkey, req.tokens).await?;
Ok(Response::new(Empty {}))
}
async fn list_accounts(
&self,
req: Request<Empty>,
) -> Result<Response<Self::ListAccountsStream>, Status> {
check_admin_key(&req)?;
check_sig_from_req(req)?;
let accounts = db::Account::list_accounts(&self.db).await?;
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for account in accounts {
let _ = tx.send(Ok(account.into())).await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream) as Self::ListAccountsStream))
}
async fn list_all_vm_contracts(
&self,
req: Request<Empty>,
) -> Result<Response<Self::ListAllVmContractsStream>, Status> {
check_admin_key(&req)?;
check_sig_from_req(req)?;
let contracts = db::ActiveVmWithNode::list_all(&self.db).await?;
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for contract in contracts {
let _ = tx.send(Ok(contract.into())).await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream) as Self::ListAllVmContractsStream))
}
async fn list_all_app_contracts(
&self,
req: tonic::Request<Empty>,
) -> Result<tonic::Response<Self::ListAllAppContractsStream>, Status> {
check_admin_key(&req)?;
check_sig_from_req(req)?;
let contracts = db::ActiveAppWithNode::list_all(&self.db).await?;
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for contract in contracts {
let _ = tx.send(Ok(contract.into())).await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream)))
}
async fn get_recommended_versions(
&self,
_: Request<Empty>,
) -> Result<Response<RecommendedVersions>, Status> {
Ok(Response::new(RecommendedVersions {
cli: "0.1.1".to_string(),
brain: env!("CARGO_PKG_VERSION").to_string(),
snp_daemon: "0.1.0".to_string(),
sgx_daemon: "0.1.0".to_string(),
}))
}
}