sgx-daemon/src/grpc.rs
Noor 1a0d53d5f9 Improves brain connection reliability
Updates the brain connection logic to randomly select from a list of available URLs for staging and testnet environments.
2025-06-18 11:40:23 +00:00

176 lines
5.4 KiB
Rust

use anyhow::Result;
use detee_shared::app_proto::brain_app_daemon_client::BrainAppDaemonClient;
use detee_shared::app_proto::{
BrainMessageApp, DaemonAuth, DaemonMessageApp, DelAppReq, RegisterAppNodeReq,
};
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::task::JoinSet;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tonic::metadata::AsciiMetadataValue;
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
use tonic::Request;
use crate::global::{BRAIN_STAGING, BRAIN_TESTING, DETEE_ROOT_CA, IP_INFO, PUBLIC_KEY};
/// Bundle of everything `connect_and_run` consumes to run the two brain streams.
pub struct ConnectionData {
/// Network name handed to `client`, which accepts only "staging" or "testnet".
pub network: String,
/// Sender given to `receive_messages`; each message received from the brain is forwarded on it.
pub brain_msg_tx: Sender<BrainMessageApp>,
/// Receiver given to `send_messages`, which wraps it in a stream for the `daemon_messages` call.
pub daemon_msg_rx: Receiver<DaemonMessageApp>,
/// Sender `send_messages` uses to enqueue the signed auth message.
/// NOTE(review): presumably the sending half of the channel behind `daemon_msg_rx` — confirm where this struct is built.
pub daemon_msg_tx: Sender<DaemonMessageApp>,
/// Strings passed to `sign_stream_auth` as the `contracts` list, once for each stream.
/// NOTE(review): the name suggests app UUIDs; confirm against the caller.
pub del_apps_uuid: Vec<String>,
}
/// Builds a TLS-secured gRPC client for the brain of the requested network.
///
/// `network` selects the endpoint: `"staging"` reads `BRAIN_STAGING`, `"testnet"` reads
/// `BRAIN_TESTING`. Each of those yields the URL to dial and the domain name used for TLS.
/// The PEM file at `DETEE_ROOT_CA` is installed as the CA certificate of the TLS config.
///
/// # Errors
/// Returns an error when `network` is not one of the two supported names, when the CA file
/// cannot be read, when the URL or TLS configuration is rejected, or when connecting fails.
async fn client(network: &str) -> Result<BrainAppDaemonClient<Channel>> {
    let endpoint = match network {
        "staging" => *BRAIN_STAGING,
        "testnet" => *BRAIN_TESTING,
        _ => anyhow::bail!("The only networks currently supported are staging and testnet."),
    };
    let (url, san) = endpoint;
    log::info!("brain_url: {}, brain_san: {}", url, san);

    // Pin the connection to the project's own root CA.
    let root_ca = Certificate::from_pem(std::fs::read_to_string(DETEE_ROOT_CA)?);
    let tls_config = ClientTlsConfig::new()
        .ca_certificate(root_ca)
        .domain_name(san);

    let configured = Channel::from_shared(url.to_string())?.tls_config(tls_config)?;
    let channel = configured.connect().await?;
    Ok(BrainAppDaemonClient::new(channel))
}
pub async fn register_node(config: &crate::HostConfig) -> Result<Vec<DelAppReq>> {
let mut client = client(&config.network).await?;
log::debug!("registering node with brain");
let req = RegisterAppNodeReq {
node_pubkey: PUBLIC_KEY.to_string(),
operator_wallet: config.operator_wallet.clone(),
main_ip: config.host_ip_address.clone(),
city: IP_INFO.city.clone(),
region: IP_INFO.region.clone(),
country: IP_INFO.country.clone(),
price: config.price,
};
let pubkey = PUBLIC_KEY.clone();
let timestamp = chrono::Utc::now().to_rfc3339();
let signature = crate::global::sign_message(&format!("{timestamp}{req:?}"))?;
let timestamp: AsciiMetadataValue = timestamp.parse()?;
let pubkey: AsciiMetadataValue = pubkey.parse()?;
let signature: AsciiMetadataValue = signature.parse()?;
let mut req = Request::new(req);
req.metadata_mut().insert("timestamp", timestamp);
req.metadata_mut().insert("pubkey", pubkey);
req.metadata_mut().insert("request-signature", signature);
let mut del_app_reqs = vec![];
let mut grpc_stream = client.register_app_node(req).await?.into_inner();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(del_app_req) => {
del_app_reqs.push(del_app_req);
}
Err(e) => {
println!("Brain disconnected from register_node: {e}");
}
}
}
log::info!(
"Brain registration succcessful, with contract count: {}",
del_app_reqs.len()
);
Ok(del_app_reqs)
}
pub async fn connect_and_run(conn_data: ConnectionData) -> Result<()> {
let client = client(&conn_data.network).await?;
let mut streaming_tasks = JoinSet::new();
streaming_tasks.spawn(receive_messages(
client.clone(),
conn_data.del_apps_uuid.clone(),
conn_data.brain_msg_tx,
));
streaming_tasks.spawn(send_messages(
client.clone(),
conn_data.del_apps_uuid.clone(),
conn_data.daemon_msg_rx,
conn_data.daemon_msg_tx,
));
let task_output = streaming_tasks.join_next().await;
println!("exiting: {task_output:?}");
Ok(())
}
/// Creates the signed `DaemonAuth` used to authenticate a stream with the brain.
///
/// The signature is computed over the current UTC time in RFC 3339 form immediately
/// followed by the `Debug` rendering of `contracts`. The returned value carries that same
/// timestamp, a copy of `PUBLIC_KEY`, the contracts and the signature.
///
/// # Errors
/// Returns whatever error `crate::global::sign_message` reports.
fn sign_stream_auth(contracts: Vec<String>) -> Result<DaemonAuth> {
    let pubkey = PUBLIC_KEY.clone();
    let timestamp = chrono::Utc::now().to_rfc3339();
    // Timestamp and contract list are concatenated with no separator.
    let payload = format!("{timestamp}{contracts:?}");
    let signature = crate::global::sign_message(&payload)?;
    let auth = DaemonAuth {
        timestamp,
        pubkey,
        contracts,
        signature,
    };
    Ok(auth)
}
/// Listens on the brain's message stream and forwards every message to `tx`.
///
/// The stream is opened with a `DaemonAuth` signed over `del_apps_uuid`. Each message that
/// arrives is logged and sent on `tx`; stream errors are printed to stdout and reading
/// continues until the stream yields nothing more.
///
/// # Errors
/// Returns an error when the auth cannot be signed, when the `brain_messages` call fails,
/// or when sending on `tx` fails.
pub async fn receive_messages(
    mut client: BrainAppDaemonClient<Channel>,
    del_apps_uuid: Vec<String>,
    tx: Sender<BrainMessageApp>,
) -> Result<()> {
    log::debug!("starting to listen for messages from brain");
    let auth = sign_stream_auth(del_apps_uuid)?;
    let response = client.brain_messages(auth).await?;
    let mut incoming = response.into_inner();

    loop {
        let Some(update) = incoming.next().await else {
            break;
        };
        let msg = match update {
            Ok(msg) => msg,
            Err(e) => {
                println!("Brain disconnected from brain_messaages: {e}");
                continue;
            }
        };
        log::info!("Received message from brain: {msg:?}");
        tx.send(msg).await?;
    }

    println!("brain_messages is about to exit");
    Ok(())
}
/// Streams daemon messages to the brain, starting with a signed auth message.
///
/// A `DaemonAuth` signed over `del_apps_uuid` is wrapped in a `DaemonMessageApp` and sent on
/// `tx`. `rx` is then turned into a stream and passed to the `daemon_messages` call.
/// NOTE(review): this presumably relies on `tx` and `rx` being the two halves of one
/// channel, so the auth message is what the brain reads first — confirm at the call site.
///
/// # Errors
/// Returns an error when the auth cannot be signed, when sending on `tx` fails, or when the
/// `daemon_messages` call fails.
pub async fn send_messages(
    mut client: BrainAppDaemonClient<Channel>,
    del_apps_uuid: Vec<String>,
    rx: Receiver<DaemonMessageApp>,
    tx: Sender<DaemonMessageApp>,
) -> Result<()> {
    let auth = sign_stream_auth(del_apps_uuid)?;
    let auth_msg = DaemonMessageApp {
        msg: Some(detee_shared::app_proto::daemon_message_app::Msg::Auth(auth)),
    };
    tx.send(auth_msg).await?;

    let outgoing = ReceiverStream::new(rx);
    client.daemon_messages(outgoing).await?;
    log::debug!("daemon_messages is about to exit");
    Ok(())
}