diff --git a/src/db.rs b/src/db.rs index 77d9e72..56548c1 100644 --- a/src/db.rs +++ b/src/db.rs @@ -182,6 +182,49 @@ pub struct VmNodeWithReports { pub reports: Vec, } +impl VmNodeWithReports { + // TODO: find a more elegant way to do this than importing gRPC in the DB module + // https://en.wikipedia.org/wiki/Dependency_inversion_principle + pub async fn find_by_filters( + filters: detee_shared::snp::pb::vm_proto::VmNodeFilters, + ) -> Result, Error> { + let mut query = format!( + "select *, <-report.* as reports from {VM_NODE} where + avail_ports >= {} && + max_ports_per_vm >= {} && + avail_ipv4 >= {} && + avail_ipv6 >= {} && + avail_vcpus >= {} && + avail_mem_mb >= {} && + avail_storage_gbs >= {}\n", + filters.free_ports, + filters.free_ports, + filters.offers_ipv4 as u32, + filters.offers_ipv6 as u32, + filters.vcpus, + filters.memory_mb, + filters.storage_gb + ); + if !filters.city.is_empty() { + query += &format!(r#"&& city = "{}"\n"#, filters.city); + } + if !filters.region.is_empty() { + query += &format!(r#"&& region = "{}"\n"#, filters.region); + } + if !filters.country.is_empty() { + query += &format!(r#"&& country = "{}"\n"#, filters.country); + } + if !filters.ip.is_empty() { + query += &format!(r#"&& ip = "{}"\n"#, filters.ip); + } + query += ";"; + let mut result = + DB.query(query).await?; + let vm_nodes: Vec = result.take(0)?; + Ok(vm_nodes) + } +} + pub enum DaemonNotification { Create(NewVmReq), Update(UpdateVmReq), diff --git a/src/grpc.rs b/src/grpc.rs index 9588635..029d463 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -356,7 +356,12 @@ impl BrainVmDaemon for BrainVmDaemonForReal { Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => { if !new_vm_resp.error.is_empty() { } else { - db::upsert_record("measurement_args", &new_vm_resp.uuid, new_vm_resp.args).await?; + db::upsert_record( + "measurement_args", + &new_vm_resp.uuid, + new_vm_resp.args, + ) + .await?; } } Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => { @@ -654,18 +659,15 @@ impl BrainVmCli for BrainVmCliForReal { ) -> Result, tonic::Status> { let req = check_sig_from_req(req)?; info!("CLI requested ListVmNodesStream: {req:?}"); - todo!(); - // let nodes = self.data.find_vm_nodes_by_filters(&req); - // let (tx, rx) = mpsc::channel(6); - // tokio::spawn(async move { - // for node in nodes { - // let _ = tx.send(Ok(node.into())).await; - // } - // }); - // let output_stream = ReceiverStream::new(rx); - // Ok(Response::new( - // Box::pin(output_stream) as Self::ListVmNodesStream - // )) + let nodes = db::VmNodeWithReports::find_by_filters(req).await?; + let (tx, rx) = mpsc::channel(6); + tokio::spawn(async move { + for node in nodes { + let _ = tx.send(Ok(node.into())).await; + } + }); + let output_stream = ReceiverStream::new(rx); + Ok(Response::new(Box::pin(output_stream) as Self::ListVmNodesStream)) } async fn get_one_vm_node(