diff --git a/brain.proto b/brain.proto index c1b14b9..05d8e4e 100644 --- a/brain.proto +++ b/brain.proto @@ -10,14 +10,17 @@ message NodePubkey { message RegisterNodeRequest { string node_pubkey = 1; - string country = 2; - string city = 3; - uint32 avail_ports = 4; - uint32 avail_ipv4 = 5; - uint32 avail_ipv6 = 6; - uint32 avail_vcpus = 7; - uint32 avail_memory_mb = 8; - uint32 avail_storage_gb = 9; + string owner_pubkey = 2; + string ip = 3; + string country = 4; + string city = 5; + uint32 avail_ports = 6; + uint32 avail_ipv4 = 7; + uint32 avail_ipv6 = 8; + uint32 avail_vcpus = 9; + uint32 avail_memory_mb = 10; + uint32 avail_storage_gb = 11; + uint32 max_ports_per_vm = 12; } message NewVMRequest { @@ -63,6 +66,7 @@ message NewVMConfirmation { repeated uint32 exposed_ports = 2; string public_ipv4 = 3; string public_ipv6 = 4; + string error = 5; } message DeletedVMUpdate { @@ -73,7 +77,7 @@ service BrainDaemonService { rpc RegisterNode (RegisterNodeRequest) returns (Empty); rpc GetNewVMReqs (NodePubkey) returns (stream NewVMRequest); rpc SendVMConfirmations (stream NewVMConfirmation) returns (Empty); - rpc DeletedVMUpdates (Empty) returns (stream DeletedVMUpdate); + rpc DeletedVMUpdates (NodePubkey) returns (stream DeletedVMUpdate); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); } @@ -100,5 +104,6 @@ service BrainCliService { rpc CreateVMContract (NewVMRequest) returns (NewVMConfirmation); rpc ListVMContracts (ListVMContractsReq) returns (stream VMContract); rpc ListNodes (NodeFilters) returns (stream NodeListResp); + rpc DeleteVM (DeletedVMUpdate) returns (Empty); } diff --git a/src/data.rs b/src/data.rs index f2daf29..085cd5f 100644 --- a/src/data.rs +++ b/src/data.rs @@ -13,7 +13,6 @@ pub struct Node { pub country: String, pub city: String, pub ip: String, - pub hostname: String, pub avail_mem_mb: u32, pub avail_vcpus: u32, pub avail_storage_gbs: u32, @@ -23,6 +22,25 @@ pub struct Node { pub max_ports_per_vm: u32, } +impl From for Node { + fn from(node: grpc::RegisterNodeRequest) -> Self { + Node { + public_key: node.node_pubkey, + owner_key: node.owner_pubkey, + country: node.country, + city: node.city, + ip: node.ip, + avail_mem_mb: node.avail_memory_mb, + avail_vcpus: node.avail_vcpus, + avail_storage_gbs: node.avail_storage_gb, + avail_ipv4: node.avail_ipv4, + avail_ipv6: node.avail_ipv6, + avail_ports: node.avail_ports, + max_ports_per_vm: node.max_ports_per_vm, + } + } +} + impl Into for Node { fn into(self) -> grpc::NodeListResp { grpc::NodeListResp { @@ -95,15 +113,31 @@ impl Drop for GuardCliTx { } impl BrainData { - pub async fn insert_node(&self, node: Node) { + pub fn insert_node(&self, node: grpc::RegisterNodeRequest) { let mut nodes = self.nodes.write().unwrap(); - nodes.push(node); + for n in nodes.iter_mut() { + if n.public_key == node.node_pubkey { + *n = node.into(); + return; + } + } + nodes.push(node.into()); } pub fn add_daemon_deletevm_tx(&self, node_pubkey: &str, tx: Sender) { self.daemon_deletevm_tx.insert(node_pubkey.to_string(), tx); } + pub async fn delete_vm(&self, delete_vm: grpc::DeletedVmUpdate) { + if let Some(contract) = self.find_contract_by_uuid(&delete_vm.uuid) { + if let Some(daemon_tx) = self.daemon_deletevm_tx.get(&contract.node_pubkey) { + let _ = daemon_tx.send(delete_vm.clone()).await; + } + let mut contracts = self.contracts.write().unwrap(); + contracts.retain(|c| c.uuid != delete_vm.uuid); + } + } + pub fn add_daemon_newvm_tx(&self, node_pubkey: &str, tx: Sender) { self.daemon_newvm_tx.insert(node_pubkey.to_string(), tx); } @@ -116,6 +150,9 @@ impl BrainData { if let Some((_, client_tx)) = self.cli_vmcontract_tx.remove(&newvmreq.admin_pubkey) { let _ = client_tx.send(confirmation.clone()); } + if confirmation.error == "" { + return; + } let contract = Contract { uuid: confirmation.uuid, exposed_ports: confirmation.exposed_ports, diff --git a/src/grpc.rs b/src/grpc.rs index 5ca5bd4..063e96e 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -25,9 +25,10 @@ struct BrainCliMock { impl BrainDaemonService for BrainDaemonMock { async fn register_node( &self, - _req: Request, + req: Request, ) -> Result, Status> { - todo!() + self.data.insert_node(req.into_inner()); + Ok(Response::new(Empty{})) } type GetNewVMReqsStream = Pin> + Send>>; @@ -69,9 +70,21 @@ impl BrainDaemonService for BrainDaemonMock { async fn deleted_vm_updates( &self, - _req: Request, + req: Request, ) -> Result, Status> { - todo!() + let (grpc_tx, grpc_rx) = mpsc::channel(6); + let (data_tx, mut data_rx) = mpsc::channel(6); + self.data + .add_daemon_deletevm_tx(&req.into_inner().node_pubkey, data_tx); + tokio::spawn(async move { + while let Some(deleted_vm) = data_rx.recv().await { + let _ = grpc_tx.send(Ok(deleted_vm)).await; + } + }); + let output_stream = ReceiverStream::new(grpc_rx); + Ok(Response::new( + Box::pin(output_stream) as Self::DeletedVMUpdatesStream + )) } type ListVMContractsStream = Pin> + Send>>; @@ -88,7 +101,6 @@ impl BrainDaemonService for BrainDaemonMock { let _ = tx.send(Ok(contract.into())).await; } }); - let output_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(output_stream) as Self::ListVMContractsStream @@ -131,7 +143,6 @@ impl BrainCliService for BrainCliMock { let _ = tx.send(Ok(contract.into())).await; } }); - let output_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(output_stream) as Self::ListVMContractsStream @@ -151,10 +162,17 @@ impl BrainCliService for BrainCliMock { let _ = tx.send(Ok(node.into())).await; } }); - let output_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(output_stream) as Self::ListNodesStream )) } + + async fn delete_vm( + &self, + req: Request, + ) -> Result, Status> { + self.data.delete_vm(req.into_inner()).await; + Ok(Response::new(Empty{})) + } }