handle requests from brain and send response accordingly with new grpc methods and channels implement daemon send message grpc method some WIP handler method to make it work setup application logging
108 lines
3.4 KiB
Rust
108 lines
3.4 KiB
Rust
use anyhow::Result;
|
|
use detee_shared::pb::daemon::brain_sgx_daemon_client::BrainSgxDaemonClient;
|
|
use detee_shared::pb::daemon::daemon_message::Msg;
|
|
use detee_shared::pb::daemon::{BrainMessage, DaemonMessage};
|
|
use detee_shared::pb::shared::ContainerContracts;
|
|
use detee_shared::pb::shared::{Pubkey, RegisterNodeReq};
|
|
use tokio::sync::mpsc::Receiver;
|
|
use tokio::sync::mpsc::Sender;
|
|
use tokio::task::JoinSet;
|
|
use tokio_stream::wrappers::ReceiverStream;
|
|
use tokio_stream::StreamExt;
|
|
use tonic::transport::Channel;
|
|
|
|
pub struct ConnectionData {
|
|
pub brain_url: String,
|
|
pub brain_msg_tx: Sender<BrainMessage>,
|
|
pub daemon_msg_rx: Receiver<DaemonMessage>,
|
|
pub daemon_msg_tx: Sender<DaemonMessage>,
|
|
}
|
|
|
|
pub async fn register_node(config: &crate::Config) -> Result<Vec<ContainerContracts>> {
|
|
let mut client = BrainSgxDaemonClient::connect(config.brain_url.clone()).await?;
|
|
|
|
log::debug!("registering node with brain");
|
|
|
|
let req = RegisterNodeReq {
|
|
..Default::default()
|
|
};
|
|
|
|
let mut container_contracts = vec![];
|
|
|
|
let mut grpc_stream = client.register_node(req).await?.into_inner();
|
|
while let Some(stream_update) = grpc_stream.next().await {
|
|
match stream_update {
|
|
Ok(contract) => {
|
|
container_contracts.push(contract);
|
|
}
|
|
Err(e) => {
|
|
println!("Brain disconnected from register_node: {e}");
|
|
}
|
|
}
|
|
}
|
|
log::info!(
|
|
"Brain registration succcessful, with contract count: {}",
|
|
container_contracts.len()
|
|
);
|
|
|
|
Ok(container_contracts)
|
|
}
|
|
|
|
pub async fn connect_and_run(conn_data: ConnectionData) -> Result<()> {
|
|
let client = BrainSgxDaemonClient::connect(conn_data.brain_url).await?;
|
|
|
|
let mut streaming_tasks = JoinSet::new();
|
|
|
|
streaming_tasks.spawn(receive_messages(client.clone(), conn_data.brain_msg_tx));
|
|
streaming_tasks.spawn(send_messages(
|
|
client.clone(),
|
|
conn_data.daemon_msg_rx,
|
|
conn_data.daemon_msg_tx,
|
|
));
|
|
|
|
let task_output = streaming_tasks.join_next().await;
|
|
println!("exiting: {task_output:?}");
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn receive_messages(
|
|
mut client: BrainSgxDaemonClient<Channel>,
|
|
tx: Sender<BrainMessage>,
|
|
) -> Result<()> {
|
|
let pubkey = "0xd0837609aedd53854651210327db90f5c2626188a00e940bbc9eea2c7e6838b7".to_owned();
|
|
|
|
log::debug!("starting to listen for messages from brain");
|
|
let mut grpc_stream = client.brain_messages(Pubkey { pubkey }).await?.into_inner();
|
|
|
|
while let Some(stream_update) = grpc_stream.next().await {
|
|
match stream_update {
|
|
Ok(msg) => {
|
|
log::info!("Received message from brain: {msg:?}");
|
|
tx.send(msg).await?;
|
|
}
|
|
Err(e) => {
|
|
println!("Brain disconnected from brain_messaages: {e}");
|
|
}
|
|
}
|
|
}
|
|
println!("brain_messages is about to exit");
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn send_messages(
|
|
mut client: BrainSgxDaemonClient<Channel>,
|
|
rx: Receiver<DaemonMessage>,
|
|
tx: Sender<DaemonMessage>,
|
|
) -> Result<()> {
|
|
let pubkey = "0xd0837609aedd53854651210327db90f5c2626188a00e940bbc9eea2c7e6838b7".to_owned();
|
|
|
|
let rx_stream = ReceiverStream::new(rx);
|
|
tx.send(DaemonMessage {
|
|
msg: Some(Msg::Pubkey(Pubkey { pubkey })),
|
|
})
|
|
.await?;
|
|
client.daemon_messages(rx_stream).await?;
|
|
log::debug!("daemon_messages is about to exit");
|
|
Ok(())
|
|
}
|