integrating new proto structure

This commit is contained in:
Noor 2025-02-04 12:57:23 +00:00
parent 23777d8a54
commit afaa478f46
Signed by: noormohammedb
GPG Key ID: E424C39E19EFD7DF
4 changed files with 55 additions and 47 deletions

2
Cargo.lock generated

@ -290,7 +290,7 @@ dependencies = [
[[package]]
name = "detee-shared"
version = "0.1.0"
source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#7c9f66a7394c06ad8af0934e34b113f9c965bc98"
source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#42443b816299afb0da6493b35165f8ce7558d717"
dependencies = [
"base64",
"prost",

@ -1,6 +1,6 @@
use anyhow::{anyhow, Result};
use detee_shared::types::shared::Container as ContainerConfig;
use detee_shared::types::shared::Resource as ResourceConfig;
use detee_shared::types::brain::AppDeployConfig;
use detee_shared::types::brain::Resource as ResourceConfig;
use crate::container::delete_enclave;
use crate::container::deploy_enclave;
@ -24,10 +24,10 @@ pub struct Container {
impl DaemonState {
pub async fn create_new_container(
&mut self,
req_data: ContainerConfig,
req_data: AppDeployConfig,
unarchive_dir: String,
) -> Result<Vec<(u16, u16)>> {
let publishing_ports = req_data.resource.clone().unwrap().port;
let publishing_ports = req_data.resource.clone().port;
let uuid = req_data.uuid;
let container_name = format!("dtpm-{uuid}");
let mapped_ports =
@ -39,7 +39,7 @@ impl DaemonState {
package_path: unarchive_dir,
status: "running".to_string(),
admin: req_data.admin_pubkey,
container_resource: req_data.resource.unwrap(),
container_resource: req_data.resource,
mapped_ports: mapped_ports.clone(),
};

@ -1,13 +1,13 @@
use detee_shared::pb::brain::brain_app_daemon_client::BrainAppDaemonClient;
use detee_shared::pb::brain::{
AppContract, BrainMessageApp, DaemonMessageApp, Pubkey, RegisterAppNodeReq,
};
use anyhow::Result;
use detee_shared::pb::daemon::brain_sgx_daemon_client::BrainSgxDaemonClient;
use detee_shared::pb::daemon::daemon_message::Msg;
use detee_shared::pb::daemon::{BrainMessage, DaemonMessage};
use detee_shared::pb::shared::ContainerContracts;
use detee_shared::pb::shared::{Pubkey, RegisterNodeReq};
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::task::JoinSet;
use tokio_stream::wrappers::ReceiverStream;
// use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
@ -16,17 +16,17 @@ use crate::{ADMIN_PUBKEY, NODE_PUBKEY};
pub struct ConnectionData {
pub brain_url: String,
pub brain_msg_tx: Sender<BrainMessage>,
pub daemon_msg_rx: Receiver<DaemonMessage>,
pub daemon_msg_tx: Sender<DaemonMessage>,
pub brain_msg_tx: Sender<BrainMessageApp>,
pub daemon_msg_rx: Receiver<DaemonMessageApp>,
pub daemon_msg_tx: Sender<DaemonMessageApp>,
}
pub async fn register_node(config: &crate::Config) -> Result<Vec<ContainerContracts>> {
let mut client = BrainSgxDaemonClient::connect(config.brain_url.clone()).await?;
pub async fn register_node(config: &crate::Config) -> Result<Vec<AppContract>> {
let mut client = BrainAppDaemonClient::connect(config.brain_url.clone()).await?;
log::debug!("registering node with brain");
let req = RegisterNodeReq {
let req = RegisterAppNodeReq {
node_pubkey: NODE_PUBKEY.to_string(),
owner_pubkey: ADMIN_PUBKEY.to_string(),
main_ip: IP_INFO.ip.clone(),
@ -57,16 +57,16 @@ pub async fn register_node(config: &crate::Config) -> Result<Vec<ContainerContra
}
pub async fn connect_and_run(conn_data: ConnectionData) -> Result<()> {
let client = BrainSgxDaemonClient::connect(conn_data.brain_url).await?;
let client = BrainAppDaemonClient::connect(conn_data.brain_url).await?;
let mut streaming_tasks = JoinSet::new();
streaming_tasks.spawn(receive_messages(client.clone(), conn_data.brain_msg_tx));
streaming_tasks.spawn(send_messages(
client.clone(),
conn_data.daemon_msg_rx,
conn_data.daemon_msg_tx,
));
// streaming_tasks.spawn(send_messages(
// client.clone(),
// conn_data.daemon_msg_rx,
// conn_data.daemon_msg_tx,
// ));
let task_output = streaming_tasks.join_next().await;
println!("exiting: {task_output:?}");
@ -74,8 +74,8 @@ pub async fn connect_and_run(conn_data: ConnectionData) -> Result<()> {
}
pub async fn receive_messages(
mut client: BrainSgxDaemonClient<Channel>,
tx: Sender<BrainMessage>,
mut client: BrainAppDaemonClient<Channel>,
tx: Sender<BrainMessageApp>,
) -> Result<()> {
let pubkey = NODE_PUBKEY.to_string();
@ -97,6 +97,7 @@ pub async fn receive_messages(
Ok(())
}
/*
pub async fn send_messages(
mut client: BrainSgxDaemonClient<Channel>,
rx: Receiver<DaemonMessage>,
@ -113,3 +114,5 @@ pub async fn send_messages(
log::debug!("daemon_messages is about to exit");
Ok(())
}
*/

@ -6,15 +6,15 @@ pub mod utils;
use std::time::Duration;
pub use data::DaemonState;
use detee_shared::pb::daemon::brain_message;
use detee_shared::pb::daemon::daemon_message;
use detee_shared::pb::daemon::BrainMessage;
use detee_shared::pb::daemon::DaemonMessage;
use detee_shared::pb::daemon::NewContainerRes;
use detee_shared::pb::shared::ContainerContracts;
use detee_shared::pb::brain::brain_message_app;
use detee_shared::pb::brain::daemon_message_app;
use detee_shared::pb::brain::AppContract;
use detee_shared::pb::brain::BrainMessageApp;
use detee_shared::pb::brain::DaemonMessageApp;
use detee_shared::pb::brain::MappedPort;
use detee_shared::pb::brain::NewAppRes;
use detee_shared::pb::shared::MappedPort;
use detee_shared::types::shared::Container as ContainerConfig;
use detee_shared::types::brain::AppDeployConfig;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
@ -44,14 +44,14 @@ impl Default for Config {
#[derive(Debug)]
pub struct ContainerHandler {
pub receiver: Receiver<BrainMessage>,
pub sender: Sender<DaemonMessage>,
pub receiver: Receiver<BrainMessageApp>,
pub sender: Sender<DaemonMessageApp>,
pub config: Config,
pub data: DaemonState,
}
impl ContainerHandler {
pub fn new(receiver: Receiver<BrainMessage>, sender: Sender<DaemonMessage>) -> Self {
pub fn new(receiver: Receiver<BrainMessageApp>, sender: Sender<DaemonMessageApp>) -> Self {
Self {
receiver,
sender,
@ -60,17 +60,17 @@ impl ContainerHandler {
}
}
fn handle_contracts(&mut self, contracts: Vec<ContainerContracts>) {
fn handle_contracts(&mut self, contracts: Vec<AppContract>) {
dbg!(&contracts);
}
async fn run(mut self) {
while let Some(brain_msg) = self.receiver.recv().await {
match brain_msg.msg {
Some(brain_message::Msg::NewContainerReq(msg)) => {
Some(brain_message_app::Msg::NewAppReq(msg)) => {
self.handle_new_container_req(msg.into()).await;
}
/*
Some(brain_message::Msg::DeleteContainer(msg)) => {
let container_id = msg.uuid.unwrap_or_default();
self.handle_del_container_req(container_id).await;
@ -79,15 +79,20 @@ impl ContainerHandler {
Some(brain_message::Msg::ListContainer(msg)) => {
dbg!(&msg);
}
*/
None => {
log::error!("Brain disconnected");
break;
}
_ => {
todo!("wip")
}
}
}
}
async fn handle_new_container_req(&mut self, new_container_req: ContainerConfig) {
async fn handle_new_container_req(&mut self, new_container_req: AppDeployConfig) {
let container_uuid = new_container_req.uuid.clone();
let unarchive_dir = match handle_package(
@ -98,8 +103,8 @@ impl ContainerHandler {
{
Ok(unarchive_dir) => unarchive_dir,
Err(e) => {
let res = DaemonMessage {
msg: Some(daemon_message::Msg::NewContainerResp(NewContainerRes {
let res = DaemonMessageApp {
msg: Some(daemon_message_app::Msg::NewAppResp(NewAppRes {
uuid: new_container_req.uuid,
status: "failed".to_string(),
error: e.to_string(),
@ -120,8 +125,8 @@ impl ContainerHandler {
{
Ok(mapped_ports) => mapped_ports.into_iter().map(MappedPort::from).collect(),
Err(e) => {
let res = DaemonMessage {
msg: Some(daemon_message::Msg::NewContainerResp(NewContainerRes {
let res = DaemonMessageApp {
msg: Some(daemon_message_app::Msg::NewAppResp(NewAppRes {
uuid: container_uuid,
status: "failed".to_string(),
error: e.to_string(),
@ -135,8 +140,8 @@ impl ContainerHandler {
}
};
let res = DaemonMessage {
msg: Some(daemon_message::Msg::NewContainerResp(NewContainerRes {
let res = DaemonMessageApp {
msg: Some(daemon_message_app::Msg::NewAppResp(NewAppRes {
uuid: container_uuid,
status: "Success".to_string(),
error: "".to_string(),