#![allow(dead_code)] pub mod brain { tonic::include_proto!("brain"); } use crate::data::BrainData; use brain::brain_cli_service_server::BrainCliService; use brain::brain_daemon_service_server::BrainDaemonService; use brain::*; use log::debug; use log::info; use std::pin::Pin; use std::sync::Arc; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; pub struct BrainDaemonMock { data: Arc, } impl BrainDaemonMock { pub fn new(data: Arc) -> Self { Self { data } } } pub struct BrainCliMock { data: Arc, } impl BrainCliMock { pub fn new(data: Arc) -> Self { Self { data } } } #[tonic::async_trait] impl BrainDaemonService for BrainDaemonMock { async fn register_node( &self, req: Request, ) -> Result, Status> { self.data.insert_node(req.into_inner()); Ok(Response::new(Empty {})) } type GetNewVMReqsStream = Pin> + Send>>; async fn get_new_vm_reqs( &self, req: Request, ) -> Result, Status> { let req = req.into_inner(); info!("Daemon {} requested GetNewVMReqsStream", req.node_pubkey); let (grpc_tx, grpc_rx) = mpsc::channel(6); let (data_tx, mut data_rx) = mpsc::channel(6); self.data .clone() .add_daemon_newvm_tx(&req.node_pubkey, data_tx) .await; tokio::spawn(async move { while let Some(newvmreq) = data_rx.recv().await { debug!( "received this newvmreq to {}: {newvmreq:?}", req.node_pubkey ); let _ = grpc_tx.send(Ok(newvmreq)).await; } }); let output_stream = ReceiverStream::new(grpc_rx); Ok(Response::new( Box::pin(output_stream) as Self::GetNewVMReqsStream )) } async fn send_vm_confirmations( &self, req: Request>, ) -> Result, Status> { debug!("Some node connected to stream NewVmConfirmation"); let mut confirmations = req.into_inner(); while let Some(confirmation) = confirmations.next().await { match confirmation { Ok(c) => { info!("Received confirmation from daemon: {c:?}"); self.data.submit_vmconfirmation(c).await; } Err(e) => log::warn!("Daemon disconnected from Streaming: {e:?}"), } } Ok(Response::new(Empty {})) } type DeletedVMUpdatesStream = Pin> + Send>>; async fn deleted_vm_updates( &self, req: Request, ) -> Result, Status> { let node_pubkey = req.into_inner().node_pubkey; info!("Daemon {node_pubkey} requested DeletedVMUpdatesStream"); let (grpc_tx, grpc_rx) = mpsc::channel(6); let (data_tx, mut data_rx) = mpsc::channel(6); self.data .clone() .add_daemon_deletevm_tx(&node_pubkey, data_tx); tokio::spawn(async move { while let Some(deleted_vm) = data_rx.recv().await { match grpc_tx.send(Ok(deleted_vm.clone())).await { Ok(_) => debug!( "Sent delete_vm confirmation to {}: {:?}", node_pubkey, deleted_vm ), Err(e) => log::error!( "Could not send delete_vm confirmation {:?} to {} because of error: {:?}", deleted_vm, node_pubkey, e ), } } }); let output_stream = ReceiverStream::new(grpc_rx); Ok(Response::new( Box::pin(output_stream) as Self::DeletedVMUpdatesStream )) } type ListVMContractsStream = Pin> + Send>>; async fn list_vm_contracts( &self, req: Request, ) -> Result, Status> { let req = req.into_inner(); info!("Node {} requested ListVMContractsStream", req.node_pubkey); let contracts = self.data.find_contracts_by_admin_pubkey(&req.node_pubkey); let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for contract in contracts { let _ = tx.send(Ok(contract.into())).await; } }); let output_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(output_stream) as Self::ListVMContractsStream )) } } #[tonic::async_trait] impl BrainCliService for BrainCliMock { async fn create_vm_contract( &self, req: Request, ) -> Result, Status> { let req = req.into_inner(); info!("New VM requested via CLI: {req:?}"); let admin_pubkey = req.admin_pubkey.clone(); let (engine_tx, engine_rx) = tokio::sync::oneshot::channel(); if !self.data.submit_newvmrequest(req, engine_tx).await { return Err(Status::unavailable( "The node you picked is currently offline.", )); } if let Ok(response) = engine_rx.await { info!("Sending VM confirmation to {admin_pubkey}: {response:?}"); return Ok(Response::new(response)); } self.data.del_cli_vmcontract_tx(&admin_pubkey); Err(Status::unknown( "Request failed due to unknown error. Please try again or contact the DeTEE devs team.", )) } type ListVMContractsStream = Pin> + Send>>; async fn list_vm_contracts( &self, req: Request, ) -> Result, Status> { let req = req.into_inner(); info!("CLI {} requested ListVMContractsStream", req.admin_pubkey); let contracts = self.data.find_contracts_by_admin_pubkey(&req.admin_pubkey); let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for contract in contracts { let _ = tx.send(Ok(contract.into())).await; } }); let output_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(output_stream) as Self::ListVMContractsStream )) } type ListNodesStream = Pin> + Send>>; async fn list_nodes( &self, req: Request, ) -> Result, tonic::Status> { let req = req.into_inner(); info!("Unknown CLI requested ListNodesStream: {req:?}"); let nodes = self.data.find_nodes_by_filters(&req); let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for node in nodes { let _ = tx.send(Ok(node.into())).await; } }); let output_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(output_stream) as Self::ListNodesStream )) } async fn delete_vm(&self, req: Request) -> Result, Status> { let req = req.into_inner(); info!("Unknown CLI requested to delete vm {}", req.uuid); self.data.delete_vm(req).await; Ok(Response::new(Empty {})) } }