brain-mock/src/grpc.rs

510 lines
17 KiB
Rust

#![allow(dead_code)]
pub mod snp_proto {
tonic::include_proto!("vm_proto");
}
use crate::grpc::vm_daemon_message;
use crate::data::BrainData;
use log::info;
use snp_proto::brain_cli_server::BrainCli;
use snp_proto::brain_vm_daemon_server::BrainVmDaemon;
use snp_proto::*;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};
const ADMIN_ACCOUNTS: &[&str] = &[
"x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK",
"FHuecMbeC1PfjkW2JKyoicJAuiU7khgQT16QUB3Q1XdL",
];
pub struct BrainDaemonMock {
data: Arc<BrainData>,
}
impl BrainDaemonMock {
pub fn new(data: Arc<BrainData>) -> Self {
Self { data }
}
}
pub struct BrainCliMock {
data: Arc<BrainData>,
}
impl BrainCliMock {
pub fn new(data: Arc<BrainData>) -> Self {
Self { data }
}
}
#[tonic::async_trait]
impl BrainVmDaemon for BrainDaemonMock {
type RegisterVmNodeStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
async fn register_vm_node(
&self,
req: Request<RegisterVmNodeReq>,
) -> Result<Response<Self::RegisterVmNodeStream>, Status> {
let req = check_sig_from_req(req)?;
info!("Starting registration process for {:?}", req);
let node = crate::data::VmNode {
public_key: req.node_pubkey.clone(),
owner_key: req.owner_pubkey,
country: req.country,
region: req.region,
city: req.city,
ip: req.main_ip,
price: req.price,
..Default::default()
};
self.data.insert_node(node);
info!("Sending existing contracts to {}", req.node_pubkey);
let contracts = self.data.find_contracts_by_node_pubkey(&req.node_pubkey);
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for contract in contracts {
let _ = tx.send(Ok(contract.into())).await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(output_stream) as Self::RegisterVmNodeStream
))
}
type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainVmMessage, Status>> + Send>>;
async fn brain_messages(
&self,
req: Request<DaemonStreamAuth>,
) -> Result<Response<Self::BrainMessagesStream>, Status> {
let auth = req.into_inner();
let pubkey = auth.pubkey.clone();
check_sig_from_parts(
&pubkey,
&auth.timestamp,
&format!("{:?}", auth.contracts),
&auth.signature,
)?;
info!("Daemon {} connected to receive brain messages", pubkey);
let (tx, rx) = mpsc::channel(6);
self.data.add_daemon_tx(&pubkey, tx);
let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg));
Ok(Response::new(
Box::pin(output_stream) as Self::BrainMessagesStream
))
}
async fn daemon_messages(
&self,
req: Request<Streaming<VmDaemonMessage>>,
) -> Result<Response<Empty>, Status> {
let mut req_stream = req.into_inner();
let pubkey: String;
if let Some(Ok(msg)) = req_stream.next().await {
log::debug!(
"demon_messages received the following auth message: {:?}",
msg.msg
);
if let Some(vm_daemon_message::Msg::Auth(auth)) = msg.msg {
pubkey = auth.pubkey.clone();
check_sig_from_parts(
&pubkey,
&auth.timestamp,
&format!("{:?}", auth.contracts),
&auth.signature,
)?;
} else {
return Err(Status::unauthenticated(
"Could not authenticate the daemon: could not extract auth signature",
));
}
} else {
return Err(Status::unauthenticated("Could not authenticate the daemon"));
}
// info!("Received a message from daemon {pubkey}: {daemon_message:?}");
while let Some(daemon_message) = req_stream.next().await {
match daemon_message {
Ok(msg) => match msg.msg {
Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => {
self.data.submit_newvm_resp(new_vm_resp).await;
}
Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => {
self.data.submit_updatevm_resp(update_vm_resp).await;
}
Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => {
self.data.submit_node_resources(node_resources);
}
_ => {}
},
Err(e) => {
log::warn!("Daemon disconnected: {e:?}");
self.data.del_daemon_tx(&pubkey);
}
}
}
Ok(Response::new(Empty {}))
}
}
#[tonic::async_trait]
impl BrainCli for BrainCliMock {
async fn get_balance(&self, req: Request<Pubkey>) -> Result<Response<AccountBalance>, Status> {
let req = check_sig_from_req(req)?;
Ok(Response::new(self.data.get_balance(&req.pubkey).into()))
}
async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> {
let req = check_sig_from_req(req)?;
info!("New VM requested via CLI: {req:?}");
let admin_pubkey = req.admin_pubkey.clone();
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
self.data.submit_newvm_req(req, oneshot_tx).await;
match oneshot_rx.await {
Ok(response) => {
info!("Sending VM confirmation to {admin_pubkey}: {response:?}");
Ok(Response::new(response))
}
Err(e) => {
log::error!("Something weird happened. Reached error {e:?}");
Err(Status::unknown(
"Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
}
async fn update_vm(&self, req: Request<UpdateVmReq>) -> Result<Response<UpdateVmResp>, Status> {
let req = check_sig_from_req(req)?;
info!("Update VM requested via CLI: {req:?}");
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
self.data.submit_updatevm_req(req, oneshot_tx).await;
match oneshot_rx.await {
Ok(response) => {
info!("Sending UpdateVMResp: {response:?}");
Ok(Response::new(response))
}
Err(e) => Err(Status::unknown(format!(
"Update VM request failed due to error: {e}"
))),
}
}
async fn extend_vm(&self, req: Request<ExtendVmReq>) -> Result<Response<Empty>, Status> {
let req = check_sig_from_req(req)?;
match self
.data
.extend_vm_contract_time(&req.uuid, &req.admin_pubkey, req.locked_nano)
{
Ok(()) => Ok(Response::new(Empty {})),
Err(e) => Err(Status::unknown(format!("Could not extend contract: {e}"))),
}
}
type ListVmContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
async fn list_vm_contracts(
&self,
req: Request<ListVmContractsReq>,
) -> Result<Response<Self::ListVmContractsStream>, Status> {
let req = check_sig_from_req(req)?;
info!("CLI {} requested ListVMVmContractsStream", req.admin_pubkey);
let contracts = match req.uuid.is_empty() {
false => match self.data.find_contract_by_uuid(&req.uuid) {
Some(contract) => vec![contract],
None => Vec::new(),
},
true => self.data.find_contracts_by_admin_pubkey(&req.admin_pubkey),
};
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for contract in contracts {
let _ = tx.send(Ok(contract.into())).await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(output_stream) as Self::ListVmContractsStream
))
}
type ListVmNodesStream = Pin<Box<dyn Stream<Item = Result<VmNodeListResp, Status>> + Send>>;
async fn list_vm_nodes(
&self,
req: Request<VmNodeFilters>,
) -> Result<Response<Self::ListVmNodesStream>, tonic::Status> {
let req = check_sig_from_req(req)?;
info!("CLI requested ListVmNodesStream: {req:?}");
let nodes = self.data.find_nodes_by_filters(&req);
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for node in nodes {
let _ = tx.send(Ok(node.into())).await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(output_stream) as Self::ListVmNodesStream
))
}
async fn get_one_vm_node(
&self,
req: Request<VmNodeFilters>,
) -> Result<Response<VmNodeListResp>, Status> {
let req = check_sig_from_req(req)?;
info!("Unknown CLI requested ListVmNodesStream: {req:?}");
match self.data.get_one_node_by_filters(&req) {
Some(node) => Ok(Response::new(node.into())),
None => Err(Status::not_found(
"Could not find any node based on your search criteria",
)),
}
}
async fn delete_vm(&self, req: Request<DeleteVmReq>) -> Result<Response<Empty>, Status> {
let req = check_sig_from_req(req)?;
info!("Unknown CLI requested to delete vm {}", req.uuid);
match self.data.delete_vm(req).await {
Ok(()) => Ok(Response::new(Empty {})),
Err(e) => Err(Status::not_found(e.to_string())),
}
}
async fn airdrop(&self, req: Request<AirdropReq>) -> Result<Response<Empty>, Status> {
check_admin_key(&req)?;
let req = check_sig_from_req(req)?;
self.data.give_airdrop(&req.pubkey, req.tokens);
Ok(Response::new(Empty{}))
}
type ListAllVmContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
async fn list_all_vm_contracts(
&self,
req: Request<Empty>,
) -> Result<Response<Self::ListVmContractsStream>, Status> {
check_admin_key(&req)?;
let _ = check_sig_from_req(req)?;
let contracts = self.data.list_all_contracts();
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for contract in contracts {
let _ = tx.send(Ok(contract.into())).await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(output_stream) as Self::ListVmContractsStream
))
}
type ListAccountsStream = Pin<Box<dyn Stream<Item = Result<Account, Status>> + Send>>;
async fn list_accounts(
&self,
req: Request<Empty>,
) -> Result<Response<Self::ListAccountsStream>, Status> {
check_admin_key(&req)?;
let _ = check_sig_from_req(req)?;
let accounts = self.data.list_accounts();
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for account in accounts {
let _ = tx.send(Ok(account.into())).await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(output_stream) as Self::ListAccountsStream
))
}
}
trait PubkeyGetter {
fn get_pubkey(&self) -> Option<String>;
}
impl PubkeyGetter for Pubkey {
fn get_pubkey(&self) -> Option<String> {
Some(self.pubkey.clone())
}
}
impl PubkeyGetter for NewVmReq {
fn get_pubkey(&self) -> Option<String> {
Some(self.admin_pubkey.clone())
}
}
impl PubkeyGetter for DeleteVmReq {
fn get_pubkey(&self) -> Option<String> {
Some(self.admin_pubkey.clone())
}
}
impl PubkeyGetter for UpdateVmReq {
fn get_pubkey(&self) -> Option<String> {
Some(self.admin_pubkey.clone())
}
}
impl PubkeyGetter for ExtendVmReq {
fn get_pubkey(&self) -> Option<String> {
Some(self.admin_pubkey.clone())
}
}
impl PubkeyGetter for ListVmContractsReq {
fn get_pubkey(&self) -> Option<String> {
Some(self.admin_pubkey.clone())
}
}
impl PubkeyGetter for VmNodeFilters {
fn get_pubkey(&self) -> Option<String> {
None
}
}
impl PubkeyGetter for RegisterVmNodeReq {
fn get_pubkey(&self) -> Option<String> {
Some(self.node_pubkey.clone())
}
}
impl PubkeyGetter for Empty {
fn get_pubkey(&self) -> Option<String> {
None
}
}
impl PubkeyGetter for AirdropReq {
fn get_pubkey(&self) -> Option<String> {
None
}
}
fn check_sig_from_req<T: std::fmt::Debug + PubkeyGetter>(req: Request<T>) -> Result<T, Status> {
let time = match req.metadata().get("timestamp") {
Some(t) => t.clone(),
None => return Err(Status::unauthenticated("Timestamp not found in metadata.")),
};
let time = time
.to_str()
.map_err(|_| Status::unauthenticated("Timestamp in metadata is not a string"))?;
let now = chrono::Utc::now();
let parsed_time = chrono::DateTime::parse_from_rfc3339(time)
.map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?;
let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds();
if seconds_elapsed > 1 || seconds_elapsed < -1 {
return Err(Status::unauthenticated(format!(
"Date is not within 1 sec of the time of the server: CLI {} vs Server {}",
parsed_time, now
)));
}
let signature = match req.metadata().get("request-signature") {
Some(t) => t,
None => return Err(Status::unauthenticated("signature not found in metadata.")),
};
let signature = bs58::decode(signature)
.into_vec()
.map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?;
let signature = ed25519_dalek::Signature::from_bytes(
signature
.as_slice()
.try_into()
.map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?,
);
let pubkey_value = match req.metadata().get("pubkey") {
Some(p) => p.clone(),
None => return Err(Status::unauthenticated("pubkey not found in metadata.")),
};
let pubkey = ed25519_dalek::VerifyingKey::from_bytes(
&bs58::decode(&pubkey_value)
.into_vec()
.map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))?
.try_into()
.map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?,
)
.map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?;
let req = req.into_inner();
let message = format!("{time}{req:?}");
use ed25519_dalek::Verifier;
pubkey
.verify(message.as_bytes(), &signature)
.map_err(|_| Status::unauthenticated("the signature is not valid"))?;
if let Some(req_pubkey) = req.get_pubkey() {
if pubkey_value.to_str().unwrap().to_string() != req_pubkey {
return Err(Status::unauthenticated(
"pubkey of signature does not match pubkey of request",
));
}
}
Ok(req)
}
fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Result<(), Status> {
let now = chrono::Utc::now();
let parsed_time = chrono::DateTime::parse_from_rfc3339(time)
.map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?;
let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds();
if seconds_elapsed > 1 || seconds_elapsed < -1 {
return Err(Status::unauthenticated(format!(
"Date is not within 1 sec of the time of the server: CLI {} vs Server {}",
parsed_time, now
)));
}
let signature = bs58::decode(sig)
.into_vec()
.map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?;
let signature = ed25519_dalek::Signature::from_bytes(
signature
.as_slice()
.try_into()
.map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?,
);
let pubkey = ed25519_dalek::VerifyingKey::from_bytes(
&bs58::decode(&pubkey)
.into_vec()
.map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))?
.try_into()
.map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?,
)
.map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?;
let msg = time.to_string() + msg;
use ed25519_dalek::Verifier;
pubkey
.verify(msg.as_bytes(), &signature)
.map_err(|_| Status::unauthenticated("the signature is not valid"))?;
Ok(())
}
fn check_admin_key<T>(req: &Request<T>) -> Result<(), Status> {
let pubkey = match req.metadata().get("pubkey") {
Some(p) => p.clone(),
None => return Err(Status::unauthenticated("pubkey not found in metadata.")),
};
let pubkey = pubkey
.to_str()
.map_err(|_| Status::unauthenticated("could not parse pubkey metadata to str"))?;
if !ADMIN_ACCOUNTS.contains(&pubkey) {
return Err(Status::unauthenticated(
"This operation is reserved to admin accounts",
));
}
Ok(())
}