#![allow(dead_code)] // SPDX-License-Identifier: Apache-2.0

// gRPC service implementations for VM nodes: `VmDaemonServer` implements the
// `BrainVmDaemon` trait (daemon-facing RPCs) and `VmCliServer` implements the
// `BrainVmCli` trait (CLI-facing RPCs). Every handler works against the shared
// database handle stored in the `db` field.
//
// NOTE(review): in this copy of the file the generic type arguments look like
// they were stripped (e.g. `Arc>`, `Pin> + Send>>`, `Request,`,
// `Result, Status>`, `live_vmnode_msgs::(`) — presumably an extraction
// artifact. The code tokens below are kept exactly as found; restore the type
// arguments from version control before building.

use crate::constants::{ACCOUNT, DEFAULT_ENDPOINT, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE};
use crate::db::prelude as db;
use crate::grpc::{check_sig_from_parts, check_sig_from_req};
use detee_shared::common_proto::Empty;
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCli;
use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemon;
use detee_shared::vm_proto::{ListVmContractsReq, *};
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;

use log::info;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};

/// Service state for the `BrainVmDaemon` implementation below.
pub struct VmDaemonServer {
    /// Shared database handle handed to every `db::` call made by the handlers.
    pub db: Arc>,
}

impl VmDaemonServer {
    /// Creates the service around an already-constructed shared database handle.
    pub fn new(db: Arc>) -> Self {
        Self { db }
    }
}

#[tonic::async_trait]
impl BrainVmDaemon for VmDaemonServer {
    /// Boxed, pinned, `Send` stream returned by `brain_messages`.
    type BrainMessagesStream = Pin> + Send>>;
    /// Boxed, pinned, `Send` stream returned by `register_vm_node`.
    type RegisterVmNodeStream = Pin> + Send>>;

