From f1d9bcb07a407e900e0bf238b11f3a90f4d74c04 Mon Sep 17 00:00:00 2001 From: ghe0 Date: Tue, 17 Dec 2024 03:55:44 +0200 Subject: [PATCH] added handling to list some nodes and contracts --- brain.proto | 13 +++++++--- src/data.rs | 74 ++++++++++++++++++++++++++++++++++++----------------- src/grpc.rs | 65 ++++++++++++++++++++++++++++++++++++++++------ 3 files changed, 117 insertions(+), 35 deletions(-) diff --git a/brain.proto b/brain.proto index da5b1f0..9509be3 100644 --- a/brain.proto +++ b/brain.proto @@ -49,6 +49,11 @@ message VMContract { string created_at = 13; } +message ListVMContractsReq { + string admin_pubkey = 1; + string node_pubkey = 2; +} + message NewVMConfirmation { repeated uint32 exposed_ports = 1; string public_ipv4 = 2; @@ -63,7 +68,7 @@ service BrainDaemonService { rpc RegisterNode (RegisterNodeRequest) returns (Empty); rpc NewVMUpdates (stream NewVMConfirmation) returns (stream NewVMRequest); rpc DeletedVMUpdates (Empty) returns (stream DeletedVMUpdate); - rpc ListVMContracts (Empty) returns (stream VMContract); + rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); } message NodeFilters { @@ -76,7 +81,7 @@ message NodeFilters { string country = 7; } -message NodeListResponse { +message NodeListResp { string node_pubkey = 1; string country = 2; string city = 3; @@ -87,7 +92,7 @@ message NodeListResponse { service BrainCliService { rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation); - rpc ListVMContracts (Empty) returns (stream VMContract); - rpc ListNodes (NodeFilters) returns (stream NodeListResponse); + rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); + rpc ListNodes (NodeFilters) returns (stream NodeListResp); } diff --git a/src/data.rs b/src/data.rs index 88da487..72f4276 100644 --- a/src/data.rs +++ b/src/data.rs @@ -7,10 +7,13 @@ pub struct BrainData { pub contracts: RwLock>, } -#[derive(Clone)] +#[derive(Eq, Hash, PartialEq, Clone)] pub struct Node { pub public_key: String, pub owner_key: String, + pub country: String, + pub city: String, + pub ip: String, pub hostname: String, pub avail_mem_mb: u32, pub avail_vcpus: u32, @@ -21,6 +24,19 @@ pub struct Node { pub max_ports_per_vm: u32, } +impl Into for Node { + fn into(self) -> crate::grpc::brain::NodeListResp { + crate::grpc::brain::NodeListResp { + node_pubkey: self.public_key, + country: self.country, + city: self.city, + ip: self.ip, + server_rating: 0, + provider_rating: 0, + } + } +} + #[derive(Clone)] pub struct Contract { pub uuid: String, @@ -38,6 +54,26 @@ pub struct Contract { pub created_at: String, } +impl Into for Contract { + fn into(self) -> crate::grpc::brain::VmContract { + crate::grpc::brain::VmContract { + uuid: self.uuid, + hostname: self.hostname, + admin_pubkey: self.admin_pubkey, + node_pubkey: self.node_pubkey, + exposed_ports: self.exposed_ports, + public_ipv4: self.public_ipv4, + public_ipv6: self.public_ipv6, + disk_size_gb: self.disk_size_gb, + vcpus: self.vcpus, + memory_mb: self.memory_mb, + kernel_sha: self.kernel_sha, + dtrfs_sha: self.dtrfs_sha, + created_at: self.created_at, + } + } +} + impl BrainData { pub fn new() -> Arc { Arc::new(Self { @@ -56,40 +92,32 @@ impl BrainData { contracts.push(contract); } - pub async fn find_contract_by_pubkey(&self, public_key: &str) -> Option { + pub async fn find_nodes_by_pubkey(&self, public_key: &str) -> Option { let nodes = self.nodes.read().await; nodes.iter().cloned().find(|n| n.public_key == public_key) } - pub async fn find_contract_by_owner_key(&self, owner_key: &str) -> Option { + pub async fn find_nodes_by_owner_key(&self, owner_key: &str) -> Option { let nodes = self.nodes.read().await; nodes.iter().cloned().find(|n| n.owner_key == owner_key) } - pub async fn find_nodes_by_avail_vcpus(&self, min_vcpus: u32) -> Vec { + pub async fn find_nodes_by_filters( + &self, + filters: &crate::grpc::brain::NodeFilters, + ) -> Vec { let nodes = self.nodes.read().await; nodes .iter() .cloned() - .filter(|n| n.avail_vcpus >= min_vcpus) - .collect() - } - - pub async fn find_nodes_by_avail_mem(&self, min_mem_mb: u32) -> Vec { - let nodes = self.nodes.read().await; - nodes - .iter() - .cloned() - .filter(|n| n.avail_mem_mb >= min_mem_mb) - .collect() - } - - pub async fn find_nodes_by_avail_storage(&self, min_storage_gb: u32) -> Vec { - let nodes = self.nodes.read().await; - nodes - .iter() - .cloned() - .filter(|n| n.avail_storage_gbs >= min_storage_gb) + .filter(|n| { + n.avail_ports >= filters.free_ports + && (!filters.offers_ipv4 || n.avail_ipv4 > 0) + && (!filters.offers_ipv6 || n.avail_ipv6 > 0) + && n.avail_vcpus >= filters.vcpus + && n.avail_mem_mb >= filters.memory_mb + && n.avail_storage_gbs >= filters.storage_gb + }) .collect() } diff --git a/src/grpc.rs b/src/grpc.rs index a1e8874..7684327 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -10,7 +10,8 @@ use brain::brain_daemon_service_server::BrainDaemonService; use brain::*; use std::pin::Pin; use std::sync::Arc; -use tokio_stream::Stream; +use tokio::sync::mpsc; +use tokio_stream::{wrappers::ReceiverStream, Stream}; use tonic::{Request, Response, Status, Streaming}; struct BrainDaemonMock { @@ -52,9 +53,26 @@ impl BrainDaemonService for BrainDaemonMock { async fn list_vm_contracts( &self, - _req: Request, + req: Request, ) -> Result, Status> { - todo!() + let req = req.into_inner(); + let contracts = self + .data + .find_contracts_by_admin_pubkey(&req.node_pubkey) + .await; + let (tx, rx) = mpsc::channel(128); + tokio::spawn(async move { + for contract in contracts { + let _ = tx + .send(Ok(contract.into())) + .await; + } + }); + + let output_stream = ReceiverStream::new(rx); + Ok(Response::new( + Box::pin(output_stream) as Self::ListVMContractsStream + )) } } @@ -70,16 +88,47 @@ impl BrainCliService for BrainCliMock { type ListVMContractsStream = Pin> + Send>>; async fn list_vm_contracts( &self, - _req: Request, + req: Request, ) -> Result, Status> { - todo!() + let req = req.into_inner(); + let contracts = self + .data + .find_contracts_by_admin_pubkey(&req.admin_pubkey) + .await; + let (tx, rx) = mpsc::channel(128); + tokio::spawn(async move { + for contract in contracts { + let _ = tx + .send(Ok(contract.into())) + .await; + } + }); + + let output_stream = ReceiverStream::new(rx); + Ok(Response::new( + Box::pin(output_stream) as Self::ListVMContractsStream + )) } - type ListNodesStream = Pin> + Send>>; + type ListNodesStream = Pin> + Send>>; async fn list_nodes( &self, - _req: Request, + req: Request, ) -> Result, tonic::Status> { - todo!() + let req = req.into_inner(); + let nodes = self.data.find_nodes_by_filters(&req).await; + let (tx, rx) = mpsc::channel(128); + tokio::spawn(async move { + for node in nodes { + let _ = tx + .send(Ok(node.into())) + .await; + } + }); + + let output_stream = ReceiverStream::new(rx); + Ok(Response::new( + Box::pin(output_stream) as Self::ListNodesStream + )) } }