sgx-daemon/src/grpc.rs
Noor 2b67cff7c1
Refactor register node
add IP information retrieval and update dependencies
2025-01-31 13:02:07 +00:00

116 lines
3.5 KiB
Rust

use anyhow::Result;
use detee_shared::pb::daemon::brain_sgx_daemon_client::BrainSgxDaemonClient;
use detee_shared::pb::daemon::daemon_message::Msg;
use detee_shared::pb::daemon::{BrainMessage, DaemonMessage};
use detee_shared::pb::shared::ContainerContracts;
use detee_shared::pb::shared::{Pubkey, RegisterNodeReq};
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::task::JoinSet;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
use crate::utils::IP_INFO;
use crate::{ADMIN_PUBKEY, NODE_PUBKEY};
pub struct ConnectionData {
pub brain_url: String,
pub brain_msg_tx: Sender<BrainMessage>,
pub daemon_msg_rx: Receiver<DaemonMessage>,
pub daemon_msg_tx: Sender<DaemonMessage>,
}
pub async fn register_node(config: &crate::Config) -> Result<Vec<ContainerContracts>> {
let mut client = BrainSgxDaemonClient::connect(config.brain_url.clone()).await?;
log::debug!("registering node with brain");
let req = RegisterNodeReq {
node_pubkey: NODE_PUBKEY.to_string(),
owner_pubkey: ADMIN_PUBKEY.to_string(),
main_ip: IP_INFO.ip.clone(),
city: IP_INFO.city.clone(),
region: IP_INFO.region.clone(),
country: IP_INFO.country.clone(),
};
let mut container_contracts = vec![];
let mut grpc_stream = client.register_node(req).await?.into_inner();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(contract) => {
container_contracts.push(contract);
}
Err(e) => {
println!("Brain disconnected from register_node: {e}");
}
}
}
log::info!(
"Brain registration succcessful, with contract count: {}",
container_contracts.len()
);
Ok(container_contracts)
}
pub async fn connect_and_run(conn_data: ConnectionData) -> Result<()> {
let client = BrainSgxDaemonClient::connect(conn_data.brain_url).await?;
let mut streaming_tasks = JoinSet::new();
streaming_tasks.spawn(receive_messages(client.clone(), conn_data.brain_msg_tx));
streaming_tasks.spawn(send_messages(
client.clone(),
conn_data.daemon_msg_rx,
conn_data.daemon_msg_tx,
));
let task_output = streaming_tasks.join_next().await;
println!("exiting: {task_output:?}");
Ok(())
}
pub async fn receive_messages(
mut client: BrainSgxDaemonClient<Channel>,
tx: Sender<BrainMessage>,
) -> Result<()> {
let pubkey = NODE_PUBKEY.to_string();
log::debug!("starting to listen for messages from brain");
let mut grpc_stream = client.brain_messages(Pubkey { pubkey }).await?.into_inner();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(msg) => {
log::info!("Received message from brain: {msg:?}");
tx.send(msg).await?;
}
Err(e) => {
println!("Brain disconnected from brain_messaages: {e}");
}
}
}
println!("brain_messages is about to exit");
Ok(())
}
pub async fn send_messages(
mut client: BrainSgxDaemonClient<Channel>,
rx: Receiver<DaemonMessage>,
tx: Sender<DaemonMessage>,
) -> Result<()> {
let pubkey = NODE_PUBKEY.to_string();
let rx_stream = ReceiverStream::new(rx);
tx.send(DaemonMessage {
msg: Some(Msg::Pubkey(Pubkey { pubkey })),
})
.await?;
client.daemon_messages(rx_stream).await?;
log::debug!("daemon_messages is about to exit");
Ok(())
}