refactor actor model

implementing actor model inspire from snp daemon
migrated from grpc server model to grpc client model for
actor and brain integration
basic channel communication for actors and grpc
deleted unused proto file
This commit is contained in:
Noor 2025-01-29 18:08:59 +05:30
parent fd297488e2
commit 169bff5803
Signed by: noormohammedb
GPG Key ID: D83EFB8B3B967146
5 changed files with 183 additions and 160 deletions

4
Cargo.lock generated

@ -303,12 +303,14 @@ dependencies = [
"detee-shared",
"env_logger",
"flate2",
"log",
"prost",
"prost-types",
"rand",
"reqwest",
"tar",
"tokio",
"tokio-stream",
"tonic",
"tonic-build",
]
@ -316,7 +318,7 @@ dependencies = [
[[package]]
name = "detee-shared"
version = "0.1.0"
source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#78c84299947e887fe8d8c737656318f409c7f0b4"
source = "git+ssh://git@gitea.detee.cloud/noormohammedb/detee-shared#3e783b11bab6894b6f98bd3c6ce44e8bf5b1f78b"
dependencies = [
"base64",
"prost",

@ -9,15 +9,17 @@ prost = "0.13.4"
prost-types = "0.13.4"
tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread", "fs"] }
tonic = "0.12.3"
detee-shared = { git = "ssh://git@gitea.detee.cloud/noormohammedb/detee-shared" }
# detee-shared = { path = "../detee-shared" }
chrono = "0.4.39"
reqwest = "0.12.12"
flate2 = "1.0.35"
tar = "0.4.43"
anyhow = "1.0.95"
rand = "0.8.5"
tokio-stream = "0.1.17"
detee-shared = { git = "ssh://git@gitea.detee.cloud/noormohammedb/detee-shared" }
# detee-shared = { path = "../detee-shared" }
log = "0.4.25"
[build-dependencies]
tonic-build = "0.12.3"

@ -1,19 +0,0 @@
syntax = "proto3";
package deamon;
message Empty {
}
message NewContainerReq {
repeated string port = 1;
}
message NewContainerRes {
string status = 1;
}
service DaemonService {
rpc CreateContainer (NewContainerReq) returns (NewContainerRes);
// rpc ListContainer (NodeFilters) returns (stream NodeListResp);
}

@ -1,141 +1,102 @@
use anyhow::Result;
use detee_shared::pb::daemon::brain_sgx_daemon_client::BrainSgxDaemonClient;
use detee_shared::pb::daemon::daemon_message::Msg;
use detee_shared::pb::daemon::{BrainMessage, DaemonMessage};
use detee_shared::pb::shared::ContainerContracts;
use detee_shared::pb::shared::{Pubkey, RegisterNodeReq};
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::task::JoinSet;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
use std::sync::Arc;
use std::{net::SocketAddr, str::FromStr};
use tokio::sync::RwLock;
use tonic::transport::Server;
use detee_shared::pb::daemon::daemon_service_server::{
DaemonService as DaemonServicePB, DaemonServiceServer as DaemonServiceServerPB,
};
use detee_shared::pb::daemon::{
ContainerFilters, ContainerInspectResp, ContainerListResp, DeleteContainerRes, NewContainerRes,
};
use detee_shared::pb::shared::Container as ContainerPB;
use detee_shared::types::shared::Container as ContainerConfig;
use crate::utils::handle_package;
use crate::DaemonState;
#[derive(Debug, Clone)]
pub struct DaemonServer {
pub data: Arc<RwLock<DaemonState>>,
pub struct ConnectionData {
pub brain_url: String,
pub brain_msg_tx: Sender<BrainMessage>,
pub daemon_msg_rx: Receiver<DaemonMessage>,
pub daemon_msg_tx: Sender<DaemonMessage>,
}
impl DaemonServer {
pub fn new(data: Arc<RwLock<DaemonState>>) -> Self {
Self { data }
pub async fn register_node(config: &crate::Config) -> Result<Vec<ContainerContracts>> {
let mut client = BrainSgxDaemonClient::connect(config.brain_url.clone()).await?;
log::debug!("registering node with brain");
let req = RegisterNodeReq {
..Default::default()
};
let mut container_contracts = vec![];
let mut grpc_stream = client.register_node(req).await?.into_inner();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(contract) => {
container_contracts.push(contract);
}
Err(e) => {
println!("Brain disconnected from register_node: {e}");
}
}
}
log::info!(
"Brain registration succcessful, with contract count: {}",
container_contracts.len()
);
Ok(container_contracts)
}
pub async fn start(&self) -> Result<()> {
let port: String = std::env::var("PORT").unwrap_or_else(|_| "33400".to_string());
pub async fn connect_and_run(conn_data: ConnectionData) -> Result<()> {
let client = BrainSgxDaemonClient::connect(conn_data.brain_url).await?;
let addr = SocketAddr::from_str(format!("0.0.0.0:{port}").as_str())?;
let mut streaming_tasks = JoinSet::new();
let daemon_server = DaemonServiceServerPB::new(DaemonServer::new(self.data.clone()));
streaming_tasks.spawn(receive_messages(client.clone(), conn_data.brain_msg_tx));
println!("Listening on {}", addr);
Server::builder()
.add_service(daemon_server)
.serve(addr)
.await?;
let task_output = streaming_tasks.join_next().await;
println!("exiting: {task_output:?}");
Ok(())
}
pub async fn receive_messages(
mut client: BrainSgxDaemonClient<Channel>,
tx: Sender<BrainMessage>,
) -> Result<()> {
let pubkey = "node_pubkey".to_owned();
log::debug!("starting to listen for messages from brain");
let mut grpc_stream = client.brain_messages(Pubkey { pubkey }).await?.into_inner();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(msg) => {
log::info!("Received message from brain: {msg:?}");
let _ = tx.send(msg).await?;
}
Err(e) => {
println!("Brain disconnected from brain_messaages: {e}");
}
}
}
println!("brain_messages is about to exit");
Ok(())
}
#[tonic::async_trait]
impl DaemonServicePB for DaemonServer {
async fn create_container(
&self,
request: tonic::Request<ContainerPB>,
) -> Result<tonic::Response<NewContainerRes>, tonic::Status> {
let req_data = request.into_inner();
pub async fn send_messages(
mut client: BrainSgxDaemonClient<Channel>,
rx: Receiver<DaemonMessage>,
tx: Sender<DaemonMessage>,
) -> Result<()> {
let pubkey = "node_pubkey".to_owned();
if req_data.package_url.is_none() || req_data.resource.is_none() {
return Err(tonic::Status::data_loss("missing some data in request"));
}
let unarchive_dir = handle_package(req_data.package_url.clone().unwrap_or_default())
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
let req_container: ContainerConfig = req_data.into();
let container_uuid = req_container.uuid.clone().unwrap_or_default().uuid;
let mapped_ports = self
.data
.write()
.await
.create_new_container(req_container, unarchive_dir)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
let mapped_ports = mapped_ports
.into_iter()
.map(|(host, container)| detee_shared::pb::shared::MappedPort {
host_port: host.into(),
container_port: container.into(),
let rx_stream = ReceiverStream::new(rx);
tx.send(DaemonMessage {
msg: Some(Msg::Pubkey(Pubkey { pubkey })),
})
.collect();
return Ok(tonic::Response::new(NewContainerRes {
container_id: Some(detee_shared::pb::shared::Uuid {
uuid: container_uuid,
}),
status: "success".to_string(),
ip_address: "".to_string(),
mapped_ports,
}));
}
async fn delete_container(
&self,
req: tonic::Request<ContainerFilters>,
) -> Result<tonic::Response<DeleteContainerRes>, tonic::Status> {
let req_data = req.into_inner();
if req_data.container_id.is_none() {
return Err(tonic::Status::data_loss("missing container id"));
}
self.data
.write()
.await
.delete_container(
req_data.admin_pubkey,
req_data.container_id.unwrap_or_default().uuid,
)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
return Ok(tonic::Response::new(DeleteContainerRes {
..Default::default()
}));
}
async fn inspect_container(
&self,
req: tonic::Request<detee_shared::pb::shared::Uuid>,
) -> Result<tonic::Response<ContainerInspectResp>, tonic::Status> {
dbg!(req);
return Ok(tonic::Response::new(ContainerInspectResp {
..Default::default()
}));
}
// async fn container_log(
// &self,
// req: tonic::Request<detee_shared::pb::shared::Uuid>,
// ) -> Result<tonic::Response<Self::ContainerLogStream>, tonic::Status> {
// todo!()
// }
async fn list_containers(
&self,
req: tonic::Request<ContainerFilters>,
) -> Result<tonic::Response<ContainerListResp>, tonic::Status> {
dbg!(req);
return Ok(tonic::Response::new(ContainerListResp {
..Default::default()
}));
}
.await?;
client.daemon_messages(rx_stream).await?;
log::debug!("send_newvm_resp is about to exit");
Ok(())
}

@ -3,18 +3,95 @@ pub mod data;
pub mod grpc;
pub mod utils;
pub use data::DaemonState;
use std::sync::Arc;
use std::time::Duration;
use grpc::DaemonServer;
use tokio::sync::RwLock;
pub use data::DaemonState;
use detee_shared::pb::daemon::BrainMessage;
use detee_shared::pb::daemon::DaemonMessage;
use detee_shared::pb::shared::ContainerContracts;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::time::sleep;
pub struct Config {
pub brain_url: String,
}
impl Default for Config {
fn default() -> Self {
let brain_url =
std::env::var("BRAIN_URL").unwrap_or_else(|_| "http://127.0.0.1:31337".to_string());
Self { brain_url }
}
}
pub struct ContainerHandler {
receiver: Receiver<BrainMessage>,
sender: Sender<DaemonMessage>,
config: Config,
// res: state::Resources,
}
impl ContainerHandler {
pub fn new(receiver: Receiver<BrainMessage>, sender: Sender<DaemonMessage>) -> Self {
Self {
receiver,
sender,
config: Config::default(),
}
}
fn handle_contracts(&mut self, contracts: Vec<ContainerContracts>) -> () {
dbg!(&contracts);
}
async fn run(mut self) -> () {
while let Some(brain_msg) = self.receiver.recv().await {
match brain_msg.msg {
Some(msg) => {
dbg!(&msg);
}
None => {
log::error!("Brain disconnected");
break;
}
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Detee daemon");
DaemonServer::new(Arc::new(RwLock::new(DaemonState::default())))
.start()
.await?;
Ok(())
loop {
let (brain_msg_tx, brain_msg_rx) = tokio::sync::mpsc::channel(6);
let (daemon_msg_tx, daemon_msg_rx) = tokio::sync::mpsc::channel(6);
let mut container_handler = ContainerHandler::new(brain_msg_rx, daemon_msg_tx.clone());
let brain_url = container_handler.config.brain_url.clone();
match grpc::register_node(&container_handler.config).await {
Ok(container_contracts) => container_handler.handle_contracts(container_contracts),
Err(e) => log::error!("Failed to connect to brain: {e}"),
}
tokio::spawn(async move {
container_handler.run().await;
});
log::info!("Connecting to brain...");
if let Err(e) = grpc::connect_and_run(grpc::ConnectionData {
brain_url,
brain_msg_tx,
daemon_msg_rx,
daemon_msg_tx,
})
.await
{
log::error!("The connection broke: {e}");
}
sleep(Duration::from_secs(3)).await;
}
}