fixed app node filter query updated proto change mb to gb on app node resource tests for get one node for app and vm tests for node resource updates on vm and app fix test for app node filtering implemented ip based node filtering mocking random public ip for node local brain binary for test without tls refactor test grpc services to use grpc only stream
150 lines
4.8 KiB
Rust
150 lines
4.8 KiB
Rust
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
use super::test_utils::Key;
|
|
use crate::common::test_utils::{generate_random_public_ip, get_ip_info};
|
|
use anyhow::Result;
|
|
use detee_shared::app_proto::brain_app_daemon_client::BrainAppDaemonClient;
|
|
use detee_shared::app_proto::{self, NewAppRes, RegisterAppNodeReq};
|
|
use detee_shared::common_proto::MappedPort;
|
|
use futures::StreamExt;
|
|
use tokio::sync::mpsc;
|
|
use tokio_stream::wrappers::ReceiverStream;
|
|
use tonic::transport::Channel;
|
|
|
|
pub async fn mock_app_daemon(
|
|
brain_channel: &Channel,
|
|
daemon_error: Option<String>,
|
|
) -> Result<String> {
|
|
let mut daemon_client = BrainAppDaemonClient::new(brain_channel.clone());
|
|
let daemon_key = Key::new();
|
|
|
|
register_app_node(&mut daemon_client, &daemon_key, &Key::new().pubkey).await?;
|
|
|
|
let (tx, brain_msg_rx) = tokio::sync::mpsc::channel(1);
|
|
|
|
tokio::spawn(daemon_listener(daemon_client.clone(), daemon_key.clone(), tx));
|
|
|
|
let (daemon_msg_tx, rx) = tokio::sync::mpsc::channel(1);
|
|
tokio::spawn(daemon_msg_sender(
|
|
daemon_client.clone(),
|
|
daemon_key.clone(),
|
|
daemon_msg_tx.clone(),
|
|
rx,
|
|
));
|
|
|
|
tokio::spawn(daemon_engine(daemon_msg_tx.clone(), brain_msg_rx, daemon_error));
|
|
|
|
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
|
|
|
Ok(daemon_key.pubkey)
|
|
}
|
|
|
|
pub async fn register_app_node(
|
|
client: &mut BrainAppDaemonClient<Channel>,
|
|
key: &Key,
|
|
operator_wallet: &str,
|
|
) -> Result<Vec<app_proto::DelAppReq>> {
|
|
log::info!("Registering app_node: {}", key.pubkey);
|
|
let node_pubkey = key.pubkey.clone();
|
|
|
|
let ip = generate_random_public_ip().to_string();
|
|
let ip_info = get_ip_info(&ip).await?;
|
|
|
|
let req = RegisterAppNodeReq {
|
|
node_pubkey,
|
|
operator_wallet: operator_wallet.to_string(),
|
|
main_ip: ip_info.ip,
|
|
city: ip_info.city,
|
|
country: ip_info.country,
|
|
region: ip_info.region,
|
|
price: 1200,
|
|
};
|
|
|
|
let mut grpc_stream = client.register_app_node(key.sign_request(req)?).await?.into_inner();
|
|
|
|
let mut deleted_app_reqs = Vec::new();
|
|
while let Some(stream_update) = grpc_stream.next().await {
|
|
match stream_update {
|
|
Ok(del_app_req) => {
|
|
deleted_app_reqs.push(del_app_req);
|
|
}
|
|
Err(e) => {
|
|
panic!("Received error instead of deleted_app_reqs: {e:?}");
|
|
}
|
|
}
|
|
}
|
|
Ok(deleted_app_reqs)
|
|
}
|
|
|
|
pub async fn daemon_listener(
|
|
mut client: BrainAppDaemonClient<Channel>,
|
|
key: Key,
|
|
tx: mpsc::Sender<app_proto::BrainMessageApp>,
|
|
) -> Result<()> {
|
|
log::info!("listening app_daemon");
|
|
let mut grpc_stream =
|
|
client.brain_messages(key.sign_stream_auth_app(vec![])?).await?.into_inner();
|
|
|
|
while let Some(Ok(stream_update)) = grpc_stream.next().await {
|
|
log::info!("app deamon got notified: {:?}", &stream_update);
|
|
let _ = tx.send(stream_update).await;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn daemon_msg_sender(
|
|
mut client: BrainAppDaemonClient<Channel>,
|
|
key: Key,
|
|
tx: mpsc::Sender<app_proto::DaemonMessageApp>,
|
|
rx: mpsc::Receiver<app_proto::DaemonMessageApp>,
|
|
) -> Result<()> {
|
|
log::info!("sender app_daemon");
|
|
let rx_stream = ReceiverStream::new(rx);
|
|
tx.send(app_proto::DaemonMessageApp {
|
|
msg: Some(app_proto::daemon_message_app::Msg::Auth(key.sign_stream_auth_app(vec![])?)),
|
|
})
|
|
.await?;
|
|
client.daemon_messages(rx_stream).await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn daemon_engine(
|
|
tx: mpsc::Sender<app_proto::DaemonMessageApp>,
|
|
mut rx: mpsc::Receiver<app_proto::BrainMessageApp>,
|
|
new_app_err: Option<String>,
|
|
) -> Result<()> {
|
|
log::info!("daemon engine app_daemon");
|
|
while let Some(brain_msg) = rx.recv().await {
|
|
match brain_msg.msg {
|
|
Some(app_proto::brain_message_app::Msg::NewAppReq(new_app_req)) => {
|
|
let exposed_ports =
|
|
[vec![34500], new_app_req.resource.unwrap_or_default().ports].concat();
|
|
|
|
let mapped_ports = exposed_ports
|
|
.into_iter()
|
|
.map(|port| MappedPort { host_port: port, guest_port: port })
|
|
.collect::<Vec<MappedPort>>();
|
|
|
|
let res_data = NewAppRes {
|
|
uuid: new_app_req.uuid,
|
|
mapped_ports,
|
|
ip_address: "127.0.0.1".to_string(),
|
|
error: new_app_err.clone().unwrap_or_default(),
|
|
};
|
|
|
|
let res = app_proto::DaemonMessageApp {
|
|
msg: Some(app_proto::daemon_message_app::Msg::NewAppRes(res_data)),
|
|
};
|
|
tx.send(res).await?;
|
|
}
|
|
Some(app_proto::brain_message_app::Msg::DeleteAppReq(del_app_req)) => {
|
|
println!("MOCK_APP_DAEMON::delete app request for {}", del_app_req.uuid);
|
|
}
|
|
None => todo!(),
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|