added capability to search nodes

This commit is contained in:
ghe0 2025-04-26 23:26:51 +03:00
parent 1f277db873
commit 512dad5146
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
2 changed files with 58 additions and 13 deletions

@ -182,6 +182,49 @@ pub struct VmNodeWithReports {
pub reports: Vec<Report>, pub reports: Vec<Report>,
} }
impl VmNodeWithReports {
// TODO: find a more elegant way to do this than importing gRPC in the DB module
// https://en.wikipedia.org/wiki/Dependency_inversion_principle
pub async fn find_by_filters(
filters: detee_shared::snp::pb::vm_proto::VmNodeFilters,
) -> Result<Vec<Self>, Error> {
let mut query = format!(
"select *, <-report.* as reports from {VM_NODE} where
avail_ports >= {} &&
max_ports_per_vm >= {} &&
avail_ipv4 >= {} &&
avail_ipv6 >= {} &&
avail_vcpus >= {} &&
avail_mem_mb >= {} &&
avail_storage_gbs >= {}\n",
filters.free_ports,
filters.free_ports,
filters.offers_ipv4 as u32,
filters.offers_ipv6 as u32,
filters.vcpus,
filters.memory_mb,
filters.storage_gb
);
if !filters.city.is_empty() {
query += &format!(r#"&& city = "{}"\n"#, filters.city);
}
if !filters.region.is_empty() {
query += &format!(r#"&& region = "{}"\n"#, filters.region);
}
if !filters.country.is_empty() {
query += &format!(r#"&& country = "{}"\n"#, filters.country);
}
if !filters.ip.is_empty() {
query += &format!(r#"&& ip = "{}"\n"#, filters.ip);
}
query += ";";
let mut result =
DB.query(query).await?;
let vm_nodes: Vec<Self> = result.take(0)?;
Ok(vm_nodes)
}
}
pub enum DaemonNotification { pub enum DaemonNotification {
Create(NewVmReq), Create(NewVmReq),
Update(UpdateVmReq), Update(UpdateVmReq),

@ -356,7 +356,12 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => { Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => {
if !new_vm_resp.error.is_empty() { if !new_vm_resp.error.is_empty() {
} else { } else {
db::upsert_record("measurement_args", &new_vm_resp.uuid, new_vm_resp.args).await?; db::upsert_record(
"measurement_args",
&new_vm_resp.uuid,
new_vm_resp.args,
)
.await?;
} }
} }
Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => { Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => {
@ -654,18 +659,15 @@ impl BrainVmCli for BrainVmCliForReal {
) -> Result<Response<Self::ListVmNodesStream>, tonic::Status> { ) -> Result<Response<Self::ListVmNodesStream>, tonic::Status> {
let req = check_sig_from_req(req)?; let req = check_sig_from_req(req)?;
info!("CLI requested ListVmNodesStream: {req:?}"); info!("CLI requested ListVmNodesStream: {req:?}");
todo!(); let nodes = db::VmNodeWithReports::find_by_filters(req).await?;
// let nodes = self.data.find_vm_nodes_by_filters(&req); let (tx, rx) = mpsc::channel(6);
// let (tx, rx) = mpsc::channel(6); tokio::spawn(async move {
// tokio::spawn(async move { for node in nodes {
// for node in nodes { let _ = tx.send(Ok(node.into())).await;
// let _ = tx.send(Ok(node.into())).await; }
// } });
// }); let output_stream = ReceiverStream::new(rx);
// let output_stream = ReceiverStream::new(rx); Ok(Response::new(Box::pin(output_stream) as Self::ListVmNodesStream))
// Ok(Response::new(
// Box::pin(output_stream) as Self::ListVmNodesStream
// ))
} }
async fn get_one_vm_node( async fn get_one_vm_node(