use crate::global::*; use crate::snp_proto::VmDaemonMessage; use anyhow::Result; use log::{debug, info, warn}; use snp_proto::{brain_vm_daemon_client::BrainVmDaemonClient, BrainVmMessage, VmContract, RegisterVmNodeReq}; use tokio::{ sync::mpsc::{Receiver, Sender}, task::JoinSet, }; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tonic::transport::Channel; pub mod snp_proto { tonic::include_proto!("vm_proto"); } impl From for snp_proto::VmDaemonMessage { fn from(value: snp_proto::NewVmResp) -> Self { snp_proto::VmDaemonMessage { msg: Some(snp_proto::vm_daemon_message::Msg::NewVmResp(value)) } } } impl From for snp_proto::VmDaemonMessage { fn from(value: snp_proto::UpdateVmResp) -> Self { snp_proto::VmDaemonMessage { msg: Some(snp_proto::vm_daemon_message::Msg::UpdateVmResp(value)) } } } impl From for snp_proto::VmDaemonMessage { fn from(value: snp_proto::VmNodeResources) -> Self { snp_proto::VmDaemonMessage { msg: Some(snp_proto::vm_daemon_message::Msg::VmNodeResources(value)) } } } pub async fn register_node(config: &crate::config::Config) -> Result> { use tonic::metadata::AsciiMetadataValue; use tonic::Request; let mut client = BrainVmDaemonClient::connect(config.brain_url.clone()).await?; debug!("Starting node registration..."); let ip_info = IP_INFO.clone(); let req = RegisterVmNodeReq { node_pubkey: PUBLIC_KEY.clone(), operator_wallet: config.owner_wallet.clone(), main_ip: ip_info.ip, country: ip_info.country, region: ip_info.region, city: ip_info.city, price: config.price, }; let pubkey = PUBLIC_KEY.clone(); let timestamp = chrono::Utc::now().to_rfc3339(); let signature = crate::global::sign_message(&format!("{timestamp}{req:?}"))?; let timestamp: AsciiMetadataValue = timestamp.parse()?; let pubkey: AsciiMetadataValue = pubkey.parse()?; let signature: AsciiMetadataValue = signature.parse()?; let mut req = Request::new(req); req.metadata_mut().insert("timestamp", timestamp); req.metadata_mut().insert("pubkey", pubkey); req.metadata_mut().insert("request-signature", signature); let mut contracts = Vec::new(); let mut grpc_stream = client.register_vm_node(req).await?.into_inner(); while let Some(stream_update) = grpc_stream.next().await { match stream_update { Ok(node) => { debug!("Received contract from brain: {node:?}"); contracts.push(node); } Err(e) => { warn!("Received error instead of contracts: {e:?}"); } } } info!("Brain terminated list_contracts stream."); Ok(contracts) } fn sign_stream_auth(contracts: Vec) -> Result { let pubkey = PUBLIC_KEY.clone(); let timestamp = chrono::Utc::now().to_rfc3339(); let signature = crate::global::sign_message(&(timestamp.to_string() + &format!("{contracts:?}")))?; Ok(snp_proto::DaemonStreamAuth { timestamp, pubkey, contracts, signature }) } async fn receive_messages( mut client: BrainVmDaemonClient, contracts: Vec, tx: Sender, ) -> Result<()> { debug!("starting to listen for messages from brain"); let mut grpc_stream = client.brain_messages(sign_stream_auth(contracts)?).await?.into_inner(); while let Some(stream_update) = grpc_stream.next().await { match stream_update { Ok(msg) => { info!("Received message from brain: {msg:?}"); let _ = tx.send(msg).await; } Err(e) => { warn!("Brain disconnected from listen_for_new_vm_reqs: {e}"); } } } debug!("listen_for_new_vm_reqs is about to exit"); Ok(()) } async fn send_messages( mut client: BrainVmDaemonClient, contracts: Vec, rx: Receiver, tx: Sender, ) -> Result<()> { debug!("starting daemon message stream to brain"); let rx_stream = ReceiverStream::new(rx); tx.send(VmDaemonMessage { msg: Some(snp_proto::vm_daemon_message::Msg::Auth(sign_stream_auth(contracts)?)), }) .await?; client.daemon_messages(rx_stream).await?; debug!("send_newvm_resp is about to exit"); Ok(()) } pub struct ConnectionData { pub contracts: Vec, pub brain_url: String, pub brain_msg_tx: Sender, pub daemon_msg_rx: Receiver, pub daemon_msg_tx: Sender, } pub async fn connect_and_run(cd: ConnectionData) -> Result<()> { let client = BrainVmDaemonClient::connect(cd.brain_url).await?; let mut streaming_tasks = JoinSet::new(); streaming_tasks.spawn(receive_messages(client.clone(), cd.contracts.clone(), cd.brain_msg_tx)); streaming_tasks.spawn(send_messages( client.clone(), cd.contracts, cd.daemon_msg_rx, cd.daemon_msg_tx, )); let task_output = streaming_tasks.join_next().await; warn!("One stream exited: {task_output:?}"); Ok(()) }