155 lines
5.2 KiB
Rust
155 lines
5.2 KiB
Rust
// SPDX-License-Identifier: Apache-2.0
|
|
// SPDX-License-Identifier: Unlicense
|
|
|
|
use crate::{global::*, snp_proto::VmDaemonMessage};
|
|
use anyhow::Result;
|
|
use detee_shared::vm_proto::DeleteVmReq;
|
|
use log::{debug, info, warn};
|
|
use snp_proto::{brain_vm_daemon_client::BrainVmDaemonClient, BrainVmMessage, RegisterVmNodeReq};
|
|
use tokio::{
|
|
sync::mpsc::{Receiver, Sender},
|
|
task::JoinSet,
|
|
};
|
|
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
|
|
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
|
|
|
|
pub mod snp_proto {
|
|
pub use detee_shared::vm_proto::*;
|
|
}
|
|
|
|
async fn client(network: &str) -> Result<BrainVmDaemonClient<Channel>> {
|
|
let (brain_url, brain_san) = match network {
|
|
"staging" => brain_staging(),
|
|
"testnet" => brain_testing(),
|
|
_ => {
|
|
return Err(anyhow::anyhow!(
|
|
"The only networks currently supported are staging and testnet."
|
|
))
|
|
}
|
|
};
|
|
info!("brain_url: {brain_url}, brain_san: {brain_san}");
|
|
|
|
let pem = std::fs::read_to_string(DETEE_ROOT_CA)?;
|
|
let ca = Certificate::from_pem(pem);
|
|
|
|
let tls = ClientTlsConfig::new().ca_certificate(ca).domain_name(brain_san);
|
|
|
|
let channel = Channel::from_shared(brain_url.to_string())?.tls_config(tls)?.connect().await?;
|
|
|
|
Ok(BrainVmDaemonClient::new(channel))
|
|
}
|
|
|
|
pub async fn register_node(config: &crate::config::Config) -> Result<Vec<DeleteVmReq>> {
|
|
use tonic::{metadata::AsciiMetadataValue, Request};
|
|
let mut client = client(&config.network).await?;
|
|
debug!("Starting node registration...");
|
|
let ip_info = IP_INFO.clone();
|
|
let req = RegisterVmNodeReq {
|
|
node_pubkey: PUBLIC_KEY.clone(),
|
|
operator_wallet: config.owner_wallet.clone(),
|
|
main_ip: ip_info.ip,
|
|
country: ip_info.country,
|
|
region: ip_info.region,
|
|
city: ip_info.city,
|
|
};
|
|
|
|
let pubkey = PUBLIC_KEY.clone();
|
|
let timestamp = chrono::Utc::now().to_rfc3339();
|
|
let signature = crate::global::sign_message(&format!("{timestamp}{req:?}"))?;
|
|
let timestamp: AsciiMetadataValue = timestamp.parse()?;
|
|
let pubkey: AsciiMetadataValue = pubkey.parse()?;
|
|
let signature: AsciiMetadataValue = signature.parse()?;
|
|
let mut req = Request::new(req);
|
|
req.metadata_mut().insert("timestamp", timestamp);
|
|
req.metadata_mut().insert("pubkey", pubkey);
|
|
req.metadata_mut().insert("request-signature", signature);
|
|
|
|
let mut contracts = Vec::new();
|
|
let mut grpc_stream = client.register_vm_node(req).await?.into_inner();
|
|
while let Some(stream_update) = grpc_stream.next().await {
|
|
match stream_update {
|
|
Ok(node) => {
|
|
debug!("Received deleted VM from brain: {node:?}");
|
|
contracts.push(node);
|
|
}
|
|
Err(e) => {
|
|
warn!("Received error instead of contracts: {e:?}");
|
|
}
|
|
}
|
|
}
|
|
info!("Brain terminated list_contracts stream.");
|
|
Ok(contracts)
|
|
}
|
|
|
|
fn sign_stream_auth(contracts: Vec<String>) -> Result<snp_proto::DaemonStreamAuth> {
|
|
let pubkey = PUBLIC_KEY.clone();
|
|
let timestamp = chrono::Utc::now().to_rfc3339();
|
|
let signature =
|
|
crate::global::sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?;
|
|
Ok(snp_proto::DaemonStreamAuth { timestamp, pubkey, contracts, signature })
|
|
}
|
|
|
|
async fn receive_messages(
|
|
mut client: BrainVmDaemonClient<Channel>,
|
|
contracts: Vec<String>,
|
|
tx: Sender<BrainVmMessage>,
|
|
) -> Result<()> {
|
|
debug!("starting to listen for messages from brain");
|
|
let mut grpc_stream = client.brain_messages(sign_stream_auth(contracts)?).await?.into_inner();
|
|
while let Some(stream_update) = grpc_stream.next().await {
|
|
match stream_update {
|
|
Ok(msg) => {
|
|
info!("Received message from brain: {msg:?}");
|
|
let _ = tx.send(msg).await;
|
|
}
|
|
Err(e) => {
|
|
warn!("Brain disconnected from listen_for_new_vm_reqs: {e}");
|
|
}
|
|
}
|
|
}
|
|
debug!("listen_for_new_vm_reqs is about to exit");
|
|
Ok(())
|
|
}
|
|
|
|
async fn send_messages(
|
|
mut client: BrainVmDaemonClient<Channel>,
|
|
contracts: Vec<String>,
|
|
rx: Receiver<VmDaemonMessage>,
|
|
tx: Sender<VmDaemonMessage>,
|
|
) -> Result<()> {
|
|
debug!("starting daemon message stream to brain");
|
|
let rx_stream = ReceiverStream::new(rx);
|
|
tx.send(VmDaemonMessage {
|
|
msg: Some(snp_proto::vm_daemon_message::Msg::Auth(sign_stream_auth(contracts)?)),
|
|
})
|
|
.await?;
|
|
client.daemon_messages(rx_stream).await?;
|
|
debug!("send_newvm_resp is about to exit");
|
|
Ok(())
|
|
}
|
|
|
|
pub struct ConnectionData {
|
|
pub contracts: Vec<String>,
|
|
pub network: String,
|
|
pub brain_msg_tx: Sender<BrainVmMessage>,
|
|
pub daemon_msg_rx: Receiver<VmDaemonMessage>,
|
|
pub daemon_msg_tx: Sender<VmDaemonMessage>,
|
|
}
|
|
|
|
pub async fn connect_and_run(cd: ConnectionData) -> Result<()> {
|
|
let client = client(&cd.network).await?;
|
|
let mut streaming_tasks = JoinSet::new();
|
|
|
|
streaming_tasks.spawn(receive_messages(client.clone(), cd.contracts.clone(), cd.brain_msg_tx));
|
|
streaming_tasks.spawn(send_messages(
|
|
client.clone(),
|
|
cd.contracts,
|
|
cd.daemon_msg_rx,
|
|
cd.daemon_msg_tx,
|
|
));
|
|
|
|
let task_output = streaming_tasks.join_next().await;
|
|
warn!("One stream exited: {task_output:?}");
|
|
Ok(())
|
|
}
|