brain-mock/src/grpc.rs
2024-12-22 04:41:05 +02:00

208 lines
6.6 KiB
Rust

#![allow(dead_code)]
/// Code generated from the `brain` protobuf package, pulled in through
/// tonic's `include_proto!` macro. The message types and service traits used
/// in this file come from here.
pub mod brain {
tonic::include_proto!("brain");
}
use crate::data::BrainData;
use brain::brain_cli_service_server::BrainCliService;
use brain::brain_daemon_service_server::BrainDaemonService;
use brain::*;
use log::debug;
use log::info;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};
/// Mock implementation of the daemon-facing gRPC service.
///
/// Every handler operates on the shared [`BrainData`] instance held here.
pub struct BrainDaemonMock {
    /// Shared state that the handlers read from and write to.
    data: Arc<BrainData>,
}

impl BrainDaemonMock {
    /// Creates a daemon-service mock that works on the given shared `data`.
    pub fn new(data: Arc<BrainData>) -> Self {
        BrainDaemonMock { data }
    }
}
/// Mock implementation of the CLI-facing gRPC service.
///
/// Every handler operates on the shared [`BrainData`] instance held here.
pub struct BrainCliMock {
    /// Shared state that the handlers read from and write to.
    data: Arc<BrainData>,
}

impl BrainCliMock {
    /// Creates a CLI-service mock that works on the given shared `data`.
    pub fn new(data: Arc<BrainData>) -> Self {
        BrainCliMock { data }
    }
}
#[tonic::async_trait]
impl BrainDaemonService for BrainDaemonMock {
async fn register_node(
&self,
req: Request<RegisterNodeRequest>,
) -> Result<Response<Empty>, Status> {
self.data.insert_node(req.into_inner());
Ok(Response::new(Empty {}))
}
type GetNewVMReqsStream = Pin<Box<dyn Stream<Item = Result<NewVmRequest, Status>> + Send>>;
async fn get_new_vm_reqs(
&self,
req: Request<NodePubkey>,
) -> Result<Response<Self::GetNewVMReqsStream>, Status> {
let req = req.into_inner();
info!(
"Daemon {} connected for GetNewVMReqsStream",
req.node_pubkey
);
let (grpc_tx, grpc_rx) = mpsc::channel(6);
let (data_tx, mut data_rx) = mpsc::channel(6);
self
.data
.clone()
.add_daemon_newvm_tx(&req.node_pubkey, data_tx);
let node_pubkey = req.node_pubkey.clone();
let data = self.data.clone();
tokio::spawn(async move {
while let Some(newvmreq) = data_rx.recv().await {
debug!(
"received this newvmreq to {}: {newvmreq:?}",
req.node_pubkey
);
let _ = grpc_tx.send(Ok(newvmreq)).await;
}
let _ = dropper;
data.del_cli_vmcontract_tx(&node_pubkey);
});
let output_stream = ReceiverStream::new(grpc_rx);
Ok(Response::new(
Box::pin(output_stream) as Self::GetNewVMReqsStream
))
}
async fn send_vm_confirmations(
&self,
req: Request<Streaming<NewVmConfirmation>>,
) -> Result<Response<Empty>, Status> {
let mut confirmations = req.into_inner();
while let Some(confirmation) = confirmations.next().await {
if let Ok(confirmation) = confirmation {
self.data.submit_vmconfirmation(confirmation).await;
}
}
Ok(Response::new(Empty {}))
}
type DeletedVMUpdatesStream =
Pin<Box<dyn Stream<Item = Result<DeletedVmUpdate, Status>> + Send>>;
async fn deleted_vm_updates(
&self,
req: Request<NodePubkey>,
) -> Result<Response<Self::DeletedVMUpdatesStream>, Status> {
let (grpc_tx, grpc_rx) = mpsc::channel(6);
let (data_tx, mut data_rx) = mpsc::channel(6);
let _dropper = self
.data
.clone()
.add_daemon_deletevm_tx(&req.into_inner().node_pubkey, data_tx);
tokio::spawn(async move {
while let Some(deleted_vm) = data_rx.recv().await {
let _ = grpc_tx.send(Ok(deleted_vm)).await;
}
});
let output_stream = ReceiverStream::new(grpc_rx);
Ok(Response::new(
Box::pin(output_stream) as Self::DeletedVMUpdatesStream
))
}
type ListVMContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
async fn list_vm_contracts(
&self,
req: Request<ListVmContractsReq>,
) -> Result<Response<Self::ListVMContractsStream>, Status> {
let req = req.into_inner();
let contracts = self.data.find_contracts_by_admin_pubkey(&req.node_pubkey);
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for contract in contracts {
let _ = tx.send(Ok(contract.into())).await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(output_stream) as Self::ListVMContractsStream
))
}
}
#[tonic::async_trait]
impl BrainCliService for BrainCliMock {
    /// Submits a new-VM request through `BrainData::submit_newvmrequest` and
    /// waits on a oneshot channel for the resulting confirmation.
    ///
    /// # Errors
    ///
    /// * `Status::unavailable` when the submission yields `None`.
    /// * `Status::unknown` when the oneshot receiver resolves to an error.
    async fn create_vm_contract(
        &self,
        req: Request<NewVmRequest>,
    ) -> Result<Response<NewVmConfirmation>, Status> {
        let new_vm_request = req.into_inner();
        let (engine_tx, engine_rx) = tokio::sync::oneshot::channel();
        // Kept in a named local so the value stays alive until this handler
        // returns, including while the confirmation is awaited.
        let dropper = self
            .data
            .clone()
            .submit_newvmrequest(new_vm_request, engine_tx)
            .await;
        if dropper.is_none() {
            return Err(Status::unavailable(
                "The node you picked is currently offline.",
            ));
        }
        match engine_rx.await {
            Ok(confirmation) => Ok(Response::new(confirmation)),
            Err(_) => Err(Status::unknown(
                "Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
            )),
        }
    }

