delete container implemented
refactor daemon message
This commit is contained in:
parent
c6c44da0e8
commit
230eb8cfbd
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -329,7 +329,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "detee-shared"
|
name = "detee-shared"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#6e1b1853838905c44d535d984d1221dd5d0dc2bc"
|
source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#7c9f66a7394c06ad8af0934e34b113f9c965bc98"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"base64",
|
"base64",
|
||||||
"prost",
|
"prost",
|
||||||
|
81
src/data.rs
81
src/data.rs
@ -8,7 +8,7 @@ use std::sync::RwLock;
|
|||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
use tokio::sync::oneshot::Sender as OneshotSender;
|
use tokio::sync::oneshot::Sender as OneshotSender;
|
||||||
|
|
||||||
use detee_shared::pb::daemon::{BrainMessage as BrainMessageSgx, NewContainerRes};
|
use detee_shared::pb::daemon as daemonPb;
|
||||||
use detee_shared::pb::shared as sharedPb;
|
use detee_shared::pb::shared as sharedPb;
|
||||||
|
|
||||||
#[derive(thiserror::Error, Debug)]
|
#[derive(thiserror::Error, Debug)]
|
||||||
@ -202,8 +202,14 @@ pub struct BrainData {
|
|||||||
daemon_tx: DashMap<String, Sender<grpc::BrainMessage>>,
|
daemon_tx: DashMap<String, Sender<grpc::BrainMessage>>,
|
||||||
|
|
||||||
app_nodes: RwLock<Vec<AppNode>>,
|
app_nodes: RwLock<Vec<AppNode>>,
|
||||||
app_daemon_tx: DashMap<String, Sender<BrainMessageSgx>>,
|
app_daemon_tx: DashMap<String, Sender<daemonPb::BrainMessage>>,
|
||||||
tmp_new_container_reqs: DashMap<String, (sharedPb::Container, OneshotSender<NewContainerRes>)>,
|
tmp_new_container_reqs: DashMap<
|
||||||
|
String,
|
||||||
|
(
|
||||||
|
sharedPb::Container,
|
||||||
|
OneshotSender<daemonPb::NewContainerRes>,
|
||||||
|
),
|
||||||
|
>,
|
||||||
app_contracts: RwLock<Vec<AppContract>>,
|
app_contracts: RwLock<Vec<AppContract>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -738,7 +744,7 @@ impl BrainData {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl BrainData {
|
impl BrainData {
|
||||||
pub fn add_app_daemon_tx(&self, node_pubkey: &str, tx: Sender<BrainMessageSgx>) {
|
pub fn add_app_daemon_tx(&self, node_pubkey: &str, tx: Sender<daemonPb::BrainMessage>) {
|
||||||
self.app_daemon_tx.insert(node_pubkey.to_string(), tx);
|
self.app_daemon_tx.insert(node_pubkey.to_string(), tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -747,7 +753,6 @@ impl BrainData {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn insert_app_node(&self, node: AppNode) {
|
pub fn insert_app_node(&self, node: AppNode) {
|
||||||
info!("Registering app node {node:?}");
|
|
||||||
let mut nodes = self.app_nodes.write().unwrap();
|
let mut nodes = self.app_nodes.write().unwrap();
|
||||||
for n in nodes.iter_mut() {
|
for n in nodes.iter_mut() {
|
||||||
if n.public_key == node.public_key {
|
if n.public_key == node.public_key {
|
||||||
@ -760,6 +765,25 @@ impl BrainData {
|
|||||||
nodes.push(node);
|
nodes.push(node);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn find_app_contract_by_uuid(&self, uuid: &str) -> Option<AppContract> {
|
||||||
|
let contracts = self.app_contracts.read().unwrap();
|
||||||
|
contracts.iter().cloned().find(|c| c.uuid == uuid)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn find_app_contracts_by_admin_pubkey(&self, admin_pubkey: &str) -> Vec<AppContract> {
|
||||||
|
debug!("Searching contracts for admin pubkey {admin_pubkey}");
|
||||||
|
let contracts: Vec<AppContract> = self
|
||||||
|
.app_contracts
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.iter()
|
||||||
|
.filter(|c| c.admin_pubkey == admin_pubkey)
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
debug!("Found {} contracts or {admin_pubkey}.", contracts.len());
|
||||||
|
contracts
|
||||||
|
}
|
||||||
|
|
||||||
pub fn find_app_contracts_by_node_pubkey(&self, node_pubkey: &str) -> Vec<AppContract> {
|
pub fn find_app_contracts_by_node_pubkey(&self, node_pubkey: &str) -> Vec<AppContract> {
|
||||||
let app_contracts = self.app_contracts.read().unwrap();
|
let app_contracts = self.app_contracts.read().unwrap();
|
||||||
app_contracts
|
app_contracts
|
||||||
@ -772,7 +796,7 @@ impl BrainData {
|
|||||||
pub async fn send_new_container_req(
|
pub async fn send_new_container_req(
|
||||||
&self,
|
&self,
|
||||||
mut req: sharedPb::Container,
|
mut req: sharedPb::Container,
|
||||||
tx: OneshotSender<NewContainerRes>,
|
tx: OneshotSender<daemonPb::NewContainerRes>,
|
||||||
) {
|
) {
|
||||||
req.uuid = uuid::Uuid::new_v4().to_string();
|
req.uuid = uuid::Uuid::new_v4().to_string();
|
||||||
|
|
||||||
@ -785,7 +809,7 @@ impl BrainData {
|
|||||||
"Found daemon TX for {}. Sending newVMReq {}",
|
"Found daemon TX for {}. Sending newVMReq {}",
|
||||||
req.node_pubkey, req.uuid
|
req.node_pubkey, req.uuid
|
||||||
);
|
);
|
||||||
let msg = BrainMessageSgx {
|
let msg = daemonPb::BrainMessage {
|
||||||
msg: Some(
|
msg: Some(
|
||||||
detee_shared::pb::daemon::brain_message::Msg::NewContainerReq(req.clone()),
|
detee_shared::pb::daemon::brain_message::Msg::NewContainerReq(req.clone()),
|
||||||
),
|
),
|
||||||
@ -797,7 +821,7 @@ impl BrainData {
|
|||||||
);
|
);
|
||||||
info!("Deleting daemon TX for {}", req.node_pubkey);
|
info!("Deleting daemon TX for {}", req.node_pubkey);
|
||||||
self.del_app_daemon_tx(&req.node_pubkey);
|
self.del_app_daemon_tx(&req.node_pubkey);
|
||||||
self.send_new_container_resp(NewContainerRes {
|
self.send_new_container_resp(daemonPb::NewContainerRes {
|
||||||
uuid: req.uuid,
|
uuid: req.uuid,
|
||||||
status: "Failed".to_string(),
|
status: "Failed".to_string(),
|
||||||
error: "Daemon is offline.".to_string(),
|
error: "Daemon is offline.".to_string(),
|
||||||
@ -808,9 +832,44 @@ impl BrainData {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send_new_container_resp(&self, new_container_resp: NewContainerRes) {
|
pub async fn send_del_container_req(
|
||||||
dbg!(&new_container_resp.uuid);
|
&self,
|
||||||
dbg!(&self.tmp_new_container_reqs);
|
req: daemonPb::ContainerFilters,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
if let Some(app_contract) = self.find_app_contract_by_uuid(req.uuid()) {
|
||||||
|
info!("Found app contract {}. Deleting...", req.uuid());
|
||||||
|
if let Some(app_daemon_tx) = self.app_daemon_tx.get(&app_contract.node_pubkey) {
|
||||||
|
debug!(
|
||||||
|
"TX for daemon {} found. Informing daemon about deletion of {}.",
|
||||||
|
app_contract.node_pubkey,
|
||||||
|
req.uuid()
|
||||||
|
);
|
||||||
|
let msg = daemonPb::BrainMessage {
|
||||||
|
msg: Some(
|
||||||
|
detee_shared::pb::daemon::brain_message::Msg::DeleteContainer(req.clone()),
|
||||||
|
),
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = app_daemon_tx.send(msg).await {
|
||||||
|
warn!(
|
||||||
|
"Failed to send deletion request to {} due to error: {e:?}",
|
||||||
|
app_contract.node_pubkey
|
||||||
|
);
|
||||||
|
info!("Deleting daemon TX for {}", app_contract.node_pubkey);
|
||||||
|
self.del_app_daemon_tx(&app_contract.node_pubkey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut app_contracts = self.app_contracts.write().unwrap();
|
||||||
|
app_contracts.retain(|c| c.uuid != req.uuid());
|
||||||
|
|
||||||
|
return Ok(());
|
||||||
|
} else {
|
||||||
|
Err("Contract not found".into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn send_new_container_resp(&self, new_container_resp: daemonPb::NewContainerRes) {
|
||||||
let new_container_req = match self.tmp_new_container_reqs.remove(&new_container_resp.uuid) {
|
let new_container_req = match self.tmp_new_container_reqs.remove(&new_container_resp.uuid) {
|
||||||
Some((_, r)) => r,
|
Some((_, r)) => r,
|
||||||
None => {
|
None => {
|
||||||
|
42
src/grpc.rs
42
src/grpc.rs
@ -17,8 +17,7 @@ use tonic::{Request, Response, Status, Streaming};
|
|||||||
|
|
||||||
use detee_shared::pb::daemon::{
|
use detee_shared::pb::daemon::{
|
||||||
brain_sgx_cli_server::BrainSgxCli, brain_sgx_daemon_server::BrainSgxDaemon,
|
brain_sgx_cli_server::BrainSgxCli, brain_sgx_daemon_server::BrainSgxDaemon,
|
||||||
daemon_message as sgx_daemon_message, ContainerFilters, ContainerListResp, DeleteContainerRes,
|
daemon_message as sgx_daemon_message, ContainerFilters, ContainerListResp, NewContainerRes,
|
||||||
NewContainerRes,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct BrainDaemonMock {
|
pub struct BrainDaemonMock {
|
||||||
@ -302,11 +301,18 @@ impl BrainSgxCli for BrainSgxCliMock {
|
|||||||
async fn delete_container(
|
async fn delete_container(
|
||||||
&self,
|
&self,
|
||||||
req: tonic::Request<ContainerFilters>,
|
req: tonic::Request<ContainerFilters>,
|
||||||
) -> Result<tonic::Response<DeleteContainerRes>, Status> {
|
) -> Result<tonic::Response<detee_shared::pb::shared::Empty>, Status> {
|
||||||
dbg!(req);
|
let req = req.into_inner();
|
||||||
Ok(Response::new(DeleteContainerRes {
|
log::info!(
|
||||||
..Default::default()
|
"deleting container: {}",
|
||||||
}))
|
req.uuid.clone().unwrap_or_default()
|
||||||
|
);
|
||||||
|
if let Err(er) = self.data.send_del_container_req(req).await {
|
||||||
|
info!("Could not delete container: {er}");
|
||||||
|
return Err(Status::not_found("Could not find container"));
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Response::new(detee_shared::pb::shared::Empty {}))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn list_containers(
|
async fn list_containers(
|
||||||
@ -342,8 +348,12 @@ impl BrainSgxDaemon for BrainSgxDaemonMock {
|
|||||||
&self,
|
&self,
|
||||||
req: tonic::Request<detee_shared::pb::shared::RegisterNodeReq>,
|
req: tonic::Request<detee_shared::pb::shared::RegisterNodeReq>,
|
||||||
) -> Result<tonic::Response<Self::RegisterNodeStream>, Status> {
|
) -> Result<tonic::Response<Self::RegisterNodeStream>, Status> {
|
||||||
log::info!("registering app node : {:?}", &req);
|
|
||||||
let req_data = req.into_inner();
|
let req_data = req.into_inner();
|
||||||
|
log::info!(
|
||||||
|
"registering app node_key : {}, owner_key: {}",
|
||||||
|
&req_data.node_pubkey,
|
||||||
|
&req_data.owner_pubkey
|
||||||
|
);
|
||||||
|
|
||||||
let app_node = crate::data::AppNode {
|
let app_node = crate::data::AppNode {
|
||||||
public_key: req_data.node_pubkey.clone(),
|
public_key: req_data.node_pubkey.clone(),
|
||||||
@ -390,24 +400,26 @@ impl BrainSgxDaemon for BrainSgxDaemonMock {
|
|||||||
&self,
|
&self,
|
||||||
req: tonic::Request<Streaming<detee_shared::pb::daemon::DaemonMessage>>,
|
req: tonic::Request<Streaming<detee_shared::pb::daemon::DaemonMessage>>,
|
||||||
) -> Result<tonic::Response<detee_shared::pb::shared::Empty>, Status> {
|
) -> Result<tonic::Response<detee_shared::pb::shared::Empty>, Status> {
|
||||||
dbg!(&req);
|
|
||||||
let mut req_stream = req.into_inner();
|
let mut req_stream = req.into_inner();
|
||||||
|
let mut pubkey = String::new();
|
||||||
|
|
||||||
while let Some(daemon_message) = req_stream.next().await {
|
while let Some(daemon_message) = req_stream.next().await {
|
||||||
match daemon_message {
|
match daemon_message {
|
||||||
Ok(msg) => match msg.msg {
|
Ok(msg) => match msg.msg {
|
||||||
Some(sgx_daemon_message::Msg::NewContainerResp(new_cont)) => {
|
Some(sgx_daemon_message::Msg::Pubkey(something)) => {
|
||||||
dbg!(&new_cont);
|
pubkey = something.pubkey;
|
||||||
self.data.send_new_container_resp(new_cont).await;
|
|
||||||
}
|
}
|
||||||
Some(something) => {
|
Some(sgx_daemon_message::Msg::NewContainerResp(new_cont)) => {
|
||||||
dbg!(something);
|
self.data.send_new_container_resp(new_cont).await;
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
dbg!("None");
|
dbg!("None");
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(_) => todo!(),
|
Err(e) => {
|
||||||
|
log::warn!("Daemon disconnected: {e:?}");
|
||||||
|
self.data.del_app_daemon_tx(&pubkey);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
//
|
//
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user