#![allow(dead_code)] use crate::db; use detee_shared::app_proto::AppContract; use detee_shared::{ common_proto::{Empty, Pubkey}, general_proto::{ brain_general_cli_server::BrainGeneralCli, Account, AccountBalance, AirdropReq, BanUserReq, InspectOperatorResp, KickReq, KickResp, ListOperatorsResp, RegOperatorReq, ReportNodeReq, SlashReq, }, vm_proto::{brain_vm_cli_server::BrainVmCli, ListVmContractsReq, *}, }; use log::info; use std::pin::Pin; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; // use tokio::sync::mpsc; // use tokio_stream::{wrappers::ReceiverStream, Stream}; use tokio_stream::Stream; use tonic::{Request, Response, Status}; pub struct BrainGeneralCliMock {} impl From for AccountBalance { fn from(account: db::Account) -> Self { AccountBalance { balance: account.balance, tmp_locked: account.tmp_locked } } } impl From for VmContract { fn from(db_c: db::VmContractWithNode) -> Self { let mut exposed_ports = Vec::new(); for port in db_c.mapped_ports.iter() { exposed_ports.push(port.0); } VmContract { uuid: db_c.id.key().to_string(), hostname: db_c.hostname.clone(), admin_pubkey: db_c.admin.key().to_string(), node_pubkey: db_c.vm_node.id.key().to_string(), node_ip: db_c.vm_node.ip.clone(), location: format!( "{}, {}, {}", db_c.vm_node.city, db_c.vm_node.region, db_c.vm_node.country ), memory_mb: db_c.memory_mb, vcpus: db_c.vcpus, disk_size_gb: db_c.disk_size_gb, mapped_ports: db_c .mapped_ports .iter() .map(|(h, g)| MappedPort { host_port: *h, guest_port: *g }) .collect(), vm_public_ipv6: db_c.public_ipv6.clone(), vm_public_ipv4: db_c.public_ipv4.clone(), locked_nano: db_c.locked_nano, dtrfs_sha: db_c.dtrfs_sha.clone(), kernel_sha: db_c.kernel_sha.clone(), nano_per_minute: db_c.price_per_minute(), created_at: db_c.created_at.to_rfc3339(), updated_at: db_c.updated_at.to_rfc3339(), collected_at: db_c.collected_at.to_rfc3339(), } } } impl From for tonic::Status { fn from(e: db::Error) -> Self { Self::internal(format!("Internal error: {e}")) } } impl From for ListOperatorsResp { fn from(db_o: db::Operator) -> Self { ListOperatorsResp { pubkey: db_o.account.key().to_string(), escrow: db_o.escrow, email: db_o.email, app_nodes: db_o.app_nodes, vm_nodes: db_o.vm_nodes, reports: db_o.reports, } } } #[tonic::async_trait] impl BrainGeneralCli for BrainGeneralCliMock { type ListAccountsStream = Pin> + Send>>; type ListAllAppContractsStream = Pin> + Send>>; type ListAllVmContractsStream = Pin> + Send>>; type ListOperatorsStream = Pin> + Send>>; async fn get_balance(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; Ok(Response::new(db::account(&req.pubkey).await?.into())) } async fn report_node(&self, req: Request) -> Result, Status> { let _req = check_sig_from_req(req)?; todo!(); // match self.data.find_any_contract_by_uuid(&req.contract) { // Ok((Some(vm_contract), _)) // if vm_contract.admin_pubkey == req.admin_pubkey // && vm_contract.node_pubkey == req.node_pubkey => // { // () // } // Ok((_, Some(app_contract))) // if app_contract.admin_pubkey == req.admin_pubkey // && app_contract.node_pubkey == req.node_pubkey => // { // () // } // _ => return Err(Status::unauthenticated("No contract found by this ID.")), // }; // self.data.report_any_node(req.admin_pubkey, &req.node_pubkey, req.reason); // Ok(Response::new(Empty {})) } async fn list_operators( &self, req: Request, ) -> Result, Status> { let _ = check_sig_from_req(req)?; let operators = db::Operator::list().await?; let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for op in operators { let _ = tx.send(Ok(op.into())).await; } }); let output_stream = ReceiverStream::new(rx); Ok(Response::new(Box::pin(output_stream) as Self::ListOperatorsStream)) } async fn inspect_operator( &self, _req: Request, ) -> Result, Status> { todo!(); // match self.data.inspect_operator(&req.into_inner().pubkey) { // Some(op) => Ok(Response::new(op.into())), // None => Err(Status::not_found("The wallet you specified is not an operator")), // } } async fn register_operator( &self, _req: Request, ) -> Result, Status> { todo!(); // let req = check_sig_from_req(req)?; // info!("Regitering new operator: {req:?}"); // match self.data.register_operator(req) { // Ok(()) => Ok(Response::new(Empty {})), // Err(e) => Err(Status::failed_precondition(e.to_string())), // } } async fn kick_contract(&self, _req: Request) -> Result, Status> { todo!(); // let req = check_sig_from_req(req)?; // match self.data.kick_contract(&req.operator_wallet, &req.contract_uuid, &req.reason).await { // Ok(nano_lp) => Ok(Response::new(KickResp { nano_lp })), // Err(e) => Err(Status::permission_denied(e.to_string())), // } } async fn ban_user(&self, _req: Request) -> Result, Status> { todo!(); // let req = check_sig_from_req(req)?; // self.data.ban_user(&req.operator_wallet, &req.user_wallet); // Ok(Response::new(Empty {})) } // admin commands async fn airdrop(&self, _req: Request) -> Result, Status> { todo!(); // check_admin_key(&req)?; // let req = check_sig_from_req(req)?; // self.data.give_airdrop(&req.pubkey, req.tokens); // Ok(Response::new(Empty {})) } async fn slash(&self, _req: Request) -> Result, Status> { todo!(); // check_admin_key(&req)?; // let req = check_sig_from_req(req)?; // self.data.slash_account(&req.pubkey, req.tokens); // Ok(Response::new(Empty {})) } async fn list_accounts( &self, _req: Request, ) -> Result, Status> { todo!(); // check_admin_key(&req)?; // let _ = check_sig_from_req(req)?; // let accounts = self.data.list_accounts(); // let (tx, rx) = mpsc::channel(6); // tokio::spawn(async move { // for account in accounts { // let _ = tx.send(Ok(account.into())).await; // } // }); // let output_stream = ReceiverStream::new(rx); // Ok(Response::new(Box::pin(output_stream) as Self::ListAccountsStream)) } async fn list_all_vm_contracts( &self, _req: Request, ) -> Result, Status> { todo!(); // check_admin_key(&req)?; // let _ = check_sig_from_req(req)?; // let contracts = self.data.list_all_contracts(); // let (tx, rx) = mpsc::channel(6); // tokio::spawn(async move { // for contract in contracts { // let _ = tx.send(Ok(contract.into())).await; // } // }); // let output_stream = ReceiverStream::new(rx); // Ok(Response::new(Box::pin(output_stream) as Self::ListAllVmContractsStream)) } async fn list_all_app_contracts( &self, _req: tonic::Request, ) -> Result, Status> { todo!(); // check_admin_key(&req)?; // let _ = check_sig_from_req(req)?; // let contracts = self.data.list_all_app_contracts(); // let (tx, rx) = mpsc::channel(6); // tokio::spawn(async move { // for contract in contracts { // let _ = tx.send(Ok(contract.into())).await; // } // }); // let output_stream = ReceiverStream::new(rx); // Ok(Response::new(Box::pin(output_stream))) } } pub struct BrainVmCliMock {} #[tonic::async_trait] impl BrainVmCli for BrainVmCliMock { type ListVmContractsStream = Pin> + Send>>; type ListVmNodesStream = Pin> + Send>>; async fn new_vm(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; info!("New VM requested via CLI: {req:?}"); todo!(); // if self // .data // .is_user_banned_by_node(&req.admin_pubkey, &req.node_pubkey) // { // return Err(Status::permission_denied( // "This operator banned you. What did you do?", // )); // } // let admin_pubkey = req.admin_pubkey.clone(); // let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); // self.data.submit_newvm_req(req, oneshot_tx).await; // match oneshot_rx.await { // Ok(response) => { // info!("Sending VM confirmation to {admin_pubkey}: {response:?}"); // Ok(Response::new(response)) // } // Err(e) => { // log::error!("Something weird happened. Reached error {e:?}"); // Err(Status::unknown( // "Request failed due to unknown error. Please try again or contact the DeTEE devs team.", // )) // } // } } async fn update_vm(&self, req: Request) -> Result, Status> { let req = check_sig_from_req(req)?; info!("Update VM requested via CLI: {req:?}"); todo!(); // let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); // self.data.submit_updatevm_req(req, oneshot_tx).await; // match oneshot_rx.await { // Ok(response) => { // info!("Sending UpdateVMResp: {response:?}"); // Ok(Response::new(response)) // } // Err(e) => Err(Status::unknown(format!( // "Update VM request failed due to error: {e}" // ))), // } } async fn extend_vm(&self, req: Request) -> Result, Status> { let _req = check_sig_from_req(req)?; todo!(); // match self // .data // .extend_vm_contract_time(&req.uuid, &req.admin_pubkey, req.locked_nano) // { // Ok(()) => Ok(Response::new(Empty {})), // Err(e) => Err(Status::unknown(format!("Could not extend contract: {e}"))), // } } async fn delete_vm(&self, req: Request) -> Result, Status> { let _req = check_sig_from_req(req)?; todo!(); // match self.data.delete_vm(req).await { // Ok(()) => Ok(Response::new(Empty {})), // Err(e) => Err(Status::not_found(e.to_string())), // } } async fn list_vm_contracts( &self, req: Request, ) -> Result, Status> { let req = check_sig_from_req(req)?; info!( "CLI {} requested ListVMVmContractsStream. As operator: {}", req.wallet, req.as_operator ); let mut contracts = Vec::new(); if !req.uuid.is_empty() { if let Some(specific_contract) = db::VmContractWithNode::get_by_uuid(&req.uuid).await? { if specific_contract.admin.key().to_string() == req.wallet { contracts.push(specific_contract.into()); } // TODO: allow operator to inspect contracts } } else { if req.as_operator { contracts.append( &mut db::VmContractWithNode::list_by_operator(&req.wallet).await?.into(), ); } else { contracts .append(&mut db::VmContractWithNode::list_by_admin(&req.wallet).await?.into()); } } let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for contract in contracts { let _ = tx.send(Ok(contract.into())).await; } }); let output_stream = ReceiverStream::new(rx); Ok(Response::new(Box::pin(output_stream) as Self::ListVmContractsStream)) } async fn list_vm_nodes( &self, req: Request, ) -> Result, tonic::Status> { let req = check_sig_from_req(req)?; info!("CLI requested ListVmNodesStream: {req:?}"); todo!(); // let nodes = self.data.find_vm_nodes_by_filters(&req); // let (tx, rx) = mpsc::channel(6); // tokio::spawn(async move { // for node in nodes { // let _ = tx.send(Ok(node.into())).await; // } // }); // let output_stream = ReceiverStream::new(rx); // Ok(Response::new( // Box::pin(output_stream) as Self::ListVmNodesStream // )) } async fn get_one_vm_node( &self, req: Request, ) -> Result, Status> { let req = check_sig_from_req(req)?; info!("Unknown CLI requested ListVmNodesStream: {req:?}"); todo!(); // match self.data.get_one_node_by_filters(&req) { // Some(node) => Ok(Response::new(node.into())), // None => Err(Status::not_found( // "Could not find any node based on your search criteria", // )), // } } } trait PubkeyGetter { fn get_pubkey(&self) -> Option; } macro_rules! impl_pubkey_getter { ($t:ty, $field:ident) => { impl PubkeyGetter for $t { fn get_pubkey(&self) -> Option { Some(self.$field.clone()) } } }; ($t:ty) => { impl PubkeyGetter for $t { fn get_pubkey(&self) -> Option { None } } }; } impl_pubkey_getter!(Pubkey, pubkey); impl_pubkey_getter!(NewVmReq, admin_pubkey); impl_pubkey_getter!(DeleteVmReq, admin_pubkey); impl_pubkey_getter!(ReportNodeReq, admin_pubkey); impl_pubkey_getter!(UpdateVmReq, admin_pubkey); impl_pubkey_getter!(ExtendVmReq, admin_pubkey); impl_pubkey_getter!(ListVmContractsReq, wallet); impl_pubkey_getter!(RegisterVmNodeReq, node_pubkey); impl_pubkey_getter!(VmNodeFilters); impl_pubkey_getter!(Empty); fn check_sig_from_req(req: Request) -> Result { let time = match req.metadata().get("timestamp") { Some(t) => t.clone(), None => return Err(Status::unauthenticated("Timestamp not found in metadata.")), }; let time = time .to_str() .map_err(|_| Status::unauthenticated("Timestamp in metadata is not a string"))?; let now = chrono::Utc::now(); let parsed_time = chrono::DateTime::parse_from_rfc3339(time) .map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?; let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds(); if seconds_elapsed > 4 || seconds_elapsed < -4 { return Err(Status::unauthenticated(format!( "Date is not within 4 sec of the time of the server: CLI {} vs Server {}", parsed_time, now ))); } let signature = match req.metadata().get("request-signature") { Some(t) => t, None => return Err(Status::unauthenticated("signature not found in metadata.")), }; let signature = bs58::decode(signature) .into_vec() .map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?; let signature = ed25519_dalek::Signature::from_bytes( signature .as_slice() .try_into() .map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?, ); let pubkey_value = match req.metadata().get("pubkey") { Some(p) => p.clone(), None => return Err(Status::unauthenticated("pubkey not found in metadata.")), }; let pubkey = ed25519_dalek::VerifyingKey::from_bytes( &bs58::decode(&pubkey_value) .into_vec() .map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))? .try_into() .map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?, ) .map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?; let req = req.into_inner(); let message = format!("{time}{req:?}"); use ed25519_dalek::Verifier; pubkey .verify(message.as_bytes(), &signature) .map_err(|_| Status::unauthenticated("the signature is not valid"))?; if let Some(req_pubkey) = req.get_pubkey() { if pubkey_value.to_str().unwrap().to_string() != req_pubkey { return Err(Status::unauthenticated( "pubkey of signature does not match pubkey of request", )); } } Ok(req) }