316 lines
11 KiB
Rust
316 lines
11 KiB
Rust
#![allow(dead_code)]
|
|
|
|
pub mod brain {
|
|
tonic::include_proto!("brain");
|
|
}
|
|
|
|
use crate::data::BrainData;
|
|
use brain::brain_cli_service_server::BrainCliService;
|
|
use brain::brain_daemon_service_server::BrainDaemonService;
|
|
use brain::*;
|
|
use log::debug;
|
|
use log::info;
|
|
use log::warn;
|
|
use std::pin::Pin;
|
|
use std::sync::Arc;
|
|
use tokio::sync::mpsc;
|
|
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
|
|
use tonic::{Request, Response, Status, Streaming};
|
|
|
|
pub struct BrainDaemonMock {
|
|
data: Arc<BrainData>,
|
|
}
|
|
|
|
impl BrainDaemonMock {
|
|
pub fn new(data: Arc<BrainData>) -> Self {
|
|
Self { data }
|
|
}
|
|
}
|
|
|
|
pub struct BrainCliMock {
|
|
data: Arc<BrainData>,
|
|
}
|
|
|
|
impl BrainCliMock {
|
|
pub fn new(data: Arc<BrainData>) -> Self {
|
|
Self { data }
|
|
}
|
|
}
|
|
|
|
#[tonic::async_trait]
|
|
impl BrainDaemonService for BrainDaemonMock {
|
|
async fn register_node(
|
|
&self,
|
|
req: Request<RegisterNodeReq>,
|
|
) -> Result<Response<Empty>, Status> {
|
|
self.data.insert_node(req.into_inner());
|
|
Ok(Response::new(Empty {}))
|
|
}
|
|
|
|
type GetNewVMReqsStream = Pin<Box<dyn Stream<Item = Result<NewVmReq, Status>> + Send>>;
|
|
|
|
async fn get_new_vm_reqs(
|
|
&self,
|
|
req: Request<NodePubkey>,
|
|
) -> Result<Response<Self::GetNewVMReqsStream>, Status> {
|
|
let req = req.into_inner();
|
|
info!("Daemon {} requested GetNewVMReqsStream", req.node_pubkey);
|
|
let (grpc_tx, grpc_rx) = mpsc::channel(6);
|
|
let (data_tx, mut data_rx) = mpsc::channel(6);
|
|
self.data
|
|
.add_daemon_newvm_tx(&req.node_pubkey, data_tx)
|
|
.await;
|
|
let data = self.data.clone();
|
|
tokio::spawn(async move {
|
|
while let Some(newvmreq) = data_rx.recv().await {
|
|
let uuid = newvmreq.uuid.clone();
|
|
debug!("Sending NewVMRequest to {}: {newvmreq:?}", req.node_pubkey);
|
|
if let Err(e) = grpc_tx.send(Ok(newvmreq)).await {
|
|
warn!("Could not send NewVMRequest to {}: {e:?}", req.node_pubkey);
|
|
data.submit_vmconfirmation(NewVmResp {
|
|
error: "Daemon not connected.".to_string(),
|
|
uuid,
|
|
..Default::default()
|
|
})
|
|
.await;
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
let output_stream = ReceiverStream::new(grpc_rx);
|
|
Ok(Response::new(
|
|
Box::pin(output_stream) as Self::GetNewVMReqsStream
|
|
))
|
|
}
|
|
|
|
async fn send_new_vm_resp(
|
|
&self,
|
|
req: Request<Streaming<NewVmResp>>,
|
|
) -> Result<Response<Empty>, Status> {
|
|
debug!("Some node connected to stream NewVMResp");
|
|
let mut confirmations = req.into_inner();
|
|
while let Some(confirmation) = confirmations.next().await {
|
|
match confirmation {
|
|
Ok(c) => {
|
|
info!("Received confirmation from daemon: {c:?}");
|
|
self.data.submit_vmconfirmation(c).await;
|
|
}
|
|
Err(e) => {
|
|
log::warn!("Daemon disconnected from Streaming<NewVMResp>: {e:?}")
|
|
}
|
|
}
|
|
}
|
|
Ok(Response::new(Empty {}))
|
|
}
|
|
|
|
type GetDeleteVMReqStream = Pin<Box<dyn Stream<Item = Result<DeleteVmReq, Status>> + Send>>;
|
|
|
|
async fn get_delete_vm_req(
|
|
&self,
|
|
req: Request<NodePubkey>,
|
|
) -> Result<Response<Self::GetDeleteVMReqStream>, Status> {
|
|
let node_pubkey = req.into_inner().node_pubkey;
|
|
info!("Daemon {node_pubkey} requested GetDeleteVMReqStream");
|
|
let (grpc_tx, grpc_rx) = mpsc::channel(6);
|
|
let (data_tx, mut data_rx) = mpsc::channel(6);
|
|
self.data
|
|
.clone()
|
|
.add_daemon_deletevm_tx(&node_pubkey, data_tx);
|
|
tokio::spawn(async move {
|
|
while let Some(deleted_vm) = data_rx.recv().await {
|
|
match grpc_tx.send(Ok(deleted_vm.clone())).await {
|
|
Ok(_) => debug!(
|
|
"Sent delete_vm confirmation to {}: {:?}",
|
|
node_pubkey, deleted_vm
|
|
),
|
|
Err(e) => log::error!(
|
|
"Could not send delete_vm confirmation {:?} to {} because of error: {:?}",
|
|
deleted_vm,
|
|
node_pubkey,
|
|
e
|
|
),
|
|
}
|
|
}
|
|
});
|
|
let output_stream = ReceiverStream::new(grpc_rx);
|
|
Ok(Response::new(
|
|
Box::pin(output_stream) as Self::GetDeleteVMReqStream
|
|
))
|
|
}
|
|
|
|
type ListVMContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
|
|
|
|
async fn list_vm_contracts(
|
|
&self,
|
|
req: Request<ListVmContractsReq>,
|
|
) -> Result<Response<Self::ListVMContractsStream>, Status> {
|
|
let req = req.into_inner();
|
|
info!("Node {} requested ListVMContractsStream", req.node_pubkey);
|
|
let contracts = self.data.find_contracts_by_admin_pubkey(&req.node_pubkey);
|
|
let (tx, rx) = mpsc::channel(6);
|
|
tokio::spawn(async move {
|
|
for contract in contracts {
|
|
let _ = tx.send(Ok(contract.into())).await;
|
|
}
|
|
});
|
|
let output_stream = ReceiverStream::new(rx);
|
|
Ok(Response::new(
|
|
Box::pin(output_stream) as Self::ListVMContractsStream
|
|
))
|
|
}
|
|
|
|
type GetUpdateVMReqStream = Pin<Box<dyn Stream<Item = Result<UpdateVmReq, Status>> + Send>>;
|
|
|
|
async fn get_update_vm_req(
|
|
&self,
|
|
req: Request<NodePubkey>,
|
|
) -> Result<Response<Self::GetUpdateVMReqStream>, Status> {
|
|
let req = req.into_inner();
|
|
info!("Daemon {} requested GetUpdateVMStream", req.node_pubkey);
|
|
let (grpc_tx, grpc_rx) = mpsc::channel(6);
|
|
let (data_tx, mut data_rx) = mpsc::channel(6);
|
|
self.data
|
|
.add_daemon_updatevm_tx(&req.node_pubkey, data_tx)
|
|
.await;
|
|
let data = self.data.clone();
|
|
tokio::spawn(async move {
|
|
while let Some(updatevmreq) = data_rx.recv().await {
|
|
debug!("Sending UpdateVMRequest to {}: {updatevmreq:?}", req.node_pubkey);
|
|
if let Err(e) = grpc_tx.send(Ok(updatevmreq.clone())).await {
|
|
warn!("Could not send UpdateVMRequest to {}: {e:?}", req.node_pubkey);
|
|
data.submit_update_vmconfirmation(UpdateVmResp {
|
|
error: "Daemon not connected.".to_string(),
|
|
uuid: updatevmreq.uuid,
|
|
timestamp: format!("{:?}", std::time::SystemTime::now()),
|
|
})
|
|
.await;
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
let output_stream = ReceiverStream::new(grpc_rx);
|
|
Ok(Response::new(
|
|
Box::pin(output_stream) as Self::GetUpdateVMReqStream
|
|
))
|
|
}
|
|
|
|
async fn send_update_vm_resp(
|
|
&self,
|
|
req: Request<Streaming<UpdateVmResp>>,
|
|
) -> Result<Response<Empty>, Status> {
|
|
debug!("Some node connected to stream NewVMResp");
|
|
let mut confirmations = req.into_inner();
|
|
while let Some(confirmation) = confirmations.next().await {
|
|
match confirmation {
|
|
Ok(mut c) => {
|
|
info!("Received confirmation from daemon: {c:?}");
|
|
c.timestamp = format!("{:?}", std::time::SystemTime::now());
|
|
self.data.submit_update_vmconfirmation(c).await;
|
|
}
|
|
Err(e) => {
|
|
log::warn!("Daemon disconnected from Streaming<NewVMResp>: {e:?}")
|
|
}
|
|
}
|
|
}
|
|
Ok(Response::new(Empty {}))
|
|
}
|
|
}
|
|
|
|
#[tonic::async_trait]
|
|
impl BrainCliService for BrainCliMock {
|
|
async fn create_vm_contract(
|
|
&self,
|
|
req: Request<NewVmReq>,
|
|
) -> Result<Response<NewVmResp>, Status> {
|
|
let req = req.into_inner();
|
|
info!("New VM requested via CLI: {req:?}");
|
|
let admin_pubkey = req.admin_pubkey.clone();
|
|
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
|
|
self.data.submit_newvmrequest(req, oneshot_tx).await;
|
|
match oneshot_rx.await {
|
|
Ok(response) => {
|
|
info!("Sending VM confirmation to {admin_pubkey}: {response:?}");
|
|
Ok(Response::new(response))
|
|
}
|
|
Err(e) => {
|
|
log::error!("Something weird happened. Reached error {e:?}");
|
|
Err(Status::unknown(
|
|
"Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn update_vm(
|
|
&self,
|
|
req: Request<UpdateVmReq>,
|
|
) -> Result<Response<UpdateVmResp>, Status> {
|
|
let req = req.into_inner();
|
|
info!("Update VM requested via CLI: {req:?}");
|
|
let node_pubkey = req.node_pubkey.clone();
|
|
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
|
|
self.data.submit_updatevmrequest(req, oneshot_tx).await;
|
|
match oneshot_rx.await {
|
|
Ok(response) => {
|
|
info!("Sending Update VM confirmation to {node_pubkey}: {response:?}");
|
|
Ok(Response::new(response))
|
|
}
|
|
Err(e) => {
|
|
log::error!("Something weird happened. Reached error {e:?}");
|
|
Err(Status::unknown(
|
|
"Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
type ListVMContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
|
|
async fn list_vm_contracts(
|
|
&self,
|
|
req: Request<ListVmContractsReq>,
|
|
) -> Result<Response<Self::ListVMContractsStream>, Status> {
|
|
let req = req.into_inner();
|
|
info!("CLI {} requested ListVMContractsStream", req.admin_pubkey);
|
|
let contracts = self.data.find_contracts_by_admin_pubkey(&req.admin_pubkey);
|
|
let (tx, rx) = mpsc::channel(6);
|
|
tokio::spawn(async move {
|
|
for contract in contracts {
|
|
let _ = tx.send(Ok(contract.into())).await;
|
|
}
|
|
});
|
|
let output_stream = ReceiverStream::new(rx);
|
|
Ok(Response::new(
|
|
Box::pin(output_stream) as Self::ListVMContractsStream
|
|
))
|
|
}
|
|
|
|
type ListNodesStream = Pin<Box<dyn Stream<Item = Result<NodeListResp, Status>> + Send>>;
|
|
async fn list_nodes(
|
|
&self,
|
|
req: Request<NodeFilters>,
|
|
) -> Result<Response<Self::ListNodesStream>, tonic::Status> {
|
|
let req = req.into_inner();
|
|
info!("Unknown CLI requested ListNodesStream: {req:?}");
|
|
let nodes = self.data.find_nodes_by_filters(&req);
|
|
let (tx, rx) = mpsc::channel(6);
|
|
tokio::spawn(async move {
|
|
for node in nodes {
|
|
let _ = tx.send(Ok(node.into())).await;
|
|
}
|
|
});
|
|
let output_stream = ReceiverStream::new(rx);
|
|
Ok(Response::new(
|
|
Box::pin(output_stream) as Self::ListNodesStream
|
|
))
|
|
}
|
|
|
|
async fn delete_vm(&self, req: Request<DeleteVmReq>) -> Result<Response<Empty>, Status> {
|
|
let req = req.into_inner();
|
|
info!("Unknown CLI requested to delete vm {}", req.uuid);
|
|
self.data.delete_vm(req).await;
|
|
Ok(Response::new(Empty {}))
|
|
}
|
|
|
|
|
|
}
|