refactor: General grpc services
reorganize proto imports for clarity proper naming for services organized all rpc methos with minimal changes
This commit is contained in:
parent
b063ff4b34
commit
e3cb722d97
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -420,7 +420,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "detee-shared"
|
||||
version = "0.1.0"
|
||||
source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=main#be4e41db050c6d59e9fb5abf47e647f5bbdc24b2"
|
||||
source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=main#e434b70a5794bef79681ce520873662c78edca76"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"prost",
|
||||
|
51
src/data.rs
51
src/data.rs
@ -1,6 +1,7 @@
|
||||
use crate::grpc::snp_proto::{self as grpc};
|
||||
use chrono::Utc;
|
||||
use dashmap::DashMap;
|
||||
use detee_shared::general_proto::{self, InspectOperatorResp, ListOperatorsResp, RegOperatorReq};
|
||||
use log::{debug, info, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::str::FromStr;
|
||||
@ -13,17 +14,13 @@ use std::{
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::oneshot::Sender as OneshotSender;
|
||||
|
||||
use detee_shared::sgx::pb::brain::brain_message_app;
|
||||
use detee_shared::sgx::pb::brain::AppContract as AppContractPB;
|
||||
use detee_shared::sgx::pb::brain::AppNodeFilters;
|
||||
use detee_shared::sgx::pb::brain::AppNodeListResp;
|
||||
use detee_shared::sgx::pb::brain::AppNodeResources;
|
||||
use detee_shared::sgx::pb::brain::AppResource as AppResourcePB;
|
||||
use detee_shared::sgx::pb::brain::BrainMessageApp;
|
||||
use detee_shared::sgx::pb::brain::DelAppReq;
|
||||
use detee_shared::sgx::pb::brain::MappedPort;
|
||||
use detee_shared::sgx::pb::brain::NewAppReq;
|
||||
use detee_shared::sgx::pb::brain::NewAppRes;
|
||||
use detee_shared::app_proto::{
|
||||
brain_message_app, AppContract as AppContractPB, AppNodeFilters, AppNodeListResp,
|
||||
AppNodeResources, AppResource as AppResourcePB, BrainMessageApp, DelAppReq, MappedPort,
|
||||
NewAppReq, NewAppRes,
|
||||
};
|
||||
use detee_shared::general_proto::Account;
|
||||
|
||||
const DATA_PATH: &str = "/etc/detee/brain-mock/saved_data.yaml";
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
@ -65,9 +62,9 @@ pub struct OperatorData {
|
||||
pub app_nodes: HashSet<String>,
|
||||
}
|
||||
|
||||
impl From<AccountData> for grpc::AccountBalance {
|
||||
impl From<AccountData> for general_proto::AccountBalance {
|
||||
fn from(value: AccountData) -> Self {
|
||||
grpc::AccountBalance {
|
||||
general_proto::AccountBalance {
|
||||
balance: value.balance,
|
||||
tmp_locked: value.tmp_locked,
|
||||
}
|
||||
@ -987,7 +984,7 @@ impl BrainData {
|
||||
});
|
||||
}
|
||||
|
||||
pub fn register_operator(&self, req: grpc::RegOperatorReq) -> Result<(), Error> {
|
||||
pub fn register_operator(&self, req: RegOperatorReq) -> Result<(), Error> {
|
||||
let mut operator = match self.operators.get(&req.pubkey) {
|
||||
Some(o) => (*(o.value())).clone(),
|
||||
None => OperatorData {
|
||||
@ -1092,10 +1089,10 @@ impl BrainData {
|
||||
contracts.iter().cloned().collect()
|
||||
}
|
||||
|
||||
pub fn list_accounts(&self) -> Vec<grpc::Account> {
|
||||
pub fn list_accounts(&self) -> Vec<Account> {
|
||||
self.accounts
|
||||
.iter()
|
||||
.map(|a| grpc::Account {
|
||||
.map(|a| Account {
|
||||
pubkey: a.key().to_string(),
|
||||
balance: a.balance,
|
||||
tmp_locked: a.tmp_locked,
|
||||
@ -1103,10 +1100,10 @@ impl BrainData {
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn list_operators(&self) -> Vec<grpc::ListOperatorsResp> {
|
||||
pub fn list_operators(&self) -> Vec<ListOperatorsResp> {
|
||||
self.operators
|
||||
.iter()
|
||||
.map(|op| grpc::ListOperatorsResp {
|
||||
.map(|op| ListOperatorsResp {
|
||||
pubkey: op.key().to_string(),
|
||||
escrow: op.escrow / 1_000_000_000,
|
||||
email: op.email.clone(),
|
||||
@ -1117,15 +1114,15 @@ impl BrainData {
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn inspect_operator(&self, wallet: &str) -> Option<grpc::InspectOperatorResp> {
|
||||
pub fn inspect_operator(&self, wallet: &str) -> Option<InspectOperatorResp> {
|
||||
self.operators.get(wallet).map(|op| {
|
||||
let nodes = self
|
||||
let vm_nodes = self
|
||||
.find_vm_nodes_by_operator(wallet)
|
||||
.into_iter()
|
||||
.map(|n| n.into())
|
||||
.collect();
|
||||
grpc::InspectOperatorResp {
|
||||
operator: Some(grpc::ListOperatorsResp {
|
||||
InspectOperatorResp {
|
||||
operator: Some(ListOperatorsResp {
|
||||
pubkey: op.key().to_string(),
|
||||
escrow: op.escrow,
|
||||
email: op.email.clone(),
|
||||
@ -1133,7 +1130,9 @@ impl BrainData {
|
||||
vm_nodes: op.vm_nodes.len() as u64,
|
||||
reports: self.total_operator_reports(op.key()) as u64,
|
||||
}),
|
||||
nodes,
|
||||
vm_nodes,
|
||||
// TODO: fix app node list
|
||||
app_nodes: Vec::new(),
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -1395,9 +1394,9 @@ impl BrainData {
|
||||
req.node_pubkey, req.uuid
|
||||
);
|
||||
let msg = BrainMessageApp {
|
||||
msg: Some(
|
||||
detee_shared::sgx::pb::brain::brain_message_app::Msg::NewAppReq(req.clone()),
|
||||
),
|
||||
msg: Some(detee_shared::app_proto::brain_message_app::Msg::NewAppReq(
|
||||
req.clone(),
|
||||
)),
|
||||
};
|
||||
if let Err(e) = app_daemon_tx.send(msg).await {
|
||||
warn!(
|
||||
|
387
src/grpc.rs
387
src/grpc.rs
@ -1,12 +1,17 @@
|
||||
pub mod snp_proto {
|
||||
// tonic::include_proto!("vm_proto");
|
||||
pub use detee_shared::snp::pb::vm::*;
|
||||
pub use detee_shared::vm_proto::*;
|
||||
}
|
||||
|
||||
use crate::data::BrainData;
|
||||
use crate::grpc::vm_daemon_message;
|
||||
use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCli;
|
||||
use detee_shared::general_proto::{
|
||||
Account, AccountBalance, AirdropReq, BanUserReq, InspectOperatorResp, KickReq, KickResp,
|
||||
ListOperatorsResp, RegOperatorReq, ReportNodeReq, SlashReq,
|
||||
};
|
||||
use log::info;
|
||||
use snp_proto::brain_cli_server::BrainCli;
|
||||
use snp_proto::brain_vm_cli_server::BrainVmCli;
|
||||
use snp_proto::brain_vm_daemon_server::BrainVmDaemon;
|
||||
use snp_proto::*;
|
||||
use std::pin::Pin;
|
||||
@ -15,33 +20,44 @@ use tokio::sync::mpsc;
|
||||
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
|
||||
use tonic::{Request, Response, Status, Streaming};
|
||||
|
||||
use detee_shared::sgx::pb::brain::brain_app_cli_server::BrainAppCli;
|
||||
use detee_shared::sgx::pb::brain::brain_app_daemon_server::BrainAppDaemon;
|
||||
use detee_shared::sgx::pb::brain::{
|
||||
AppContract, AppNodeFilters, AppNodeListResp, BrainMessageApp, DaemonMessageApp, DelAppReq,
|
||||
use detee_shared::app_proto::{
|
||||
brain_app_cli_server::BrainAppCli, brain_app_daemon_server::BrainAppDaemon, AppContract,
|
||||
AppNodeFilters, AppNodeListResp, BrainMessageApp, DaemonMessageApp, DelAppReq,
|
||||
ListAppContractsReq, NewAppReq, NewAppRes, RegisterAppNodeReq,
|
||||
};
|
||||
|
||||
use detee_shared::common_proto::{Empty, Pubkey};
|
||||
const ADMIN_ACCOUNTS: &[&str] = &[
|
||||
"x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK",
|
||||
"FHuecMbeC1PfjkW2JKyoicJAuiU7khgQT16QUB3Q1XdL",
|
||||
"H21Shi4iE7vgfjWEQNvzmpmBMJSaiZ17PYUcdNoAoKNc",
|
||||
];
|
||||
|
||||
pub struct BrainDaemonMock {
|
||||
pub struct BrainGeneraClilMock {
|
||||
data: Arc<BrainData>,
|
||||
}
|
||||
|
||||
impl BrainDaemonMock {
|
||||
impl BrainGeneraClilMock {
|
||||
pub fn new(data: Arc<BrainData>) -> Self {
|
||||
Self { data }
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BrainCliMock {
|
||||
pub struct BrainVmDaemonMock {
|
||||
data: Arc<BrainData>,
|
||||
}
|
||||
|
||||
impl BrainCliMock {
|
||||
impl BrainVmDaemonMock {
|
||||
pub fn new(data: Arc<BrainData>) -> Self {
|
||||
Self { data }
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BrainVmCliMock {
|
||||
data: Arc<BrainData>,
|
||||
}
|
||||
|
||||
impl BrainVmCliMock {
|
||||
pub fn new(data: Arc<BrainData>) -> Self {
|
||||
Self { data }
|
||||
}
|
||||
@ -68,7 +84,169 @@ impl BrainAppDaemonMock {
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl BrainVmDaemon for BrainDaemonMock {
|
||||
impl BrainGeneralCli for BrainGeneraClilMock {
|
||||
type ListOperatorsStream =
|
||||
Pin<Box<dyn Stream<Item = Result<ListOperatorsResp, Status>> + Send>>;
|
||||
type ListAccountsStream = Pin<Box<dyn Stream<Item = Result<Account, Status>> + Send>>;
|
||||
type ListAllVmContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
|
||||
type ListAllAppContractsStream =
|
||||
Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
|
||||
|
||||
async fn get_balance(&self, req: Request<Pubkey>) -> Result<Response<AccountBalance>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
Ok(Response::new(self.data.get_balance(&req.pubkey).into()))
|
||||
}
|
||||
|
||||
async fn report_node(&self, req: Request<ReportNodeReq>) -> Result<Response<Empty>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
match self.data.find_contract_by_uuid(&req.contract) {
|
||||
Ok(contract)
|
||||
if contract.admin_pubkey == req.admin_pubkey
|
||||
&& contract.node_pubkey == req.node_pubkey =>
|
||||
{
|
||||
()
|
||||
}
|
||||
_ => return Err(Status::unauthenticated("No contract found by this ID.")),
|
||||
};
|
||||
self.data
|
||||
.report_node(req.admin_pubkey, &req.node_pubkey, req.reason);
|
||||
Ok(Response::new(Empty {}))
|
||||
}
|
||||
|
||||
async fn list_operators(
|
||||
&self,
|
||||
req: Request<Empty>,
|
||||
) -> Result<Response<Self::ListOperatorsStream>, Status> {
|
||||
let _ = check_sig_from_req(req)?;
|
||||
let operators = self.data.list_operators();
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
tokio::spawn(async move {
|
||||
for op in operators {
|
||||
let _ = tx.send(Ok(op.into())).await;
|
||||
}
|
||||
});
|
||||
let output_stream = ReceiverStream::new(rx);
|
||||
Ok(Response::new(
|
||||
Box::pin(output_stream) as Self::ListOperatorsStream
|
||||
))
|
||||
}
|
||||
|
||||
async fn inspect_operator(
|
||||
&self,
|
||||
req: Request<Pubkey>,
|
||||
) -> Result<Response<InspectOperatorResp>, Status> {
|
||||
match self.data.inspect_operator(&req.into_inner().pubkey) {
|
||||
Some(op) => Ok(Response::new(op.into())),
|
||||
None => Err(Status::not_found(
|
||||
"The wallet you specified is not an operator",
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn register_operator(
|
||||
&self,
|
||||
req: Request<RegOperatorReq>,
|
||||
) -> Result<Response<Empty>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
info!("Regitering new operator: {req:?}");
|
||||
match self.data.register_operator(req) {
|
||||
Ok(()) => Ok(Response::new(Empty {})),
|
||||
Err(e) => Err(Status::failed_precondition(e.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
async fn kick_contract(&self, req: Request<KickReq>) -> Result<Response<KickResp>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
match self
|
||||
.data
|
||||
.kick_contract(&req.operator_wallet, &req.contract_uuid, &req.reason)
|
||||
.await
|
||||
{
|
||||
Ok(nano_lp) => Ok(Response::new(KickResp { nano_lp })),
|
||||
Err(e) => Err(Status::permission_denied(e.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
async fn ban_user(&self, req: Request<BanUserReq>) -> Result<Response<Empty>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
self.data.ban_user(&req.operator_wallet, &req.user_wallet);
|
||||
Ok(Response::new(Empty {}))
|
||||
}
|
||||
|
||||
// admin commands
|
||||
|
||||
async fn airdrop(&self, req: Request<AirdropReq>) -> Result<Response<Empty>, Status> {
|
||||
check_admin_key(&req)?;
|
||||
let req = check_sig_from_req(req)?;
|
||||
self.data.give_airdrop(&req.pubkey, req.tokens);
|
||||
Ok(Response::new(Empty {}))
|
||||
}
|
||||
|
||||
async fn slash(&self, req: Request<SlashReq>) -> Result<Response<Empty>, Status> {
|
||||
check_admin_key(&req)?;
|
||||
let req = check_sig_from_req(req)?;
|
||||
self.data.slash_account(&req.pubkey, req.tokens);
|
||||
Ok(Response::new(Empty {}))
|
||||
}
|
||||
|
||||
async fn list_accounts(
|
||||
&self,
|
||||
req: Request<Empty>,
|
||||
) -> Result<Response<Self::ListAccountsStream>, Status> {
|
||||
check_admin_key(&req)?;
|
||||
let _ = check_sig_from_req(req)?;
|
||||
let accounts = self.data.list_accounts();
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
tokio::spawn(async move {
|
||||
for account in accounts {
|
||||
let _ = tx.send(Ok(account.into())).await;
|
||||
}
|
||||
});
|
||||
let output_stream = ReceiverStream::new(rx);
|
||||
Ok(Response::new(
|
||||
Box::pin(output_stream) as Self::ListAccountsStream
|
||||
))
|
||||
}
|
||||
|
||||
async fn list_all_vm_contracts(
|
||||
&self,
|
||||
req: Request<Empty>,
|
||||
) -> Result<Response<Self::ListAllVmContractsStream>, Status> {
|
||||
check_admin_key(&req)?;
|
||||
let _ = check_sig_from_req(req)?;
|
||||
let contracts = self.data.list_all_contracts();
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
tokio::spawn(async move {
|
||||
for contract in contracts {
|
||||
let _ = tx.send(Ok(contract.into())).await;
|
||||
}
|
||||
});
|
||||
let output_stream = ReceiverStream::new(rx);
|
||||
Ok(Response::new(
|
||||
Box::pin(output_stream) as Self::ListAllVmContractsStream
|
||||
))
|
||||
}
|
||||
|
||||
async fn list_all_app_contracts(
|
||||
&self,
|
||||
req: tonic::Request<Empty>,
|
||||
) -> Result<tonic::Response<Self::ListAllAppContractsStream>, Status> {
|
||||
check_admin_key(&req)?;
|
||||
let _ = check_sig_from_req(req)?;
|
||||
let contracts = self.data.list_all_app_contracts();
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
tokio::spawn(async move {
|
||||
for contract in contracts {
|
||||
let _ = tx.send(Ok(contract.into())).await;
|
||||
}
|
||||
});
|
||||
let output_stream = ReceiverStream::new(rx);
|
||||
Ok(Response::new(Box::pin(output_stream)))
|
||||
}
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl BrainVmDaemon for BrainVmDaemonMock {
|
||||
type RegisterVmNodeStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
|
||||
async fn register_vm_node(
|
||||
&self,
|
||||
@ -178,12 +356,7 @@ impl BrainVmDaemon for BrainDaemonMock {
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl BrainCli for BrainCliMock {
|
||||
async fn get_balance(&self, req: Request<Pubkey>) -> Result<Response<AccountBalance>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
Ok(Response::new(self.data.get_balance(&req.pubkey).into()))
|
||||
}
|
||||
|
||||
impl BrainVmCli for BrainVmCliMock {
|
||||
async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
info!("New VM requested via CLI: {req:?}");
|
||||
@ -247,22 +420,6 @@ impl BrainCli for BrainCliMock {
|
||||
}
|
||||
}
|
||||
|
||||
async fn report_node(&self, req: Request<ReportNodeReq>) -> Result<Response<Empty>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
match self.data.find_contract_by_uuid(&req.contract) {
|
||||
Ok(contract)
|
||||
if contract.admin_pubkey == req.admin_pubkey
|
||||
&& contract.node_pubkey == req.node_pubkey =>
|
||||
{
|
||||
()
|
||||
}
|
||||
_ => return Err(Status::unauthenticated("No contract found by this ID.")),
|
||||
};
|
||||
self.data
|
||||
.report_node(req.admin_pubkey, &req.node_pubkey, req.reason);
|
||||
Ok(Response::new(Empty {}))
|
||||
}
|
||||
|
||||
type ListVmContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
|
||||
async fn list_vm_contracts(
|
||||
&self,
|
||||
@ -333,122 +490,6 @@ impl BrainCli for BrainCliMock {
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn register_operator(
|
||||
&self,
|
||||
req: Request<RegOperatorReq>,
|
||||
) -> Result<Response<Empty>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
info!("Regitering new operator: {req:?}");
|
||||
match self.data.register_operator(req) {
|
||||
Ok(()) => Ok(Response::new(Empty {})),
|
||||
Err(e) => Err(Status::failed_precondition(e.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
async fn kick_contract(&self, req: Request<KickReq>) -> Result<Response<KickResp>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
match self
|
||||
.data
|
||||
.kick_contract(&req.operator_wallet, &req.contract_uuid, &req.reason)
|
||||
.await
|
||||
{
|
||||
Ok(nano_lp) => Ok(Response::new(KickResp { nano_lp })),
|
||||
Err(e) => Err(Status::permission_denied(e.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
async fn ban_user(&self, req: Request<BanUserReq>) -> Result<Response<Empty>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
self.data.ban_user(&req.operator_wallet, &req.user_wallet);
|
||||
Ok(Response::new(Empty {}))
|
||||
}
|
||||
|
||||
type ListOperatorsStream =
|
||||
Pin<Box<dyn Stream<Item = Result<ListOperatorsResp, Status>> + Send>>;
|
||||
async fn list_operators(
|
||||
&self,
|
||||
req: Request<Empty>,
|
||||
) -> Result<Response<Self::ListOperatorsStream>, Status> {
|
||||
let _ = check_sig_from_req(req)?;
|
||||
let operators = self.data.list_operators();
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
tokio::spawn(async move {
|
||||
for op in operators {
|
||||
let _ = tx.send(Ok(op.into())).await;
|
||||
}
|
||||
});
|
||||
let output_stream = ReceiverStream::new(rx);
|
||||
Ok(Response::new(
|
||||
Box::pin(output_stream) as Self::ListOperatorsStream
|
||||
))
|
||||
}
|
||||
|
||||
async fn inspect_operator(
|
||||
&self,
|
||||
req: Request<Pubkey>,
|
||||
) -> Result<Response<InspectOperatorResp>, Status> {
|
||||
match self.data.inspect_operator(&req.into_inner().pubkey) {
|
||||
Some(op) => Ok(Response::new(op.into())),
|
||||
None => Err(Status::not_found(
|
||||
"The wallet you specified is not an operator",
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn airdrop(&self, req: Request<AirdropReq>) -> Result<Response<Empty>, Status> {
|
||||
check_admin_key(&req)?;
|
||||
let req = check_sig_from_req(req)?;
|
||||
self.data.give_airdrop(&req.pubkey, req.tokens);
|
||||
Ok(Response::new(Empty {}))
|
||||
}
|
||||
|
||||
async fn slash(&self, req: Request<SlashReq>) -> Result<Response<Empty>, Status> {
|
||||
check_admin_key(&req)?;
|
||||
let req = check_sig_from_req(req)?;
|
||||
self.data.slash_account(&req.pubkey, req.tokens);
|
||||
Ok(Response::new(Empty {}))
|
||||
}
|
||||
|
||||
type ListAllVmContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
|
||||
async fn list_all_vm_contracts(
|
||||
&self,
|
||||
req: Request<Empty>,
|
||||
) -> Result<Response<Self::ListVmContractsStream>, Status> {
|
||||
check_admin_key(&req)?;
|
||||
let _ = check_sig_from_req(req)?;
|
||||
let contracts = self.data.list_all_contracts();
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
tokio::spawn(async move {
|
||||
for contract in contracts {
|
||||
let _ = tx.send(Ok(contract.into())).await;
|
||||
}
|
||||
});
|
||||
let output_stream = ReceiverStream::new(rx);
|
||||
Ok(Response::new(
|
||||
Box::pin(output_stream) as Self::ListVmContractsStream
|
||||
))
|
||||
}
|
||||
|
||||
type ListAccountsStream = Pin<Box<dyn Stream<Item = Result<Account, Status>> + Send>>;
|
||||
async fn list_accounts(
|
||||
&self,
|
||||
req: Request<Empty>,
|
||||
) -> Result<Response<Self::ListAccountsStream>, Status> {
|
||||
check_admin_key(&req)?;
|
||||
let _ = check_sig_from_req(req)?;
|
||||
let accounts = self.data.list_accounts();
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
tokio::spawn(async move {
|
||||
for account in accounts {
|
||||
let _ = tx.send(Ok(account.into())).await;
|
||||
}
|
||||
});
|
||||
let output_stream = ReceiverStream::new(rx);
|
||||
Ok(Response::new(
|
||||
Box::pin(output_stream) as Self::ListAccountsStream
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
trait PubkeyGetter {
|
||||
@ -459,8 +500,6 @@ trait PubkeyGetter {
|
||||
impl BrainAppCli for BrainAppCliMock {
|
||||
type ListAppContractsStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
|
||||
type ListAppNodesStream = Pin<Box<dyn Stream<Item = Result<AppNodeListResp, Status>> + Send>>;
|
||||
type ListAllAppContractsStream =
|
||||
Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
|
||||
|
||||
async fn deploy_app(
|
||||
&self,
|
||||
@ -489,7 +528,7 @@ impl BrainAppCli for BrainAppCliMock {
|
||||
async fn delete_app(
|
||||
&self,
|
||||
req: tonic::Request<DelAppReq>,
|
||||
) -> Result<tonic::Response<detee_shared::sgx::pb::brain::Empty>, Status> {
|
||||
) -> Result<tonic::Response<Empty>, Status> {
|
||||
let req_data = check_sig_from_req(req)?;
|
||||
log::info!("deleting container: {}", req_data.uuid.clone());
|
||||
if let Err(er) = self.data.send_del_container_req(req_data).await {
|
||||
@ -497,7 +536,7 @@ impl BrainAppCli for BrainAppCliMock {
|
||||
return Err(Status::not_found("Could not find container"));
|
||||
};
|
||||
|
||||
Ok(Response::new(detee_shared::sgx::pb::brain::Empty {}))
|
||||
Ok(Response::new(Empty {}))
|
||||
}
|
||||
|
||||
async fn list_app_contracts(
|
||||
@ -551,23 +590,6 @@ impl BrainAppCli for BrainAppCliMock {
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_all_app_contracts(
|
||||
&self,
|
||||
req: tonic::Request<detee_shared::sgx::pb::brain::Empty>,
|
||||
) -> Result<tonic::Response<Self::ListAllAppContractsStream>, Status> {
|
||||
check_admin_key(&req)?;
|
||||
let _ = check_sig_from_req(req)?;
|
||||
let contracts = self.data.list_all_app_contracts();
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
tokio::spawn(async move {
|
||||
for contract in contracts {
|
||||
let _ = tx.send(Ok(contract.into())).await;
|
||||
}
|
||||
});
|
||||
let output_stream = ReceiverStream::new(rx);
|
||||
Ok(Response::new(Box::pin(output_stream)))
|
||||
}
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
@ -611,7 +633,7 @@ impl BrainAppDaemon for BrainAppDaemonMock {
|
||||
|
||||
async fn brain_messages(
|
||||
&self,
|
||||
req: tonic::Request<detee_shared::sgx::pb::brain::DaemonAuth>,
|
||||
req: tonic::Request<detee_shared::app_proto::DaemonAuth>,
|
||||
) -> Result<tonic::Response<Self::BrainMessagesStream>, Status> {
|
||||
let req_data = req.into_inner();
|
||||
let pubkey = req_data.pubkey.clone();
|
||||
@ -637,7 +659,7 @@ impl BrainAppDaemon for BrainAppDaemonMock {
|
||||
async fn daemon_messages(
|
||||
&self,
|
||||
req: tonic::Request<Streaming<DaemonMessageApp>>,
|
||||
) -> Result<tonic::Response<detee_shared::sgx::pb::brain::Empty>, Status> {
|
||||
) -> Result<tonic::Response<Empty>, Status> {
|
||||
let mut req_stream = req.into_inner();
|
||||
let mut pubkey;
|
||||
|
||||
@ -646,8 +668,7 @@ impl BrainAppDaemon for BrainAppDaemonMock {
|
||||
"demon_messages received the following auth message: {:?}",
|
||||
msg.msg
|
||||
);
|
||||
if let Some(detee_shared::sgx::pb::brain::daemon_message_app::Msg::Auth(auth)) = msg.msg
|
||||
{
|
||||
if let Some(detee_shared::app_proto::daemon_message_app::Msg::Auth(auth)) = msg.msg {
|
||||
pubkey = auth.pubkey.clone();
|
||||
check_sig_from_parts(
|
||||
&pubkey,
|
||||
@ -667,17 +688,15 @@ impl BrainAppDaemon for BrainAppDaemonMock {
|
||||
while let Some(daemon_message) = req_stream.next().await {
|
||||
match daemon_message {
|
||||
Ok(msg) => match msg.msg {
|
||||
Some(detee_shared::sgx::pb::brain::daemon_message_app::Msg::Auth(
|
||||
daemon_auth,
|
||||
)) => pubkey = daemon_auth.pubkey,
|
||||
Some(detee_shared::sgx::pb::brain::daemon_message_app::Msg::NewAppRes(
|
||||
Some(detee_shared::app_proto::daemon_message_app::Msg::Auth(daemon_auth)) => {
|
||||
pubkey = daemon_auth.pubkey
|
||||
}
|
||||
Some(detee_shared::app_proto::daemon_message_app::Msg::NewAppRes(
|
||||
new_app_res,
|
||||
)) => self.data.send_new_container_resp(new_app_res).await,
|
||||
Some(
|
||||
detee_shared::sgx::pb::brain::daemon_message_app::Msg::AppNodeResources(
|
||||
Some(detee_shared::app_proto::daemon_message_app::Msg::AppNodeResources(
|
||||
node_resource,
|
||||
),
|
||||
) => self.data.submit_app_node_resources(node_resource),
|
||||
)) => self.data.submit_app_node_resources(node_resource),
|
||||
_ => {
|
||||
dbg!("None");
|
||||
}
|
||||
@ -687,10 +706,9 @@ impl BrainAppDaemon for BrainAppDaemonMock {
|
||||
self.data.del_app_daemon_tx(&pubkey);
|
||||
}
|
||||
}
|
||||
//
|
||||
}
|
||||
|
||||
Ok(Response::new(detee_shared::sgx::pb::brain::Empty {}))
|
||||
Ok(Response::new(Empty {}))
|
||||
}
|
||||
}
|
||||
|
||||
@ -733,7 +751,6 @@ impl_pubkey_getter!(DelAppReq, admin_pubkey);
|
||||
impl_pubkey_getter!(ListAppContractsReq, admin_pubkey);
|
||||
|
||||
impl_pubkey_getter!(RegisterAppNodeReq);
|
||||
impl_pubkey_getter!(detee_shared::sgx::pb::brain::Empty);
|
||||
impl_pubkey_getter!(AppNodeFilters);
|
||||
|
||||
fn check_sig_from_req<T: std::fmt::Debug + PubkeyGetter>(req: Request<T>) -> Result<T, Status> {
|
||||
|
23
src/main.rs
23
src/main.rs
@ -2,14 +2,16 @@ mod data;
|
||||
mod grpc;
|
||||
|
||||
use data::BrainData;
|
||||
use detee_shared::sgx::pb::brain::brain_app_cli_server::BrainAppCliServer;
|
||||
use detee_shared::sgx::pb::brain::brain_app_daemon_server::BrainAppDaemonServer;
|
||||
use grpc::snp_proto::brain_cli_server::BrainCliServer;
|
||||
use detee_shared::app_proto::brain_app_cli_server::BrainAppCliServer;
|
||||
use detee_shared::app_proto::brain_app_daemon_server::BrainAppDaemonServer;
|
||||
use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer;
|
||||
use grpc::snp_proto::brain_vm_cli_server::BrainVmCliServer;
|
||||
use grpc::snp_proto::brain_vm_daemon_server::BrainVmDaemonServer;
|
||||
use grpc::BrainAppCliMock;
|
||||
use grpc::BrainAppDaemonMock;
|
||||
use grpc::BrainCliMock;
|
||||
use grpc::BrainDaemonMock;
|
||||
use grpc::BrainGeneraClilMock;
|
||||
use grpc::BrainVmCliMock;
|
||||
use grpc::BrainVmDaemonMock;
|
||||
use std::sync::Arc;
|
||||
use tonic::transport::Server;
|
||||
|
||||
@ -33,17 +35,20 @@ async fn main() {
|
||||
});
|
||||
let addr = "0.0.0.0:31337".parse().unwrap();
|
||||
|
||||
let daemon_server = BrainVmDaemonServer::new(BrainDaemonMock::new(data.clone()));
|
||||
let cli_server = BrainCliServer::new(BrainCliMock::new(data.clone()));
|
||||
let snp_daemon_server = BrainVmDaemonServer::new(BrainVmDaemonMock::new(data.clone()));
|
||||
let snp_cli_server = BrainVmCliServer::new(BrainVmCliMock::new(data.clone()));
|
||||
|
||||
let sgx_cli_server = BrainAppCliServer::new(BrainAppCliMock::new(data.clone()));
|
||||
let sgx_daemon_server = BrainAppDaemonServer::new(BrainAppDaemonMock::new(data.clone()));
|
||||
|
||||
let general_service_server = BrainGeneralCliServer::new(BrainGeneraClilMock::new(data.clone()));
|
||||
|
||||
Server::builder()
|
||||
.add_service(daemon_server)
|
||||
.add_service(cli_server)
|
||||
.add_service(snp_daemon_server)
|
||||
.add_service(snp_cli_server)
|
||||
.add_service(sgx_cli_server)
|
||||
.add_service(sgx_daemon_server)
|
||||
.add_service(general_service_server)
|
||||
.serve(addr)
|
||||
.await
|
||||
.unwrap();
|
||||
|
Loading…
Reference in New Issue
Block a user