wrote brain-mock functionality

This commit is contained in:
ghe0 2024-12-21 01:55:54 +02:00
parent b14c590d01
commit ec5d266c28
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
3 changed files with 79 additions and 19 deletions

@ -10,14 +10,17 @@ message NodePubkey {
message RegisterNodeRequest { message RegisterNodeRequest {
string node_pubkey = 1; string node_pubkey = 1;
string country = 2; string owner_pubkey = 2;
string city = 3; string ip = 3;
uint32 avail_ports = 4; string country = 4;
uint32 avail_ipv4 = 5; string city = 5;
uint32 avail_ipv6 = 6; uint32 avail_ports = 6;
uint32 avail_vcpus = 7; uint32 avail_ipv4 = 7;
uint32 avail_memory_mb = 8; uint32 avail_ipv6 = 8;
uint32 avail_storage_gb = 9; uint32 avail_vcpus = 9;
uint32 avail_memory_mb = 10;
uint32 avail_storage_gb = 11;
uint32 max_ports_per_vm = 12;
} }
message NewVMRequest { message NewVMRequest {
@ -63,6 +66,7 @@ message NewVMConfirmation {
repeated uint32 exposed_ports = 2; repeated uint32 exposed_ports = 2;
string public_ipv4 = 3; string public_ipv4 = 3;
string public_ipv6 = 4; string public_ipv6 = 4;
string error = 5;
} }
message DeletedVMUpdate { message DeletedVMUpdate {
@ -73,7 +77,7 @@ service BrainDaemonService {
rpc RegisterNode (RegisterNodeRequest) returns (Empty); rpc RegisterNode (RegisterNodeRequest) returns (Empty);
rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest); rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest);
rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty);
rpc DeletedVMUpdates (Empty) returns (stream DeletedVMUpdate); rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
} }
@ -100,5 +104,6 @@ service BrainCliService {
rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation); rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation);
rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract);
rpc ListNodes (NodeFilters) returns (stream NodeListResp); rpc ListNodes (NodeFilters) returns (stream NodeListResp);
rpc DeleteVM (DeletedVMUpdate) returns (Empty);
} }

