brain-mock/src/grpc.rs

252 lines
8.9 KiB
Rust

#![allow(dead_code)]
pub mod snp_proto {
tonic::include_proto!("snp_proto");
}
use crate::data::BrainData;
use log::info;
use snp_proto::brain_cli_server::BrainCli;
use snp_proto::brain_daemon_server::BrainDaemon;
use snp_proto::*;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};
pub struct BrainDaemonMock {
data: Arc<BrainData>,
}
impl BrainDaemonMock {
pub fn new(data: Arc<BrainData>) -> Self {
Self { data }
}
}
pub struct BrainCliMock {
data: Arc<BrainData>,
}
impl BrainCliMock {
pub fn new(data: Arc<BrainData>) -> Self {
Self { data }
}
}
#[tonic::async_trait]
impl BrainDaemon for BrainDaemonMock {
type RegisterNodeStream = Pin<Box<dyn Stream<Item = Result<Contract, Status>> + Send>>;
async fn register_node(
&self,
req: Request<RegisterNodeReq>,
) -> Result<Response<Self::RegisterNodeStream>, Status> {
let req = req.into_inner();
info!("Starting registration process for {:?}", req);
let node = crate::data::Node {
public_key: req.node_pubkey.clone(),
owner_key: req.owner_pubkey,
country: req.country,
region: req.region,
city: req.city,
ip: req.main_ip,
price: req.price,
..Default::default()
};
self.data.insert_node(node);
info!("Sending existing contracts to {}", req.node_pubkey);
let contracts = self.data.find_contracts_by_node_pubkey(&req.node_pubkey);
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for contract in contracts {
let _ = tx.send(Ok(contract.into())).await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(output_stream) as Self::RegisterNodeStream
))
}
type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainMessage, Status>> + Send>>;
async fn brain_messages(
&self,
req: Request<Pubkey>,
) -> Result<Response<Self::BrainMessagesStream>, Status> {
let req = req.into_inner();
info!("Daemon {} connected to receive brain messages", req.pubkey);
let (tx, rx) = mpsc::channel(6);
self.data.add_daemon_tx(&req.pubkey, tx);
let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg));
Ok(Response::new(
Box::pin(output_stream) as Self::BrainMessagesStream
))
}
async fn daemon_messages(
&self,
req: Request<Streaming<DaemonMessage>>,
) -> Result<Response<Empty>, Status> {
let mut req_stream = req.into_inner();
let mut pubkey = String::new();
while let Some(daemon_message) = req_stream.next().await {
info!("Received a message from daemon {pubkey}: {daemon_message:?}");
match daemon_message {
Ok(msg) => match msg.msg {
Some(daemon_message::Msg::Pubkey(p)) => {
pubkey = p.pubkey;
}
Some(daemon_message::Msg::NewVmResp(new_vm_resp)) => {
self.data.submit_newvm_resp(new_vm_resp).await;
}
Some(daemon_message::Msg::UpdateVmResp(update_vm_resp)) => {
self.data.submit_updatevm_resp(update_vm_resp).await;
}
Some(daemon_message::Msg::NodeResources(node_resources)) => {
self.data.submit_node_resources(node_resources);
}
None => {}
},
Err(e) => {
log::warn!("Daemon disconnected: {e:?}");
self.data.del_daemon_tx(&pubkey);
}
}
}
Ok(Response::new(Empty {}))
}
}
#[tonic::async_trait]
impl BrainCli for BrainCliMock {
async fn get_balance(&self, req: Request<Pubkey>) -> Result<Response<AccountBalance>, Status> {
Ok(Response::new(
self.data.get_balance(&req.into_inner().pubkey).into(),
))
}
async fn get_airdrop(&self, req: Request<Pubkey>) -> Result<Response<Empty>, Status> {
self.data.get_airdrop(&req.into_inner().pubkey);
Ok(Response::new(Empty {}))
}
async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> {
let req = req.into_inner();
info!("New VM requested via CLI: {req:?}");
let admin_pubkey = req.admin_pubkey.clone();
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
self.data.submit_newvm_req(req, oneshot_tx).await;
match oneshot_rx.await {
Ok(response) => {
info!("Sending VM confirmation to {admin_pubkey}: {response:?}");
Ok(Response::new(response))
}
Err(e) => {
log::error!("Something weird happened. Reached error {e:?}");
Err(Status::unknown(
"Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
}
async fn update_vm(&self, req: Request<UpdateVmReq>) -> Result<Response<UpdateVmResp>, Status> {
let req = req.into_inner();
info!("Update VM requested via CLI: {req:?}");
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
self.data.submit_updatevm_req(req, oneshot_tx).await;
match oneshot_rx.await {
Ok(response) => {
info!("Sending UpdateVMResp: {response:?}");
Ok(Response::new(response))
}
Err(e) => {
log::error!("Something weird happened. Reached error {e:?}");
Err(Status::unknown(
"Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
}
async fn extend_vm(&self, req: Request<ExtendVmReq>) -> Result<Response<Empty>, Status> {
let req = req.into_inner();
match self
.data
.extend_contract_time(&req.uuid, &req.admin_pubkey, req.locked_nano)
{
Ok(()) => Ok(Response::new(Empty {})),
Err(e) => Err(Status::unknown(format!("Could not extend contract: {e}"))),
}
}
type ListContractsStream = Pin<Box<dyn Stream<Item = Result<Contract, Status>> + Send>>;
async fn list_contracts(
&self,
req: Request<ListContractsReq>,
) -> Result<Response<Self::ListContractsStream>, Status> {
let req = req.into_inner();
info!("CLI {} requested ListVMContractsStream", req.admin_pubkey);
let contracts = match req.uuid.is_empty() {
false => match self.data.find_contract_by_uuid(&req.uuid) {
Some(contract) => vec![contract],
None => Vec::new(),
},
true => self.data.find_contracts_by_admin_pubkey(&req.admin_pubkey),
};
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for contract in contracts {
let _ = tx.send(Ok(contract.into())).await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(output_stream) as Self::ListContractsStream
))
}
type ListNodesStream = Pin<Box<dyn Stream<Item = Result<NodeListResp, Status>> + Send>>;
async fn list_nodes(
&self,
req: Request<NodeFilters>,
) -> Result<Response<Self::ListNodesStream>, tonic::Status> {
let req = req.into_inner();
info!("Unknown CLI requested ListNodesStream: {req:?}");
let nodes = self.data.find_nodes_by_filters(&req);
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for node in nodes {
let _ = tx.send(Ok(node.into())).await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(output_stream) as Self::ListNodesStream
))
}
async fn get_one_node(
&self,
req: Request<NodeFilters>,
) -> Result<Response<NodeListResp>, Status> {
let req = req.into_inner();
info!("Unknown CLI requested ListNodesStream: {req:?}");
match self.data.get_one_node_by_filters(&req) {
Some(node) => Ok(Response::new(node.into())),
None => Err(Status::not_found(
"Could not find any node based on your search criteria",
)),
}
}
async fn delete_vm(&self, req: Request<DeleteVmReq>) -> Result<Response<Empty>, Status> {
let req = req.into_inner();
info!("Unknown CLI requested to delete vm {}", req.uuid);
self.data.delete_vm(req).await;
Ok(Response::new(Empty {}))
}
}