Updated proto, changes in app resource change disk unit to GB and vcpu to vcpus refactor contract handling while registering logging brain url and san
176 lines
5.4 KiB
Rust
176 lines
5.4 KiB
Rust
use anyhow::Result;
|
|
use detee_shared::app_proto::brain_app_daemon_client::BrainAppDaemonClient;
|
|
use detee_shared::app_proto::{
|
|
BrainMessageApp, DaemonAuth, DaemonMessageApp, DelAppReq, RegisterAppNodeReq,
|
|
};
|
|
use tokio::sync::mpsc::Receiver;
|
|
use tokio::sync::mpsc::Sender;
|
|
use tokio::task::JoinSet;
|
|
use tokio_stream::wrappers::ReceiverStream;
|
|
use tokio_stream::StreamExt;
|
|
use tonic::metadata::AsciiMetadataValue;
|
|
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
|
|
use tonic::Request;
|
|
|
|
use crate::global::{BRAIN_STAGING, BRAIN_TESTING, DETEE_ROOT_CA, IP_INFO, PUBLIC_KEY};
|
|
|
|
pub struct ConnectionData {
|
|
pub network: String,
|
|
pub brain_msg_tx: Sender<BrainMessageApp>,
|
|
pub daemon_msg_rx: Receiver<DaemonMessageApp>,
|
|
pub daemon_msg_tx: Sender<DaemonMessageApp>,
|
|
pub del_apps_uuid: Vec<String>,
|
|
}
|
|
|
|
async fn client(network: &str) -> Result<BrainAppDaemonClient<Channel>> {
|
|
let (brain_url, brain_san) = match network {
|
|
"staging" => BRAIN_STAGING,
|
|
"testnet" => BRAIN_TESTING,
|
|
_ => {
|
|
return Err(anyhow::anyhow!(
|
|
"The only networks currently supported are staging and testnet."
|
|
))
|
|
}
|
|
};
|
|
log::info!("brain_url: {brain_url}, brain_san: {brain_san}");
|
|
let pem = std::fs::read_to_string(DETEE_ROOT_CA)?;
|
|
let ca = Certificate::from_pem(pem);
|
|
|
|
let tls = ClientTlsConfig::new()
|
|
.ca_certificate(ca)
|
|
.domain_name(brain_san);
|
|
|
|
let channel = Channel::from_shared(brain_url.to_string())?
|
|
.tls_config(tls)?
|
|
.connect()
|
|
.await?;
|
|
|
|
Ok(BrainAppDaemonClient::new(channel))
|
|
}
|
|
|
|
pub async fn register_node(config: &crate::HostConfig) -> Result<Vec<DelAppReq>> {
|
|
let mut client = client(&config.network).await?;
|
|
|
|
log::debug!("registering node with brain");
|
|
|
|
let req = RegisterAppNodeReq {
|
|
node_pubkey: PUBLIC_KEY.to_string(),
|
|
operator_wallet: config.operator_wallet.clone(),
|
|
main_ip: config.host_ip_address.clone(),
|
|
city: IP_INFO.city.clone(),
|
|
region: IP_INFO.region.clone(),
|
|
country: IP_INFO.country.clone(),
|
|
price: config.price,
|
|
};
|
|
|
|
let pubkey = PUBLIC_KEY.clone();
|
|
let timestamp = chrono::Utc::now().to_rfc3339();
|
|
let signature = crate::global::sign_message(&format!("{timestamp}{req:?}"))?;
|
|
let timestamp: AsciiMetadataValue = timestamp.parse()?;
|
|
let pubkey: AsciiMetadataValue = pubkey.parse()?;
|
|
let signature: AsciiMetadataValue = signature.parse()?;
|
|
let mut req = Request::new(req);
|
|
req.metadata_mut().insert("timestamp", timestamp);
|
|
req.metadata_mut().insert("pubkey", pubkey);
|
|
req.metadata_mut().insert("request-signature", signature);
|
|
|
|
let mut del_app_reqs = vec![];
|
|
|
|
let mut grpc_stream = client.register_app_node(req).await?.into_inner();
|
|
while let Some(stream_update) = grpc_stream.next().await {
|
|
match stream_update {
|
|
Ok(del_app_req) => {
|
|
del_app_reqs.push(del_app_req);
|
|
}
|
|
Err(e) => {
|
|
println!("Brain disconnected from register_node: {e}");
|
|
}
|
|
}
|
|
}
|
|
log::info!(
|
|
"Brain registration succcessful, with contract count: {}",
|
|
del_app_reqs.len()
|
|
);
|
|
|
|
Ok(del_app_reqs)
|
|
}
|
|
|
|
pub async fn connect_and_run(conn_data: ConnectionData) -> Result<()> {
|
|
let client = client(&conn_data.network).await?;
|
|
|
|
let mut streaming_tasks = JoinSet::new();
|
|
|
|
streaming_tasks.spawn(receive_messages(
|
|
client.clone(),
|
|
conn_data.del_apps_uuid.clone(),
|
|
conn_data.brain_msg_tx,
|
|
));
|
|
streaming_tasks.spawn(send_messages(
|
|
client.clone(),
|
|
conn_data.del_apps_uuid.clone(),
|
|
conn_data.daemon_msg_rx,
|
|
conn_data.daemon_msg_tx,
|
|
));
|
|
|
|
let task_output = streaming_tasks.join_next().await;
|
|
println!("exiting: {task_output:?}");
|
|
Ok(())
|
|
}
|
|
|
|
fn sign_stream_auth(contracts: Vec<String>) -> Result<DaemonAuth> {
|
|
let pubkey = PUBLIC_KEY.clone();
|
|
let timestamp = chrono::Utc::now().to_rfc3339();
|
|
let signature =
|
|
crate::global::sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?;
|
|
Ok(DaemonAuth {
|
|
timestamp,
|
|
pubkey,
|
|
contracts,
|
|
signature,
|
|
})
|
|
}
|
|
|
|
pub async fn receive_messages(
|
|
mut client: BrainAppDaemonClient<Channel>,
|
|
del_apps_uuid: Vec<String>,
|
|
tx: Sender<BrainMessageApp>,
|
|
) -> Result<()> {
|
|
log::debug!("starting to listen for messages from brain");
|
|
let mut grpc_stream = client
|
|
.brain_messages(sign_stream_auth(del_apps_uuid)?)
|
|
.await?
|
|
.into_inner();
|
|
|
|
while let Some(stream_update) = grpc_stream.next().await {
|
|
match stream_update {
|
|
Ok(msg) => {
|
|
log::info!("Received message from brain: {msg:?}");
|
|
tx.send(msg).await?;
|
|
}
|
|
Err(e) => {
|
|
println!("Brain disconnected from brain_messaages: {e}");
|
|
}
|
|
}
|
|
}
|
|
println!("brain_messages is about to exit");
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn send_messages(
|
|
mut client: BrainAppDaemonClient<Channel>,
|
|
del_apps_uuid: Vec<String>,
|
|
rx: Receiver<DaemonMessageApp>,
|
|
tx: Sender<DaemonMessageApp>,
|
|
) -> Result<()> {
|
|
let rx_stream = ReceiverStream::new(rx);
|
|
tx.send(DaemonMessageApp {
|
|
msg: Some(detee_shared::app_proto::daemon_message_app::Msg::Auth(
|
|
sign_stream_auth(del_apps_uuid)?,
|
|
)),
|
|
})
|
|
.await?;
|
|
client.daemon_messages(rx_stream).await?;
|
|
log::debug!("daemon_messages is about to exit");
|
|
Ok(())
|
|
}
|