forked from ghe0/brain-mock
integrating new proto structure
This commit is contained in:
parent
230eb8cfbd
commit
5ae69dee52
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -329,7 +329,6 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "detee-shared"
|
name = "detee-shared"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#7c9f66a7394c06ad8af0934e34b113f9c965bc98"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"base64",
|
"base64",
|
||||||
"prost",
|
"prost",
|
||||||
|
61
src/data.rs
61
src/data.rs
@ -2,14 +2,18 @@
|
|||||||
use crate::grpc::snp_proto::{self as grpc};
|
use crate::grpc::snp_proto::{self as grpc};
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
|
use detee_shared::pb::brain::DelAppReq;
|
||||||
use log::{debug, info, warn};
|
use log::{debug, info, warn};
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::RwLock;
|
use std::sync::RwLock;
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
use tokio::sync::oneshot::Sender as OneshotSender;
|
use tokio::sync::oneshot::Sender as OneshotSender;
|
||||||
|
|
||||||
use detee_shared::pb::daemon as daemonPb;
|
use detee_shared::pb::brain::AppContract as AppContractPB;
|
||||||
use detee_shared::pb::shared as sharedPb;
|
use detee_shared::pb::brain::BrainMessageApp;
|
||||||
|
use detee_shared::pb::brain::MappedPort;
|
||||||
|
use detee_shared::pb::brain::NewAppReq;
|
||||||
|
use detee_shared::pb::brain::NewAppRes;
|
||||||
|
|
||||||
#[derive(thiserror::Error, Debug)]
|
#[derive(thiserror::Error, Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
@ -157,7 +161,7 @@ pub struct AppContract {
|
|||||||
pub collected_at: chrono::DateTime<Utc>,
|
pub collected_at: chrono::DateTime<Utc>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<AppContract> for sharedPb::ContainerContracts {
|
impl From<AppContract> for AppContractPB {
|
||||||
fn from(value: AppContract) -> Self {
|
fn from(value: AppContract) -> Self {
|
||||||
Self {
|
Self {
|
||||||
uuid: value.uuid,
|
uuid: value.uuid,
|
||||||
@ -167,7 +171,7 @@ impl From<AppContract> for sharedPb::ContainerContracts {
|
|||||||
exposed_ports: value
|
exposed_ports: value
|
||||||
.mapped_ports
|
.mapped_ports
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(sharedPb::MappedPort::from)
|
.map(MappedPort::from)
|
||||||
.collect(),
|
.collect(),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
}
|
}
|
||||||
@ -202,14 +206,8 @@ pub struct BrainData {
|
|||||||
daemon_tx: DashMap<String, Sender<grpc::BrainMessage>>,
|
daemon_tx: DashMap<String, Sender<grpc::BrainMessage>>,
|
||||||
|
|
||||||
app_nodes: RwLock<Vec<AppNode>>,
|
app_nodes: RwLock<Vec<AppNode>>,
|
||||||
app_daemon_tx: DashMap<String, Sender<daemonPb::BrainMessage>>,
|
app_daemon_tx: DashMap<String, Sender<BrainMessageApp>>,
|
||||||
tmp_new_container_reqs: DashMap<
|
tmp_new_container_reqs: DashMap<String, (NewAppReq, OneshotSender<NewAppRes>)>,
|
||||||
String,
|
|
||||||
(
|
|
||||||
sharedPb::Container,
|
|
||||||
OneshotSender<daemonPb::NewContainerRes>,
|
|
||||||
),
|
|
||||||
>,
|
|
||||||
app_contracts: RwLock<Vec<AppContract>>,
|
app_contracts: RwLock<Vec<AppContract>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -744,7 +742,7 @@ impl BrainData {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl BrainData {
|
impl BrainData {
|
||||||
pub fn add_app_daemon_tx(&self, node_pubkey: &str, tx: Sender<daemonPb::BrainMessage>) {
|
pub fn add_app_daemon_tx(&self, node_pubkey: &str, tx: Sender<BrainMessageApp>) {
|
||||||
self.app_daemon_tx.insert(node_pubkey.to_string(), tx);
|
self.app_daemon_tx.insert(node_pubkey.to_string(), tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -767,7 +765,7 @@ impl BrainData {
|
|||||||
|
|
||||||
pub fn find_app_contract_by_uuid(&self, uuid: &str) -> Option<AppContract> {
|
pub fn find_app_contract_by_uuid(&self, uuid: &str) -> Option<AppContract> {
|
||||||
let contracts = self.app_contracts.read().unwrap();
|
let contracts = self.app_contracts.read().unwrap();
|
||||||
contracts.iter().cloned().find(|c| c.uuid == uuid)
|
contracts.iter().find(|c| c.uuid == uuid).cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn find_app_contracts_by_admin_pubkey(&self, admin_pubkey: &str) -> Vec<AppContract> {
|
pub fn find_app_contracts_by_admin_pubkey(&self, admin_pubkey: &str) -> Vec<AppContract> {
|
||||||
@ -793,11 +791,7 @@ impl BrainData {
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send_new_container_req(
|
pub async fn send_new_container_req(&self, mut req: NewAppReq, tx: OneshotSender<NewAppRes>) {
|
||||||
&self,
|
|
||||||
mut req: sharedPb::Container,
|
|
||||||
tx: OneshotSender<daemonPb::NewContainerRes>,
|
|
||||||
) {
|
|
||||||
req.uuid = uuid::Uuid::new_v4().to_string();
|
req.uuid = uuid::Uuid::new_v4().to_string();
|
||||||
|
|
||||||
info!("Inserting new container request in memory: {req:?}");
|
info!("Inserting new container request in memory: {req:?}");
|
||||||
@ -809,10 +803,10 @@ impl BrainData {
|
|||||||
"Found daemon TX for {}. Sending newVMReq {}",
|
"Found daemon TX for {}. Sending newVMReq {}",
|
||||||
req.node_pubkey, req.uuid
|
req.node_pubkey, req.uuid
|
||||||
);
|
);
|
||||||
let msg = daemonPb::BrainMessage {
|
let msg = BrainMessageApp {
|
||||||
msg: Some(
|
msg: Some(detee_shared::pb::brain::brain_message_app::Msg::NewAppReq(
|
||||||
detee_shared::pb::daemon::brain_message::Msg::NewContainerReq(req.clone()),
|
req.clone(),
|
||||||
),
|
)),
|
||||||
};
|
};
|
||||||
if let Err(e) = app_daemon_tx.send(msg).await {
|
if let Err(e) = app_daemon_tx.send(msg).await {
|
||||||
warn!(
|
warn!(
|
||||||
@ -821,7 +815,7 @@ impl BrainData {
|
|||||||
);
|
);
|
||||||
info!("Deleting daemon TX for {}", req.node_pubkey);
|
info!("Deleting daemon TX for {}", req.node_pubkey);
|
||||||
self.del_app_daemon_tx(&req.node_pubkey);
|
self.del_app_daemon_tx(&req.node_pubkey);
|
||||||
self.send_new_container_resp(daemonPb::NewContainerRes {
|
self.send_new_container_resp(NewAppRes {
|
||||||
uuid: req.uuid,
|
uuid: req.uuid,
|
||||||
status: "Failed".to_string(),
|
status: "Failed".to_string(),
|
||||||
error: "Daemon is offline.".to_string(),
|
error: "Daemon is offline.".to_string(),
|
||||||
@ -834,19 +828,18 @@ impl BrainData {
|
|||||||
|
|
||||||
pub async fn send_del_container_req(
|
pub async fn send_del_container_req(
|
||||||
&self,
|
&self,
|
||||||
req: daemonPb::ContainerFilters,
|
req: DelAppReq,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
if let Some(app_contract) = self.find_app_contract_by_uuid(req.uuid()) {
|
if let Some(app_contract) = self.find_app_contract_by_uuid(&req.uuid) {
|
||||||
info!("Found app contract {}. Deleting...", req.uuid());
|
info!("Found app contract {}. Deleting...", &req.uuid);
|
||||||
if let Some(app_daemon_tx) = self.app_daemon_tx.get(&app_contract.node_pubkey) {
|
if let Some(app_daemon_tx) = self.app_daemon_tx.get(&app_contract.node_pubkey) {
|
||||||
debug!(
|
debug!(
|
||||||
"TX for daemon {} found. Informing daemon about deletion of {}.",
|
"TX for daemon {} found. Informing daemon about deletion of {}.",
|
||||||
app_contract.node_pubkey,
|
app_contract.node_pubkey, &req.uuid
|
||||||
req.uuid()
|
|
||||||
);
|
);
|
||||||
let msg = daemonPb::BrainMessage {
|
let msg = BrainMessageApp {
|
||||||
msg: Some(
|
msg: Some(
|
||||||
detee_shared::pb::daemon::brain_message::Msg::DeleteContainer(req.clone()),
|
detee_shared::pb::brain::brain_message_app::Msg::DeleteAppReq(req.clone()),
|
||||||
),
|
),
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -861,15 +854,15 @@ impl BrainData {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut app_contracts = self.app_contracts.write().unwrap();
|
let mut app_contracts = self.app_contracts.write().unwrap();
|
||||||
app_contracts.retain(|c| c.uuid != req.uuid());
|
app_contracts.retain(|c| c.uuid != req.uuid);
|
||||||
|
|
||||||
return Ok(());
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
Err("Contract not found".into())
|
Err("Contract not found".into())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send_new_container_resp(&self, new_container_resp: daemonPb::NewContainerRes) {
|
pub async fn send_new_container_resp(&self, new_container_resp: NewAppRes) {
|
||||||
let new_container_req = match self.tmp_new_container_reqs.remove(&new_container_resp.uuid) {
|
let new_container_req = match self.tmp_new_container_reqs.remove(&new_container_resp.uuid) {
|
||||||
Some((_, r)) => r,
|
Some((_, r)) => r,
|
||||||
None => {
|
None => {
|
||||||
|
51
src/grpc.rs
51
src/grpc.rs
@ -15,9 +15,10 @@ use tokio::sync::mpsc;
|
|||||||
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
|
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
|
||||||
use tonic::{Request, Response, Status, Streaming};
|
use tonic::{Request, Response, Status, Streaming};
|
||||||
|
|
||||||
use detee_shared::pb::daemon::{
|
use detee_shared::pb::brain::brain_app_cli_server::BrainAppCli;
|
||||||
brain_sgx_cli_server::BrainSgxCli, brain_sgx_daemon_server::BrainSgxDaemon,
|
use detee_shared::pb::brain::brain_app_daemon_server::BrainAppDaemon;
|
||||||
daemon_message as sgx_daemon_message, ContainerFilters, ContainerListResp, NewContainerRes,
|
use detee_shared::pb::brain::{
|
||||||
|
AppContract, BrainMessageApp, NewAppReq, NewAppRes, RegisterAppNodeReq,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct BrainDaemonMock {
|
pub struct BrainDaemonMock {
|
||||||
@ -40,21 +41,21 @@ impl BrainCliMock {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct BrainSgxCliMock {
|
pub struct BrainAppCliMock {
|
||||||
data: Arc<BrainData>,
|
data: Arc<BrainData>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BrainSgxCliMock {
|
impl BrainAppCliMock {
|
||||||
pub fn new(data: Arc<BrainData>) -> Self {
|
pub fn new(data: Arc<BrainData>) -> Self {
|
||||||
Self { data }
|
Self { data }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct BrainSgxDaemonMock {
|
pub struct BrainAppDaemonMock {
|
||||||
data: Arc<BrainData>,
|
data: Arc<BrainData>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BrainSgxDaemonMock {
|
impl BrainAppDaemonMock {
|
||||||
pub fn new(data: Arc<BrainData>) -> Self {
|
pub fn new(data: Arc<BrainData>) -> Self {
|
||||||
Self { data }
|
Self { data }
|
||||||
}
|
}
|
||||||
@ -273,11 +274,11 @@ impl BrainCli for BrainCliMock {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tonic::async_trait]
|
#[tonic::async_trait]
|
||||||
impl BrainSgxCli for BrainSgxCliMock {
|
impl BrainAppCli for BrainAppCliMock {
|
||||||
async fn create_container(
|
async fn create_app(
|
||||||
&self,
|
&self,
|
||||||
req: tonic::Request<detee_shared::pb::shared::Container>,
|
req: tonic::Request<NewAppReq>,
|
||||||
) -> Result<tonic::Response<NewContainerRes>, Status> {
|
) -> Result<tonic::Response<NewAppRes>, Status> {
|
||||||
let req = req.into_inner();
|
let req = req.into_inner();
|
||||||
log::info!("Creating new container: {req:?}");
|
log::info!("Creating new container: {req:?}");
|
||||||
let admin_pubkey = req.admin_pubkey.clone();
|
let admin_pubkey = req.admin_pubkey.clone();
|
||||||
@ -298,6 +299,8 @@ impl BrainSgxCli for BrainSgxCliMock {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
|
||||||
async fn delete_container(
|
async fn delete_container(
|
||||||
&self,
|
&self,
|
||||||
req: tonic::Request<ContainerFilters>,
|
req: tonic::Request<ContainerFilters>,
|
||||||
@ -319,12 +322,17 @@ impl BrainSgxCli for BrainSgxCliMock {
|
|||||||
&self,
|
&self,
|
||||||
req: tonic::Request<ContainerFilters>,
|
req: tonic::Request<ContainerFilters>,
|
||||||
) -> Result<tonic::Response<ContainerListResp>, Status> {
|
) -> Result<tonic::Response<ContainerListResp>, Status> {
|
||||||
dbg!(req);
|
let req_data = req.into_inner();
|
||||||
|
dbg!(&req_data);
|
||||||
|
|
||||||
|
// let containers = self.data.find_app_contracts_by_admin_pubkey(&req_data.admin_pubkey).into();
|
||||||
Ok(Response::new(ContainerListResp {
|
Ok(Response::new(ContainerListResp {
|
||||||
..Default::default()
|
..Default::default()
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
// async fn inspect_container(
|
// async fn inspect_container(
|
||||||
// &self,
|
// &self,
|
||||||
// req: tonic::Request<detee_shared::pb::shared::Uuid>,
|
// req: tonic::Request<detee_shared::pb::shared::Uuid>,
|
||||||
@ -337,16 +345,13 @@ impl BrainSgxCli for BrainSgxCliMock {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tonic::async_trait]
|
#[tonic::async_trait]
|
||||||
impl BrainSgxDaemon for BrainSgxDaemonMock {
|
impl BrainAppDaemon for BrainAppDaemonMock {
|
||||||
type RegisterNodeStream = Pin<
|
type RegisterNodeStream = Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
|
||||||
Box<dyn Stream<Item = Result<detee_shared::pb::shared::ContainerContracts, Status>> + Send>,
|
type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainMessageApp, Status>> + Send>>;
|
||||||
>;
|
|
||||||
type BrainMessagesStream =
|
|
||||||
Pin<Box<dyn Stream<Item = Result<detee_shared::pb::daemon::BrainMessage, Status>> + Send>>;
|
|
||||||
|
|
||||||
async fn register_node(
|
async fn register_node(
|
||||||
&self,
|
&self,
|
||||||
req: tonic::Request<detee_shared::pb::shared::RegisterNodeReq>,
|
req: tonic::Request<RegisterAppNodeReq>,
|
||||||
) -> Result<tonic::Response<Self::RegisterNodeStream>, Status> {
|
) -> Result<tonic::Response<Self::RegisterNodeStream>, Status> {
|
||||||
let req_data = req.into_inner();
|
let req_data = req.into_inner();
|
||||||
log::info!(
|
log::info!(
|
||||||
@ -378,24 +383,25 @@ impl BrainSgxDaemon for BrainSgxDaemonMock {
|
|||||||
let _ = tx.send(contract.into()).await;
|
let _ = tx.send(contract.into()).await;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg));
|
let output_stream = ReceiverStream::new(rx).map(Ok);
|
||||||
Ok(Response::new(Box::pin(output_stream)))
|
Ok(Response::new(Box::pin(output_stream)))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn brain_messages(
|
async fn brain_messages(
|
||||||
&self,
|
&self,
|
||||||
req: tonic::Request<detee_shared::pb::shared::Pubkey>,
|
req: tonic::Request<detee_shared::pb::brain::Pubkey>,
|
||||||
) -> Result<tonic::Response<Self::BrainMessagesStream>, Status> {
|
) -> Result<tonic::Response<Self::BrainMessagesStream>, Status> {
|
||||||
let req = req.into_inner();
|
let req = req.into_inner();
|
||||||
info!("Daemon {} connected to receive brain messages", req.pubkey);
|
info!("Daemon {} connected to receive brain messages", req.pubkey);
|
||||||
let (tx, rx) = mpsc::channel(6);
|
let (tx, rx) = mpsc::channel(6);
|
||||||
self.data.add_app_daemon_tx(&req.pubkey, tx);
|
self.data.add_app_daemon_tx(&req.pubkey, tx);
|
||||||
let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg));
|
let output_stream = ReceiverStream::new(rx).map(Ok);
|
||||||
Ok(Response::new(
|
Ok(Response::new(
|
||||||
Box::pin(output_stream) as Self::BrainMessagesStream
|
Box::pin(output_stream) as Self::BrainMessagesStream
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
async fn daemon_messages(
|
async fn daemon_messages(
|
||||||
&self,
|
&self,
|
||||||
req: tonic::Request<Streaming<detee_shared::pb::daemon::DaemonMessage>>,
|
req: tonic::Request<Streaming<detee_shared::pb::daemon::DaemonMessage>>,
|
||||||
@ -426,4 +432,5 @@ impl BrainSgxDaemon for BrainSgxDaemonMock {
|
|||||||
|
|
||||||
Ok(Response::new(detee_shared::pb::shared::Empty {}))
|
Ok(Response::new(detee_shared::pb::shared::Empty {}))
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
13
src/main.rs
13
src/main.rs
@ -2,18 +2,17 @@ mod data;
|
|||||||
mod grpc;
|
mod grpc;
|
||||||
|
|
||||||
use data::BrainData;
|
use data::BrainData;
|
||||||
use detee_shared::pb::daemon::brain_sgx_daemon_server::BrainSgxDaemonServer;
|
use detee_shared::pb::brain::brain_app_cli_server::BrainAppCliServer;
|
||||||
|
use detee_shared::pb::brain::brain_app_daemon_server::BrainAppDaemonServer;
|
||||||
use grpc::snp_proto::brain_cli_server::BrainCliServer;
|
use grpc::snp_proto::brain_cli_server::BrainCliServer;
|
||||||
use grpc::snp_proto::brain_daemon_server::BrainDaemonServer;
|
use grpc::snp_proto::brain_daemon_server::BrainDaemonServer;
|
||||||
|
use grpc::BrainAppCliMock;
|
||||||
|
use grpc::BrainAppDaemonMock;
|
||||||
use grpc::BrainCliMock;
|
use grpc::BrainCliMock;
|
||||||
use grpc::BrainDaemonMock;
|
use grpc::BrainDaemonMock;
|
||||||
use grpc::BrainSgxCliMock;
|
|
||||||
use grpc::BrainSgxDaemonMock;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tonic::transport::Server;
|
use tonic::transport::Server;
|
||||||
|
|
||||||
use detee_shared::pb::daemon::brain_sgx_cli_server::BrainSgxCliServer;
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
env_logger::builder()
|
env_logger::builder()
|
||||||
@ -32,8 +31,8 @@ async fn main() {
|
|||||||
let daemon_server = BrainDaemonServer::new(BrainDaemonMock::new(data.clone()));
|
let daemon_server = BrainDaemonServer::new(BrainDaemonMock::new(data.clone()));
|
||||||
let cli_server = BrainCliServer::new(BrainCliMock::new(data.clone()));
|
let cli_server = BrainCliServer::new(BrainCliMock::new(data.clone()));
|
||||||
|
|
||||||
let sgx_cli_server = BrainSgxCliServer::new(BrainSgxCliMock::new(data.clone()));
|
let sgx_cli_server = BrainAppCliServer::new(BrainAppCliMock::new(data.clone()));
|
||||||
let sgx_daemon_server = BrainSgxDaemonServer::new(BrainSgxDaemonMock::new(data.clone()));
|
let sgx_daemon_server = BrainAppDaemonServer::new(BrainAppDaemonMock::new(data.clone()));
|
||||||
|
|
||||||
Server::builder()
|
Server::builder()
|
||||||
.add_service(daemon_server)
|
.add_service(daemon_server)
|
||||||
|
Loading…
Reference in New Issue
Block a user