@ -13,7 +13,6 @@ pub struct Node {
pub country: String, pub country: String,
pub city: String, pub city: String,
pub ip: String, pub ip: String,
pub hostname: String,
pub avail_mem_mb: u32, pub avail_mem_mb: u32,
pub avail_vcpus: u32, pub avail_vcpus: u32,
pub avail_storage_gbs: u32, pub avail_storage_gbs: u32,
@ -23,6 +22,25 @@ pub struct Node {
pub max_ports_per_vm: u32, pub max_ports_per_vm: u32,
} }
impl From<grpc::RegisterNodeRequest> for Node {
fn from(node: grpc::RegisterNodeRequest) -> Self {
Node {
public_key: node.node_pubkey,
owner_key: node.owner_pubkey,
country: node.country,
city: node.city,
ip: node.ip,
avail_mem_mb: node.avail_memory_mb,
avail_vcpus: node.avail_vcpus,
avail_storage_gbs: node.avail_storage_gb,
avail_ipv4: node.avail_ipv4,
avail_ipv6: node.avail_ipv6,
avail_ports: node.avail_ports,
max_ports_per_vm: node.max_ports_per_vm,
}
}
}
impl Into<grpc::NodeListResp> for Node { impl Into<grpc::NodeListResp> for Node {
fn into(self) -> grpc::NodeListResp { fn into(self) -> grpc::NodeListResp {
grpc::NodeListResp { grpc::NodeListResp {
@ -95,15 +113,31 @@ impl Drop for GuardCliTx {
} }
impl BrainData { impl BrainData {
pub async fn insert_node(&self, node: Node) { pub fn insert_node(&self, node: grpc::RegisterNodeRequest) {
let mut nodes = self.nodes.write().unwrap(); let mut nodes = self.nodes.write().unwrap();
nodes.push(node); for n in nodes.iter_mut() {
if n.public_key == node.node_pubkey {
*n = node.into();
return;
}
}
nodes.push(node.into());
} }
pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::DeletedVmUpdate>) { pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender<grpc::DeletedVmUpdate>) {
self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx); self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx);
} }
pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) {
if let Some(contract) = self.find_contract_by_uuid(&delete_vm.uuid) {
if let Some(daemon_tx) = self.daemon_deletevm_tx.get(&contract.node_pubkey) {
let _ = daemon_tx.send(delete_vm.clone()).await;
}
let mut contracts = self.contracts.write().unwrap();
contracts.retain(|c| c.uuid != delete_vm.uuid);
}
}
pub fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender<grpc::NewVmRequest>) { pub fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender<grpc::NewVmRequest>) {
self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx); self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx);
} }
@ -116,6 +150,9 @@ impl BrainData {
if let Some((_, client_tx)) = self.cli_vmcontract_tx.remove(&newvmreq.admin_pubkey) { if let Some((_, client_tx)) = self.cli_vmcontract_tx.remove(&newvmreq.admin_pubkey) {
let _ = client_tx.send(confirmation.clone()); let _ = client_tx.send(confirmation.clone());
} }
if confirmation.error == "" {
return;
}
let contract = Contract { let contract = Contract {
uuid: confirmation.uuid, uuid: confirmation.uuid,
exposed_ports: confirmation.exposed_ports, exposed_ports: confirmation.exposed_ports,

@ -25,9 +25,10 @@ struct BrainCliMock {
impl BrainDaemonService for BrainDaemonMock { impl BrainDaemonService for BrainDaemonMock {
async fn register_node( async fn register_node(
&self, &self,
_req: Request<RegisterNodeRequest>, req: Request<RegisterNodeRequest>,
) -> Result<Response<Empty>, Status> { ) -> Result<Response<Empty>, Status> {
todo!() self.data.insert_node(req.into_inner());
Ok(Response::new(Empty{}))
} }
type GetNewVMReqsStream = Pin<Box<dyn Stream<Item = Result<NewVmRequest, Status>> + Send>>; type GetNewVMReqsStream = Pin<Box<dyn Stream<Item = Result<NewVmRequest, Status>> + Send>>;
@ -69,9 +70,21 @@ impl BrainDaemonService for BrainDaemonMock {
async fn deleted_vm_updates( async fn deleted_vm_updates(
&self, &self,
_req: Request<Empty>, req: Request<NodePubkey>,
) -> Result<Response<Self::DeletedVMUpdatesStream>, Status> { ) -> Result<Response<Self::DeletedVMUpdatesStream>, Status> {
todo!() let (grpc_tx, grpc_rx) = mpsc::channel(6);
let (data_tx, mut data_rx) = mpsc::channel(6);
self.data
.add_daemon_deletevm_tx(&req.into_inner().node_pubkey, data_tx);
tokio::spawn(async move {
while let Some(deleted_vm) = data_rx.recv().await {
let _ = grpc_tx.send(Ok(deleted_vm)).await;
}
});
let output_stream = ReceiverStream::new(grpc_rx);
Ok(Response::new(
Box::pin(output_stream) as Self::DeletedVMUpdatesStream
))
} }
type ListVMContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>; type ListVMContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
@ -88,7 +101,6 @@ impl BrainDaemonService for BrainDaemonMock {
let _ = tx.send(Ok(contract.into())).await; let _ = tx.send(Ok(contract.into())).await;
} }
}); });
let output_stream = ReceiverStream::new(rx); let output_stream = ReceiverStream::new(rx);
Ok(Response::new( Ok(Response::new(
Box::pin(output_stream) as Self::ListVMContractsStream Box::pin(output_stream) as Self::ListVMContractsStream
@ -131,7 +143,6 @@ impl BrainCliService for BrainCliMock {
let _ = tx.send(Ok(contract.into())).await; let _ = tx.send(Ok(contract.into())).await;
} }
}); });
let output_stream = ReceiverStream::new(rx); let output_stream = ReceiverStream::new(rx);
Ok(Response::new( Ok(Response::new(
Box::pin(output_stream) as Self::ListVMContractsStream Box::pin(output_stream) as Self::ListVMContractsStream
@ -151,10 +162,17 @@ impl BrainCliService for BrainCliMock {
let _ = tx.send(Ok(node.into())).await; let _ = tx.send(Ok(node.into())).await;
} }
}); });
let output_stream = ReceiverStream::new(rx); let output_stream = ReceiverStream::new(rx);
Ok(Response::new( Ok(Response::new(
Box::pin(output_stream) as Self::ListNodesStream Box::pin(output_stream) as Self::ListNodesStream
)) ))
} }
async fn delete_vm(
&self,
req: Request<DeletedVmUpdate>,
) -> Result<Response<Empty>, Status> {
self.data.delete_vm(req.into_inner()).await;
Ok(Response::new(Empty{}))
}
} }