133 lines
4.5 KiB
Rust
133 lines
4.5 KiB
Rust
use crate::global::*;
|
|
use crate::snp_proto::VmDaemonMessage;
|
|
use anyhow::Result;
|
|
use log::{debug, info, warn};
|
|
use snp_proto::{brain_vm_daemon_client::BrainVmDaemonClient, BrainVmMessage, VmContract, RegisterVmNodeReq};
|
|
use tokio::{
|
|
sync::mpsc::{Receiver, Sender},
|
|
task::JoinSet,
|
|
};
|
|
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
|
|
use tonic::transport::Channel;
|
|
|
|
pub mod snp_proto {
|
|
// tonic::include_proto!("vm_proto");
|
|
pub use detee_shared::snp::pb::vm::*;
|
|
}
|
|
|
|
pub async fn register_node(config: &crate::config::Config) -> Result<Vec<VmContract>> {
|
|
use tonic::metadata::AsciiMetadataValue;
|
|
use tonic::Request;
|
|
let mut client = BrainVmDaemonClient::connect(config.brain_url.clone()).await?;
|
|
debug!("Starting node registration...");
|
|
let ip_info = IP_INFO.clone();
|
|
let req = RegisterVmNodeReq {
|
|
node_pubkey: PUBLIC_KEY.clone(),
|
|
operator_wallet: config.owner_wallet.clone(),
|
|
main_ip: ip_info.ip,
|
|
country: ip_info.country,
|
|
region: ip_info.region,
|
|
city: ip_info.city,
|
|
price: config.price,
|
|
};
|
|
|
|
let pubkey = PUBLIC_KEY.clone();
|
|
let timestamp = chrono::Utc::now().to_rfc3339();
|
|
let signature = crate::global::sign_message(&format!("{timestamp}{req:?}"))?;
|
|
let timestamp: AsciiMetadataValue = timestamp.parse()?;
|
|
let pubkey: AsciiMetadataValue = pubkey.parse()?;
|
|
let signature: AsciiMetadataValue = signature.parse()?;
|
|
let mut req = Request::new(req);
|
|
req.metadata_mut().insert("timestamp", timestamp);
|
|
req.metadata_mut().insert("pubkey", pubkey);
|
|
req.metadata_mut().insert("request-signature", signature);
|
|
|
|
let mut contracts = Vec::new();
|
|
let mut grpc_stream = client.register_vm_node(req).await?.into_inner();
|
|
while let Some(stream_update) = grpc_stream.next().await {
|
|
match stream_update {
|
|
Ok(node) => {
|
|
debug!("Received contract from brain: {node:?}");
|
|
contracts.push(node);
|
|
}
|
|
Err(e) => {
|
|
warn!("Received error instead of contracts: {e:?}");
|
|
}
|
|
}
|
|
}
|
|
info!("Brain terminated list_contracts stream.");
|
|
Ok(contracts)
|
|
}
|
|
|
|
fn sign_stream_auth(contracts: Vec<String>) -> Result<snp_proto::DaemonStreamAuth> {
|
|
let pubkey = PUBLIC_KEY.clone();
|
|
let timestamp = chrono::Utc::now().to_rfc3339();
|
|
let signature =
|
|
crate::global::sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?;
|
|
Ok(snp_proto::DaemonStreamAuth { timestamp, pubkey, contracts, signature })
|
|
}
|
|
|
|
async fn receive_messages(
|
|
mut client: BrainVmDaemonClient<Channel>,
|
|
contracts: Vec<String>,
|
|
tx: Sender<BrainVmMessage>,
|
|
) -> Result<()> {
|
|
debug!("starting to listen for messages from brain");
|
|
let mut grpc_stream = client.brain_messages(sign_stream_auth(contracts)?).await?.into_inner();
|
|
while let Some(stream_update) = grpc_stream.next().await {
|
|
match stream_update {
|
|
Ok(msg) => {
|
|
info!("Received message from brain: {msg:?}");
|
|
let _ = tx.send(msg).await;
|
|
}
|
|
Err(e) => {
|
|
warn!("Brain disconnected from listen_for_new_vm_reqs: {e}");
|
|
}
|
|
}
|
|
}
|
|
debug!("listen_for_new_vm_reqs is about to exit");
|
|
Ok(())
|
|
}
|
|
|
|
async fn send_messages(
|
|
mut client: BrainVmDaemonClient<Channel>,
|
|
contracts: Vec<String>,
|
|
rx: Receiver<VmDaemonMessage>,
|
|
tx: Sender<VmDaemonMessage>,
|
|
) -> Result<()> {
|
|
debug!("starting daemon message stream to brain");
|
|
let rx_stream = ReceiverStream::new(rx);
|
|
tx.send(VmDaemonMessage {
|
|
msg: Some(snp_proto::vm_daemon_message::Msg::Auth(sign_stream_auth(contracts)?)),
|
|
})
|
|
.await?;
|
|
client.daemon_messages(rx_stream).await?;
|
|
debug!("send_newvm_resp is about to exit");
|
|
Ok(())
|
|
}
|
|
|
|
pub struct ConnectionData {
|
|
pub contracts: Vec<String>,
|
|
pub brain_url: String,
|
|
pub brain_msg_tx: Sender<BrainVmMessage>,
|
|
pub daemon_msg_rx: Receiver<VmDaemonMessage>,
|
|
pub daemon_msg_tx: Sender<VmDaemonMessage>,
|
|
}
|
|
|
|
pub async fn connect_and_run(cd: ConnectionData) -> Result<()> {
|
|
let client = BrainVmDaemonClient::connect(cd.brain_url).await?;
|
|
let mut streaming_tasks = JoinSet::new();
|
|
|
|
streaming_tasks.spawn(receive_messages(client.clone(), cd.contracts.clone(), cd.brain_msg_tx));
|
|
streaming_tasks.spawn(send_messages(
|
|
client.clone(),
|
|
cd.contracts,
|
|
cd.daemon_msg_rx,
|
|
cd.daemon_msg_tx,
|
|
));
|
|
|
|
let task_output = streaming_tasks.join_next().await;
|
|
warn!("One stream exited: {task_output:?}");
|
|
Ok(())
|
|
}
|