use anyhow::Result; use detee_shared::pb::brain::brain_app_daemon_client::BrainAppDaemonClient; use detee_shared::pb::brain::{ AppContract, BrainMessageApp, DaemonAuth, DaemonMessageApp, RegisterAppNodeReq, }; use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender; use tokio::task::JoinSet; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; use tonic::metadata::AsciiMetadataValue; use tonic::transport::Channel; use tonic::Request; use crate::global::IP_INFO; use crate::global::PUBLIC_KEY; pub struct ConnectionData { pub brain_url: String, pub brain_msg_tx: Sender, pub daemon_msg_rx: Receiver, pub daemon_msg_tx: Sender, pub app_contracts_uuid: Vec, } pub async fn register_node(config: &crate::HostConfig) -> Result> { let mut client = BrainAppDaemonClient::connect(config.brain_url.clone()).await?; log::debug!("registering node with brain"); let req = RegisterAppNodeReq { node_pubkey: PUBLIC_KEY.to_string(), owner_pubkey: config.owner_wallet.clone(), main_ip: IP_INFO.ip.clone(), city: IP_INFO.city.clone(), region: IP_INFO.region.clone(), country: IP_INFO.country.clone(), }; let pubkey = PUBLIC_KEY.clone(); let timestamp = chrono::Utc::now().to_rfc3339(); let signature = crate::global::sign_message(&format!("{timestamp}{req:?}"))?; let timestamp: AsciiMetadataValue = timestamp.parse()?; let pubkey: AsciiMetadataValue = pubkey.parse()?; let signature: AsciiMetadataValue = signature.parse()?; let mut req = Request::new(req); req.metadata_mut().insert("timestamp", timestamp); req.metadata_mut().insert("pubkey", pubkey); req.metadata_mut().insert("request-signature", signature); let mut container_contracts = vec![]; let mut grpc_stream = client.register_app_node(req).await?.into_inner(); while let Some(stream_update) = grpc_stream.next().await { match stream_update { Ok(contract) => { container_contracts.push(contract); } Err(e) => { println!("Brain disconnected from register_node: {e}"); } } } log::info!( "Brain registration succcessful, with contract count: {}", container_contracts.len() ); Ok(container_contracts) } pub async fn connect_and_run(conn_data: ConnectionData) -> Result<()> { let client = BrainAppDaemonClient::connect(conn_data.brain_url).await?; let mut streaming_tasks = JoinSet::new(); streaming_tasks.spawn(receive_messages( client.clone(), conn_data.app_contracts_uuid.clone(), conn_data.brain_msg_tx, )); streaming_tasks.spawn(send_messages( client.clone(), conn_data.app_contracts_uuid.clone(), conn_data.daemon_msg_rx, conn_data.daemon_msg_tx, )); let task_output = streaming_tasks.join_next().await; println!("exiting: {task_output:?}"); Ok(()) } fn sign_stream_auth(contracts: Vec) -> Result { let pubkey = PUBLIC_KEY.clone(); let timestamp = chrono::Utc::now().to_rfc3339(); let signature = crate::global::sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?; Ok(DaemonAuth { timestamp, pubkey, contracts, signature, }) } pub async fn receive_messages( mut client: BrainAppDaemonClient, contracts: Vec, tx: Sender, ) -> Result<()> { log::debug!("starting to listen for messages from brain"); let mut grpc_stream = client .brain_messages(sign_stream_auth(contracts)?) .await? .into_inner(); while let Some(stream_update) = grpc_stream.next().await { match stream_update { Ok(msg) => { log::info!("Received message from brain: {msg:?}"); tx.send(msg).await?; } Err(e) => { println!("Brain disconnected from brain_messaages: {e}"); } } } println!("brain_messages is about to exit"); Ok(()) } pub async fn send_messages( mut client: BrainAppDaemonClient, contracts: Vec, rx: Receiver, tx: Sender, ) -> Result<()> { let rx_stream = ReceiverStream::new(rx); tx.send(DaemonMessageApp { msg: Some(detee_shared::pb::brain::daemon_message_app::Msg::Auth( sign_stream_auth(contracts)?, )), }) .await?; client.daemon_messages(rx_stream).await?; log::debug!("daemon_messages is about to exit"); Ok(()) }