added handling to list some nodes and contracts

This commit is contained in:
ghe0 2024-12-17 03:55:44 +02:00
parent 04d1c53a97
commit f1d9bcb07a
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
3 changed files with 117 additions and 35 deletions

@ -49,6 +49,11 @@ message VMContract {
string created_at = 13;
}
message ListVMContractsReq {
string admin_pubkey = 1;
string node_pubkey = 2;
}
message NewVMConfirmation {
repeated uint32 exposed_ports = 1;
string public_ipv4 = 2;
@ -63,7 +68,7 @@ service BrainDaemonService {
rpc RegisterNode (RegisterNodeRequest) returns (Empty);
rpc NewVMUpdates (stream NewVMConfirmation) returns (stream NewVMRequest);
rpc DeletedVMUpdates (Empty) returns (stream DeletedVMUpdate);
rpc ListVMContracts (Empty) returns (stream VMContract);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
}
message NodeFilters {
@ -76,7 +81,7 @@ message NodeFilters {
string country = 7;
}
message NodeListResponse {
message NodeListResp {
string node_pubkey = 1;
string country = 2;
string city = 3;
@ -87,7 +92,7 @@ message NodeListResponse {
service BrainCliService {
rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation);
rpc ListVMContracts (Empty) returns (stream VMContract);
rpc ListNodes (NodeFilters) returns (stream NodeListResponse);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc ListNodes (NodeFilters) returns (stream NodeListResp);
}

@ -7,10 +7,13 @@ pub struct BrainData {
pub contracts: RwLock<Vec<Contract>>,
}
#[derive(Clone)]
#[derive(Eq, Hash, PartialEq, Clone)]
pub struct Node {
pub public_key: String,
pub owner_key: String,
pub country: String,
pub city: String,
pub ip: String,
pub hostname: String,
pub avail_mem_mb: u32,
pub avail_vcpus: u32,
@ -21,6 +24,19 @@ pub struct Node {
pub max_ports_per_vm: u32,
}
impl Into<crate::grpc::brain::NodeListResp> for Node {
fn into(self) -> crate::grpc::brain::NodeListResp {
crate::grpc::brain::NodeListResp {
node_pubkey: self.public_key,
country: self.country,
city: self.city,
ip: self.ip,
server_rating: 0,
provider_rating: 0,
}
}
}
#[derive(Clone)]
pub struct Contract {
pub uuid: String,
@ -38,6 +54,26 @@ pub struct Contract {
pub created_at: String,
}
impl Into<crate::grpc::brain::VmContract> for Contract {
fn into(self) -> crate::grpc::brain::VmContract {
crate::grpc::brain::VmContract {
uuid: self.uuid,
hostname: self.hostname,
admin_pubkey: self.admin_pubkey,
node_pubkey: self.node_pubkey,
exposed_ports: self.exposed_ports,
public_ipv4: self.public_ipv4,
public_ipv6: self.public_ipv6,
disk_size_gb: self.disk_size_gb,
vcpus: self.vcpus,
memory_mb: self.memory_mb,
kernel_sha: self.kernel_sha,
dtrfs_sha: self.dtrfs_sha,
created_at: self.created_at,
}
}
}
impl BrainData {
pub fn new() -> Arc<Self> {
Arc::new(Self {
@ -56,40 +92,32 @@ impl BrainData {
contracts.push(contract);
}
pub async fn find_contract_by_pubkey(&self, public_key: &str) -> Option<Node> {
pub async fn find_nodes_by_pubkey(&self, public_key: &str) -> Option<Node> {
let nodes = self.nodes.read().await;
nodes.iter().cloned().find(|n| n.public_key == public_key)
}
pub async fn find_contract_by_owner_key(&self, owner_key: &str) -> Option<Node> {
pub async fn find_nodes_by_owner_key(&self, owner_key: &str) -> Option<Node> {
let nodes = self.nodes.read().await;
nodes.iter().cloned().find(|n| n.owner_key == owner_key)
}
pub async fn find_nodes_by_avail_vcpus(&self, min_vcpus: u32) -> Vec<Node> {
pub async fn find_nodes_by_filters(
&self,
filters: &crate::grpc::brain::NodeFilters,
) -> Vec<Node> {
let nodes = self.nodes.read().await;
nodes
.iter()
.cloned()
.filter(|n| n.avail_vcpus >= min_vcpus)
.collect()
}
pub async fn find_nodes_by_avail_mem(&self, min_mem_mb: u32) -> Vec<Node> {
let nodes = self.nodes.read().await;
nodes
.iter()
.cloned()
.filter(|n| n.avail_mem_mb >= min_mem_mb)
.collect()
}
pub async fn find_nodes_by_avail_storage(&self, min_storage_gb: u32) -> Vec<Node> {
let nodes = self.nodes.read().await;
nodes
.iter()
.cloned()
.filter(|n| n.avail_storage_gbs >= min_storage_gb)
.filter(|n| {
n.avail_ports >= filters.free_ports
&& (!filters.offers_ipv4 || n.avail_ipv4 > 0)
&& (!filters.offers_ipv6 || n.avail_ipv6 > 0)
&& n.avail_vcpus >= filters.vcpus
&& n.avail_mem_mb >= filters.memory_mb
&& n.avail_storage_gbs >= filters.storage_gb
})
.collect()
}

@ -10,7 +10,8 @@ use brain::brain_daemon_service_server::BrainDaemonService;
use brain::*;
use std::pin::Pin;
use std::sync::Arc;
use tokio_stream::Stream;
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream};
use tonic::{Request, Response, Status, Streaming};
struct BrainDaemonMock {
@ -52,9 +53,26 @@ impl BrainDaemonService for BrainDaemonMock {
async fn list_vm_contracts(
&self,
_req: Request<Empty>,
req: Request<ListVmContractsReq>,
) -> Result<Response<Self::ListVMContractsStream>, Status> {
todo!()
let req = req.into_inner();
let contracts = self
.data
.find_contracts_by_admin_pubkey(&req.node_pubkey)
.await;
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
for contract in contracts {
let _ = tx
.send(Ok(contract.into()))
.await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(output_stream) as Self::ListVMContractsStream
))
}
}
@ -70,16 +88,47 @@ impl BrainCliService for BrainCliMock {
type ListVMContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
async fn list_vm_contracts(
&self,
_req: Request<Empty>,
req: Request<ListVmContractsReq>,
) -> Result<Response<Self::ListVMContractsStream>, Status> {
todo!()
let req = req.into_inner();
let contracts = self
.data
.find_contracts_by_admin_pubkey(&req.admin_pubkey)
.await;
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
for contract in contracts {
let _ = tx
.send(Ok(contract.into()))
.await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(output_stream) as Self::ListVMContractsStream
))
}
type ListNodesStream = Pin<Box<dyn Stream<Item = Result<NodeListResponse, Status>> + Send>>;
type ListNodesStream = Pin<Box<dyn Stream<Item = Result<NodeListResp, Status>> + Send>>;
async fn list_nodes(
&self,
_req: Request<NodeFilters>,
req: Request<NodeFilters>,
) -> Result<Response<Self::ListNodesStream>, tonic::Status> {
todo!()
let req = req.into_inner();
let nodes = self.data.find_nodes_by_filters(&req).await;
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
for node in nodes {
let _ = tx
.send(Ok(node.into()))
.await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(output_stream) as Self::ListNodesStream
))
}
}