    /// Boxed server-streaming response type returned by `list_vm_contracts`.
    type ListVMContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;

    /// Streams the contracts that `BrainData::find_contracts_by_admin_pubkey`
    /// returns for the request's `admin_pubkey`, converting each with `Into`.
    async fn list_vm_contracts(
        &self,
        req: Request<ListVmContractsReq>,
    ) -> Result<Response<Self::ListVMContractsStream>, Status> {
        let filter = req.into_inner();
        let found = self.data.find_contracts_by_admin_pubkey(&filter.admin_pubkey);
        let (sender, receiver) = mpsc::channel(6);
        // Feed the channel from a separate task so the response can be
        // returned right away; send failures are deliberately ignored.
        tokio::spawn(async move {
            for entry in found {
                let _ = sender.send(Ok(entry.into())).await;
            }
        });
        let stream: Self::ListVMContractsStream = Box::pin(ReceiverStream::new(receiver));
        Ok(Response::new(stream))
    }

    /// Boxed server-streaming response type returned by `list_nodes`.
    type ListNodesStream = Pin<Box<dyn Stream<Item = Result<NodeListResp, Status>> + Send>>;

    /// Streams the nodes that `BrainData::find_nodes_by_filters` returns for
    /// the supplied filters, converting each with `Into`.
    async fn list_nodes(
        &self,
        req: Request<NodeFilters>,
    ) -> Result<Response<Self::ListNodesStream>, tonic::Status> {
        let filters = req.into_inner();
        let found = self.data.find_nodes_by_filters(&filters);
        let (sender, receiver) = mpsc::channel(6);
        // Feed the channel from a separate task so the response can be
        // returned right away; send failures are deliberately ignored.
        tokio::spawn(async move {
            for entry in found {
                let _ = sender.send(Ok(entry.into())).await;
            }
        });
        let stream: Self::ListNodesStream = Box::pin(ReceiverStream::new(receiver));
        Ok(Response::new(stream))
    }

    /// Passes the deletion payload to `BrainData::delete_vm` and replies with
    /// `Empty`.
    async fn delete_vm(&self, req: Request<DeletedVmUpdate>) -> Result<Response<Empty>, Status> {
        let update = req.into_inner();
        self.data.delete_vm(update).await;
        Ok(Response::new(Empty {}))
    }
}