    /// Registers a VM node and streams back the deleted VMs recorded for it.
    ///
    /// The request signature is verified with `check_sig_from_req`. A
    /// `db::VmNode` record is then built from the request and stored through
    /// its `register` method; all `avail_*` counters and `max_ports_per_vm`
    /// start at 0 and both timestamps use `Datetime::default()`. Finally every
    /// entry returned by `db::DeletedVm::list_by_node` is converted with
    /// `.into()` and sent to the caller over a bounded channel.
    ///
    /// # Errors
    /// Propagates (via `?`) a failed signature check, a failed `register`, or
    /// a failed `list_by_node` call.
    async fn register_vm_node(
        &self,
        req: Request,
    ) -> Result, Status> {
        let req = check_sig_from_req(req)?;
        info!("Starting registration process for {:?}", req);
        db::VmNode {
            id: surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone())),
            operator: surrealdb::RecordId::from((ACCOUNT, req.operator_wallet)),
            // Endpoint comes from the BRAIN_PUBLIC_ENDPOINT environment
            // variable, falling back to DEFAULT_ENDPOINT when it cannot be read.
            pub_sub_node: std::env::var("BRAIN_PUBLIC_ENDPOINT")
                .unwrap_or(DEFAULT_ENDPOINT.to_string()),
            country: req.country,
            region: req.region,
            city: req.city,
            ip: req.main_ip,
            price: req.price,
            avail_mem_mb: 0,
            avail_vcpus: 0,
            avail_storage_gbs: 0,
            avail_ipv4: 0,
            avail_ipv6: 0,
            avail_ports: 0,
            max_ports_per_vm: 0,
            disconnected_at: surrealdb::sql::Datetime::default(),
            connected_at: surrealdb::sql::Datetime::default(),
        }
        .register(&self.db)
        .await?;

        info!("Sending deleted contracts to {}", req.node_pubkey);
        let deleted_vms = db::DeletedVm::list_by_node(&self.db, &req.node_pubkey).await?;
        let (tx, rx) = mpsc::channel(6);
        // Feed the channel from a separate task so the response (the receiving
        // half) can be returned immediately; send failures are ignored.
        tokio::spawn(async move {
            for deleted_vm in deleted_vms {
                let _ = tx.send(Ok(deleted_vm.into())).await;
            }
        });

        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream) as Self::RegisterVmNodeStream))
    }

    /// Opens the brain → daemon message stream for an authenticated daemon.
    ///
    /// Authentication uses `check_sig_from_parts` over the pubkey, the
    /// timestamp, the `Debug` rendering of `auth.contracts`, and the signature.
    /// The node is then marked online (the result of `set_online` is ignored)
    /// and three tasks are spawned, each running `db::live_vmnode_msgs` with
    /// its own clone of the channel sender. Everything received on the channel
    /// is converted with `.into()` and wrapped in `Ok` for the response stream.
    ///
    /// # Errors
    /// Returns the error produced by `check_sig_from_parts` when the signature
    /// check fails.
    async fn brain_messages(
        &self,
        req: Request,
    ) -> Result, Status> {
        let auth = req.into_inner();
        let pubkey = auth.pubkey.clone();
        check_sig_from_parts(
            &pubkey,
            &auth.timestamp,
            &format!("{:?}", auth.contracts),
            &auth.signature,
        )?;
        info!("Daemon {} connected to receive brain messages", pubkey);

        // Best effort: a failure to mark the node online does not abort the RPC.
        let _ = db::VmNode::set_online(&self.db, &pubkey).await;

        let (tx, rx) = mpsc::channel(6);
        // First listener: the only one whose outcome is logged.
        {
            let db = self.db.clone();
            let pubkey = pubkey.clone();
            let tx = tx.clone();
            tokio::spawn(async move {
                match db::live_vmnode_msgs::(&db, &pubkey, tx).await {
                    Ok(()) => log::info!("live_vmnode_msgs ended for {pubkey}"),
                    Err(e) => {
                        log::warn!("live_vmnode_msgs errored for {pubkey}: {e}")
                    }
                };
            });
        }
        // Second and third listeners: their results are discarded.
        // NOTE(review): the turbofish type argument that distinguishes the
        // three `live_vmnode_msgs` calls is missing in this copy — confirm
        // which message type each task is meant to subscribe to.
        {
            let db = self.db.clone();
            let pubkey = pubkey.clone();
            let tx = tx.clone();
            tokio::spawn(async move {
                let _ = db::live_vmnode_msgs::(&db, &pubkey, tx.clone()).await;
            });
        }
        {
            let db = self.db.clone();
            let pubkey = pubkey.clone();
            let tx = tx.clone();
            tokio::spawn(async move {
                let _ = db::live_vmnode_msgs::(&db, &pubkey, tx.clone()).await;
            });
        }

        let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg.into()));
        Ok(Response::new(Box::pin(output_stream) as Self::BrainMessagesStream))
    }

    /// Consumes the daemon → brain message stream.
    ///
    /// The first stream item must be an `Auth` message whose signature passes
    /// `check_sig_from_parts`; otherwise the call fails with `unauthenticated`.
    /// After that, each message is handled by kind:
    ///
    /// * `NewVmResp` — a non-empty `error` is forwarded to
    ///   `db::NewVmReq::submit_error`; otherwise the args are upserted with
    ///   `db::upsert_record(.., "measurement_args", uuid, ..)` and, when args
    ///   are present, `db::ActiveVm::activate` is called.
    /// * `UpdateVmResp` — a non-empty `error` is forwarded to
    ///   `db::UpdateVmReq::submit_error`; otherwise the args are upserted the
    ///   same way and `db::ActiveVm::update` is called.
    /// * `VmNodeResources` — converted to `db::VmNodeResources` and merged for
    ///   this daemon's pubkey.
    /// * anything else — ignored.
    ///
    /// # Errors
    /// `unauthenticated` when the first item is missing, is an error, or is
    /// not an `Auth` message; any error propagated with `?` from the signature
    /// check or from the database calls above.
    async fn daemon_messages(
        &self,
        req: Request>,
    ) -> Result, Status> {
        let mut req_stream = req.into_inner();
        // Assigned exactly once below, on the successful-auth path; every
        // other path returns before `pubkey` is read.
        let pubkey: String;
        if let Some(Ok(msg)) = req_stream.next().await {
            log::debug!("demon_messages received the following auth message: {:?}", msg.msg);
            if let Some(vm_daemon_message::Msg::Auth(auth)) = msg.msg {
                pubkey = auth.pubkey.clone();
                check_sig_from_parts(
                    &pubkey,
                    &auth.timestamp,
                    &format!("{:?}", auth.contracts),
                    &auth.signature,
                )?;
            } else {
                return Err(Status::unauthenticated(
                    "Could not authenticate the daemon: could not extract auth signature",
                ));
            }
        } else {
            return Err(Status::unauthenticated("Could not authenticate the daemon"));
        }

        while let Some(daemon_message) = req_stream.next().await {
            match daemon_message {
                Ok(msg) => match msg.msg {
                    Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => {
                        if !new_vm_resp.error.is_empty() {
                            db::NewVmReq::submit_error(
                                &self.db,
                                &new_vm_resp.uuid,
                                new_vm_resp.error,
                            )
                            .await?;
                        } else {
                            db::upsert_record(
                                &self.db,
                                "measurement_args",
                                &new_vm_resp.uuid,
                                new_vm_resp.args.clone(),
                            )
                            .await?;
                            if let Some(args) = new_vm_resp.args {
                                db::ActiveVm::activate(&self.db, &new_vm_resp.uuid, args).await?;
                            }
                        }
                    }
                    Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => {
                        if !update_vm_resp.error.is_empty() {
                            db::UpdateVmReq::submit_error(
                                &self.db,
                                &update_vm_resp.uuid,
                                update_vm_resp.error,
                            )
                            .await?;
                        } else {
                            db::upsert_record(
                                &self.db,
                                "measurement_args",
                                &update_vm_resp.uuid,
                                update_vm_resp.args.clone(),
                            )
                            .await?;
                            db::ActiveVm::update(&self.db, &update_vm_resp.uuid).await?;
                        }
                    }
                    Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => {
                        let node_resources: db::VmNodeResources = node_resources.into();
                        node_resources.merge(&self.db, &pubkey).await?;
                    }
                    _ => {}
                },
                // NOTE(review): `set_offline` is only reached from this arm. A
                // stream that ends with `None`, or a `?` early return above,
                // leaves the loop without calling it — confirm that is intended.
                // The loop also keeps polling after an error item.
                Err(e) => {
                    log::warn!("Daemon disconnected for {pubkey}: {e:?}");
                    let _ = db::VmNode::set_offline(&self.db, &pubkey).await;
                }
            }
        }
        Ok(Response::new(Empty {}))
    }
}

