redirect sgx container create call to daemon and response to cli
This commit is contained in:
parent
b1258ac7c7
commit
8f40adcbf8
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -329,7 +329,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "detee-shared"
|
name = "detee-shared"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#3e783b11bab6894b6f98bd3c6ce44e8bf5b1f78b"
|
source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#a2899ba5a25794aec60a093695675ff24f967484"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"base64",
|
"base64",
|
||||||
"prost",
|
"prost",
|
||||||
|
81
src/data.rs
81
src/data.rs
@ -8,6 +8,10 @@ use std::sync::RwLock;
|
|||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
use tokio::sync::oneshot::Sender as OneshotSender;
|
use tokio::sync::oneshot::Sender as OneshotSender;
|
||||||
|
|
||||||
|
use detee_shared::pb::daemon::{BrainMessage as BrainMessageSgx, NewContainerRes};
|
||||||
|
use detee_shared::pb::shared as sharedPb;
|
||||||
|
use detee_shared::pb::shared::Container;
|
||||||
|
|
||||||
#[derive(thiserror::Error, Debug)]
|
#[derive(thiserror::Error, Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
#[error("We do not allow locking of more than 100000 tokens.")]
|
#[error("We do not allow locking of more than 100000 tokens.")]
|
||||||
@ -143,6 +147,9 @@ pub struct BrainData {
|
|||||||
tmp_newvm_reqs: DashMap<String, (grpc::NewVmReq, OneshotSender<grpc::NewVmResp>)>,
|
tmp_newvm_reqs: DashMap<String, (grpc::NewVmReq, OneshotSender<grpc::NewVmResp>)>,
|
||||||
tmp_updatevm_reqs: DashMap<String, (grpc::UpdateVmReq, OneshotSender<grpc::UpdateVmResp>)>,
|
tmp_updatevm_reqs: DashMap<String, (grpc::UpdateVmReq, OneshotSender<grpc::UpdateVmResp>)>,
|
||||||
daemon_tx: DashMap<String, Sender<grpc::BrainMessage>>,
|
daemon_tx: DashMap<String, Sender<grpc::BrainMessage>>,
|
||||||
|
|
||||||
|
sgx_daemon_tx: DashMap<String, Sender<BrainMessageSgx>>,
|
||||||
|
tmp_new_container_reqs: DashMap<String, (sharedPb::Container, OneshotSender<NewContainerRes>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -161,6 +168,9 @@ impl BrainData {
|
|||||||
tmp_newvm_reqs: DashMap::new(),
|
tmp_newvm_reqs: DashMap::new(),
|
||||||
tmp_updatevm_reqs: DashMap::new(),
|
tmp_updatevm_reqs: DashMap::new(),
|
||||||
daemon_tx: DashMap::new(),
|
daemon_tx: DashMap::new(),
|
||||||
|
|
||||||
|
sgx_daemon_tx: DashMap::new(),
|
||||||
|
tmp_new_container_reqs: DashMap::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -669,3 +679,74 @@ impl BrainData {
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl BrainData {
|
||||||
|
pub fn add_sgx_daemon_tx(&self, node_pubkey: &str, tx: Sender<BrainMessageSgx>) {
|
||||||
|
self.sgx_daemon_tx.insert(node_pubkey.to_string(), tx);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn del_sgx_daemon_tx(&self, node_pubkey: &str) {
|
||||||
|
self.sgx_daemon_tx.remove(node_pubkey);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn send_new_container_req(
|
||||||
|
&self,
|
||||||
|
mut req: Container,
|
||||||
|
tx: OneshotSender<NewContainerRes>,
|
||||||
|
) {
|
||||||
|
req.uuid = uuid::Uuid::new_v4().to_string();
|
||||||
|
|
||||||
|
info!("Inserting new container request in memory: {req:?}");
|
||||||
|
self.tmp_new_container_reqs
|
||||||
|
.insert(req.uuid.clone(), (req.clone(), tx));
|
||||||
|
|
||||||
|
if let Some(sgx_daemon_tx) = self.sgx_daemon_tx.get(&req.node_pubkey) {
|
||||||
|
debug!(
|
||||||
|
"Found daemon TX for {}. Sending newVMReq {}",
|
||||||
|
req.node_pubkey, req.uuid
|
||||||
|
);
|
||||||
|
let msg = BrainMessageSgx {
|
||||||
|
msg: Some(
|
||||||
|
detee_shared::pb::daemon::brain_message::Msg::NewContainerReq(req.clone()),
|
||||||
|
),
|
||||||
|
};
|
||||||
|
if let Err(e) = sgx_daemon_tx.send(msg).await {
|
||||||
|
warn!(
|
||||||
|
"Failed to send new container request to {} due to error: {e:?}",
|
||||||
|
req.node_pubkey
|
||||||
|
);
|
||||||
|
info!("Deleting daemon TX for {}", req.node_pubkey);
|
||||||
|
self.del_sgx_daemon_tx(&req.node_pubkey);
|
||||||
|
self.send_new_container_resp(NewContainerRes {
|
||||||
|
uuid: req.uuid,
|
||||||
|
status: "Failed".to_string(),
|
||||||
|
error: "Daemon is offline.".to_string(),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn send_new_container_resp(&self, new_container_resp: NewContainerRes) {
|
||||||
|
dbg!(&new_container_resp.uuid);
|
||||||
|
dbg!(&self.tmp_new_container_reqs);
|
||||||
|
let new_container_req = match self.tmp_new_container_reqs.remove(&new_container_resp.uuid) {
|
||||||
|
Some((_, r)) => r,
|
||||||
|
None => {
|
||||||
|
log::error!(
|
||||||
|
"Received confirmation for ghost new container req {}",
|
||||||
|
new_container_resp.uuid
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Err(_) = new_container_req.1.send(new_container_resp.clone()) {
|
||||||
|
log::error!(
|
||||||
|
"CLI RX for {} dropped before receiving confirmation {:?}.",
|
||||||
|
&new_container_req.0.admin_pubkey,
|
||||||
|
new_container_resp,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
86
src/grpc.rs
86
src/grpc.rs
@ -16,8 +16,9 @@ use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
|
|||||||
use tonic::{Request, Response, Status, Streaming};
|
use tonic::{Request, Response, Status, Streaming};
|
||||||
|
|
||||||
use detee_shared::pb::daemon::{
|
use detee_shared::pb::daemon::{
|
||||||
brain_sgx_cli_server::BrainSgxCli, brain_sgx_daemon_server::BrainSgxDaemon, ContainerFilters,
|
brain_sgx_cli_server::BrainSgxCli, brain_sgx_daemon_server::BrainSgxDaemon,
|
||||||
ContainerListResp, DeleteContainerRes, NewContainerRes,
|
daemon_message as sgx_daemon_message, ContainerFilters, ContainerListResp, DeleteContainerRes,
|
||||||
|
NewContainerRes,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct BrainDaemonMock {
|
pub struct BrainDaemonMock {
|
||||||
@ -278,10 +279,24 @@ impl BrainSgxCli for BrainSgxCliMock {
|
|||||||
&self,
|
&self,
|
||||||
req: tonic::Request<detee_shared::pb::shared::Container>,
|
req: tonic::Request<detee_shared::pb::shared::Container>,
|
||||||
) -> Result<tonic::Response<NewContainerRes>, Status> {
|
) -> Result<tonic::Response<NewContainerRes>, Status> {
|
||||||
dbg!(req);
|
let req = req.into_inner();
|
||||||
Ok(Response::new(NewContainerRes {
|
log::info!("Creating new container: {req:?}");
|
||||||
..Default::default()
|
let admin_pubkey = req.admin_pubkey.clone();
|
||||||
}))
|
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
|
||||||
|
self.data.send_new_container_req(req, oneshot_tx).await;
|
||||||
|
|
||||||
|
match oneshot_rx.await {
|
||||||
|
Ok(response) => {
|
||||||
|
info!("responding container confirmation to {admin_pubkey}: {response:?}");
|
||||||
|
Ok(Response::new(response))
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Something went wrong. Reached error {e:?}");
|
||||||
|
Err(Status::unknown(
|
||||||
|
"Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delete_container(
|
async fn delete_container(
|
||||||
@ -331,33 +346,56 @@ impl BrainSgxDaemon for BrainSgxDaemonMock {
|
|||||||
let (tx, rx) = mpsc::channel(6);
|
let (tx, rx) = mpsc::channel(6);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
for _ in 0..5 {
|
let _ = tx
|
||||||
let _ = tx
|
.send(detee_shared::pb::shared::ContainerContracts {
|
||||||
.send(detee_shared::pb::shared::ContainerContracts {
|
..Default::default()
|
||||||
..Default::default()
|
})
|
||||||
})
|
.await;
|
||||||
.await;
|
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg));
|
let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg));
|
||||||
Ok(Response::new(Box::pin(output_stream)))
|
Ok(Response::new(Box::pin(output_stream)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn brain_messages(
|
||||||
|
&self,
|
||||||
|
req: tonic::Request<detee_shared::pb::shared::Pubkey>,
|
||||||
|
) -> Result<tonic::Response<Self::BrainMessagesStream>, Status> {
|
||||||
|
let req = req.into_inner();
|
||||||
|
info!("Daemon {} connected to receive brain messages", req.pubkey);
|
||||||
|
let (tx, rx) = mpsc::channel(6);
|
||||||
|
self.data.add_sgx_daemon_tx(&req.pubkey, tx);
|
||||||
|
let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg));
|
||||||
|
Ok(Response::new(
|
||||||
|
Box::pin(output_stream) as Self::BrainMessagesStream
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
async fn daemon_messages(
|
async fn daemon_messages(
|
||||||
&self,
|
&self,
|
||||||
req: tonic::Request<Streaming<detee_shared::pb::daemon::DaemonMessage>>,
|
req: tonic::Request<Streaming<detee_shared::pb::daemon::DaemonMessage>>,
|
||||||
) -> Result<tonic::Response<detee_shared::pb::shared::Empty>, Status> {
|
) -> Result<tonic::Response<detee_shared::pb::shared::Empty>, Status> {
|
||||||
dbg!(req);
|
dbg!(&req);
|
||||||
|
let mut req_stream = req.into_inner();
|
||||||
|
|
||||||
|
while let Some(daemon_message) = req_stream.next().await {
|
||||||
|
match daemon_message {
|
||||||
|
Ok(msg) => match msg.msg {
|
||||||
|
Some(sgx_daemon_message::Msg::NewContainerResp(new_cont)) => {
|
||||||
|
dbg!(&new_cont);
|
||||||
|
self.data.send_new_container_resp(new_cont).await;
|
||||||
|
}
|
||||||
|
Some(something) => {
|
||||||
|
dbg!(something);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
dbg!("None");
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(_) => todo!(),
|
||||||
|
}
|
||||||
|
//
|
||||||
|
}
|
||||||
|
|
||||||
Ok(Response::new(detee_shared::pb::shared::Empty {}))
|
Ok(Response::new(detee_shared::pb::shared::Empty {}))
|
||||||
}
|
}
|
||||||
async fn brain_messages(
|
|
||||||
&self,
|
|
||||||
req: tonic::Request<detee_shared::pb::shared::Pubkey>,
|
|
||||||
) -> Result<tonic::Response<Self::BrainMessagesStream>, Status> {
|
|
||||||
dbg!(req);
|
|
||||||
Ok(Response::new(
|
|
||||||
Box::pin(tokio_stream::empty()) as Self::BrainMessagesStream
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user