/// Service state for the `BrainVmCli` implementation below.
pub struct VmCliServer {
    /// Shared database handle handed to every `db::` call made by the handlers.
    pub db: Arc>,
}

impl VmCliServer {
    /// Creates the service around an already-constructed shared database handle.
    pub fn new(db: Arc>) -> Self {
        Self { db }
    }
}

#[tonic::async_trait]
impl BrainVmCli for VmCliServer {
    /// Boxed, pinned, `Send` stream returned by `list_vm_contracts`.
    type ListVmContractsStream = Pin> + Send>>;
    /// Boxed, pinned, `Send` stream returned by `list_vm_nodes`.
    type ListVmNodesStream = Pin> + Send>>;

    /// Submits a new-VM request and waits for the matching measurement.
    ///
    /// Steps, in order: verify the signature; return the value yielded by
    /// `db::check_pubsub_node` as the error if there is one; reject requests
    /// with `locked_nano < 100`; reject admins for whom
    /// `db::Account::is_banned_by_node` returns true; start a task that awaits
    /// `db::WrappedMeasurement::listen` for the request id; submit the
    /// request; then wait for that task's result.
    ///
    /// # Errors
    /// * `unknown` — `locked_nano` below 100, or the listener task dropped its
    ///   sender without sending.
    /// * `permission_denied` — the admin is banned by the node.
    /// * `deadline_exceeded` — the listener reported `db::Error::TimeOut`.
    /// * anything propagated with `?` from the signature check, the database
    ///   calls, or the listener's result.
    async fn new_vm(&self, req: Request) -> Result, Status> {
        let req = check_sig_from_req(req)?;
        let id = surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone()));
        if let Some(redirect) = db::check_pubsub_node(&self.db, id).await? {
            log::info!("redirect: {redirect}");
            return Err(redirect);
        }

        // TODO: make it at least 1 hour
        // NOTE(review): the two strings below contain typos ("lessthan",
        // "atleaset") and the status is `unknown` although this looks like an
        // input-validation failure. Both are runtime behavior, left untouched.
        if req.locked_nano < 100 {
            log::error!("locking lessthan 100 nano lps: {}", req.locked_nano);
            return Err(Status::unknown("lock atleaset 100 nano lps"));
        }
        info!("New VM requested via CLI: {req:?}");
        if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
            return Err(Status::permission_denied("This operator banned you. What did you do?"));
        }

        let db_req: db::NewVmReq = req.into();
        let id = db_req.id.key().to_string();

        // The listener task is spawned before the request is submitted.
        let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
        let db = self.db.clone();
        tokio::spawn(async move {
            let _ = oneshot_tx.send(db::WrappedMeasurement::listen(&db, &id, NEW_VM_REQ).await);
        });
        db_req.submit(&self.db).await?;

        match oneshot_rx.await {
            Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
                "Network timeout. Please try again later or contact the DeTEE devs team.",
            )),
            // Any other listener result: `?` propagates its error, otherwise
            // the value is converted into the response message.
            Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())),
            // The oneshot sender was dropped without sending.
            Err(e) => {
                log::error!("Something weird happened during CLI NewVmReq. Reached error {e:?}");
                Err(Status::unknown(
                    "Unknown error. Please try again or contact the DeTEE devs team.",
                ))
            }
        }
    }

    /// Applies a hostname and/or hardware update to an existing VM.
    ///
    /// After the signature and `check_pubsub_node` checks, a non-empty
    /// `hostname` is applied through `db::ActiveVm::change_hostname`, then
    /// `request_hw_update` is called. Outcomes:
    ///
    /// * `request_hw_update` returned `None` — responds with the error text
    ///   "VM Contract does not exist.".
    /// * neither hostname nor hardware changed — responds with
    ///   "No modification required".
    /// * only the hostname changed — responds with an empty error.
    /// * hardware change requested — waits for
    ///   `db::WrappedMeasurement::listen` and returns its converted result.
    ///
    /// Note that the first three outcomes are reported inside an `Ok`
    /// response (in the `error` field), not as a gRPC error status.
    ///
    /// # Errors
    /// `deadline_exceeded` on `db::Error::TimeOut`, `unknown` when the
    /// listener task dropped its sender, plus anything propagated with `?`.
    async fn update_vm(&self, req: Request) -> Result, Status> {
        let req = check_sig_from_req(req)?;
        info!("Update VM requested via CLI: {req:?}");

        let db_req: db::UpdateVmReq = req.clone().into();
        if let Some(redirect) = db::check_pubsub_node(&self.db, db_req.vm_node.clone()).await? {
            log::info!("redirect: {redirect}");
            return Err(redirect);
        }
        let id = db_req.id.key().to_string();

        let mut hostname_changed = false;
        if !req.hostname.is_empty() {
            hostname_changed =
                db::ActiveVm::change_hostname(&self.db, &req.uuid, &req.hostname).await?;
        }

        let hw_change_submitted = db_req.request_hw_update(&self.db).await?;
        if hw_change_submitted.is_none() {
            return Ok(Response::new(UpdateVmResp {
                uuid: req.uuid.clone(),
                error: "VM Contract does not exist.".to_string(),
                args: None,
            }));
        }
        // Cannot panic: the `None` case returned just above.
        let hw_change_needed = hw_change_submitted.unwrap();

        if !hostname_changed && !hw_change_needed {
            return Ok(Response::new(UpdateVmResp {
                uuid: req.uuid.clone(),
                error: "No modification required".to_string(),
                args: None,
            }));
        }

        // if only the hostname got changed, return a confirmation
        if !hw_change_needed {
            return Ok(Response::new(UpdateVmResp {
                uuid: req.uuid.clone(),
                error: String::new(),
                args: None,
            }));
        }

        // if HW changes got requested, wait for the new args
        // NOTE(review): here the listener is spawned after `request_hw_update`
        // has run, whereas `new_vm` spawns its listener before submitting —
        // confirm a fast daemon reply cannot be missed with this ordering.
        let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
        let db = self.db.clone();
        tokio::spawn(async move {
            let _ = oneshot_tx.send(db::WrappedMeasurement::listen(&db, &id, UPDATE_VM_REQ).await);
        });

        match oneshot_rx.await {
            Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
                "Network timeout. Please try again later or contact the DeTEE devs team.",
            )),
            Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())),
            Err(e) => {
                log::error!("Something weird happened during CLI VM Update. Reached error {e:?}");
                Err(Status::unknown(
                    "Unknown error. Please try again or contact the DeTEE devs team.",
                ))
            }
        }
    }

    /// Extends a VM contract by calling `db::ActiveVm::extend_time` with the
    /// request's uuid, admin pubkey and `locked_nano`.
    ///
    /// # Errors
    /// * `failed_precondition` — `ContractNotFound`, `AccessDenied` or
    ///   `InsufficientFunds`; the status message is the error's `to_string()`.
    /// * `unknown` — any other database error (also logged).
    /// * a failed signature check, propagated with `?`.
    async fn extend_vm(&self, req: Request) -> Result, Status> {
        let req = check_sig_from_req(req)?;
        match db::ActiveVm::extend_time(&self.db, &req.uuid, &req.admin_pubkey, req.locked_nano)
            .await
        {
            Ok(()) => Ok(Response::new(Empty {})),
            Err(e)
                if matches!(
                    e,
                    db::Error::ContractNotFound
                        | db::Error::AccessDenied
                        | db::Error::InsufficientFunds
                ) =>
            {
                Err(Status::failed_precondition(e.to_string()))
            }
            Err(e) => {
                log::error!("Error extending VM contract {}: {e}", &req.uuid);
                Err(Status::unknown(format!("Could not extend contract: {e}")))
            }
        }
    }

    /// Deletes a VM contract on behalf of its admin.
    ///
    /// The VM is looked up by uuid first; a missing VM is reported with the
    /// same `permission_denied("Unauthorized")` status as an access failure,
    /// so the two cases are indistinguishable to the caller. The VM's node is
    /// then passed to `db::check_pubsub_node` before `db::ActiveVm::delete`
    /// is called.
    ///
    /// # Errors
    /// * `permission_denied` — VM not found, or `db::Error::AccessDenied`.
    /// * the value yielded by `check_pubsub_node`, when there is one.
    /// * `unknown` — any other error from `delete` (also logged).
    /// * anything propagated with `?`.
    async fn delete_vm(&self, req: Request) -> Result, Status> {
        let req = check_sig_from_req(req)?;
        let vm_node = db::ActiveVm::get_by_uuid(&self.db, &req.uuid)
            .await?
            .ok_or(Status::permission_denied("Unauthorized"))?
            .vm_node;

        if let Some(redirect) = db::check_pubsub_node(&self.db, vm_node).await? {
            log::info!("redirect: {redirect}");
            return Err(redirect);
        }

        match db::ActiveVm::delete(&self.db, &req.admin_pubkey, &req.uuid).await {
            Ok(()) => Ok(Response::new(Empty {})),
            Err(db::Error::AccessDenied) => Err(Status::permission_denied("Unauthorized")),
            Err(e) => {
                log::error!("Error deleting VM contract {}: {e}", &req.uuid);
                Err(Status::unknown(
                    "Unknown error. Please try again or contact the DeTEE devs team.",
                ))
            }
        }
    }

    /// Streams the VM contracts visible to the requesting wallet.
    ///
    /// Selection rules:
    /// * `uuid` non-empty — at most that one contract, and only when the
    ///   wallet equals the contract's admin key or its node's operator key.
    /// * otherwise, `as_operator` set — `list_by_operator` for the wallet.
    /// * otherwise — `list_by_admin` for the wallet.
    ///
    /// A uuid that matches nothing, or that the wallet may not see, yields an
    /// empty stream rather than an error.
    ///
    /// # Errors
    /// Propagates a failed signature check or database lookup with `?`.
    async fn list_vm_contracts(
        &self,
        req: Request,
    ) -> Result, Status> {
        let req = check_sig_from_req(req)?;
        info!(
            "CLI {} requested ListVMVmContractsStream. As operator: {}",
            req.wallet, req.as_operator
        );

        let mut contracts = Vec::new();
        if !req.uuid.is_empty() {
            if let Some(specific_contract) =
                db::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await?
            {
                if specific_contract.admin.key().to_string() == req.wallet
                    || specific_contract.vm_node.operator.key().to_string() == req.wallet
                {
                    contracts.push(specific_contract);
                }
            }
        } else if req.as_operator {
            contracts
                .append(&mut db::ActiveVmWithNode::list_by_operator(&self.db, &req.wallet).await?);
        } else {
            contracts
                .append(&mut db::ActiveVmWithNode::list_by_admin(&self.db, &req.wallet).await?);
        }

        let (tx, rx) = mpsc::channel(6);
        // Feed the channel from a separate task; send failures are ignored.
        tokio::spawn(async move {
            for contract in contracts {
                let _ = tx.send(Ok(contract.into())).await;
            }
        });

        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream) as Self::ListVmContractsStream))
    }

    /// Streams every node returned by `db::VmNodeWithReports::find_by_filters`
    /// for the (signature-checked) request.
    ///
    /// # Errors
    /// Propagates a failed signature check or database query with `?`.
    async fn list_vm_nodes(
        &self,
        req: Request,
    ) -> Result, tonic::Status> {
        let req = check_sig_from_req(req)?;
        info!("CLI requested ListVmNodesStream: {req:?}");
        let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?;

        let (tx, rx) = mpsc::channel(6);
        // Feed the channel from a separate task; send failures are ignored.
        tokio::spawn(async move {
            for node in nodes {
                let _ = tx.send(Ok(node.into())).await;
            }
        });

        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream) as Self::ListVmNodesStream))
    }

    /// Returns the first node produced by
    /// `db::VmNodeWithReports::find_by_filters` for the request.
    ///
    /// # Errors
    /// `not_found` when the query returns no nodes; a failed signature check
    /// or database query is propagated with `?`.
    async fn get_one_vm_node(
        &self,
        req: Request,
    ) -> Result, Status> {
        let req = check_sig_from_req(req)?;
        // NOTE(review): this log text names "ListVmNodesStream" although the
        // handler is `get_one_vm_node` — looks copy-pasted; confirm and adjust.
        info!("Unknown CLI requested ListVmNodesStream: {req:?}");
        // TODO: optimize this query so that it gets only one node
        let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?;
        if let Some(node) = nodes.into_iter().next() {
            return Ok(Response::new(node.into()));
        }
        Err(Status::not_found("Could not find any node based on your search criteria"))
    }
}