#![allow(dead_code)]

use crate::constants::{ACCOUNT, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE};
use crate::db::prelude as db;
use crate::grpc::{check_sig_from_parts, check_sig_from_req};
use detee_shared::common_proto::Empty;
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCli;
use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemon;
use detee_shared::vm_proto::{ListVmContractsReq, *};
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;
use log::info;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};

/// gRPC service exposed to VM node daemons: registers nodes, streams pending brain
/// requests to them, and ingests the responses they report back.
pub struct VmDaemonServer {
    pub db: Arc<Surreal<Client>>,
}

impl VmDaemonServer {
    pub fn new(db: Arc<Surreal<Client>>) -> Self {
        Self { db }
    }
}

#[tonic::async_trait]
impl BrainVmDaemon for VmDaemonServer {
    type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainVmMessage, Status>> + Send>>;
    type RegisterVmNodeStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;

    async fn register_vm_node(
        &self,
        req: Request<RegisterVmNodeReq>,
    ) -> Result<Response<Self::RegisterVmNodeStream>, Status> {
        let req = check_sig_from_req(req)?;
        info!("Starting registration process for {:?}", req);
        db::VmNode {
            id: surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone())),
            operator: surrealdb::RecordId::from((ACCOUNT, req.operator_wallet)),
            country: req.country,
            region: req.region,
            city: req.city,
            ip: req.main_ip,
            price: req.price,
            avail_mem_mb: 0,
            avail_vcpus: 0,
            avail_storage_gbs: 0,
            avail_ipv4: 0,
            avail_ipv6: 0,
            avail_ports: 0,
            max_ports_per_vm: 0,
            offline_minutes: 0,
        }
        .register(&self.db)
        .await?;

        info!("Sending existing contracts to {}", req.node_pubkey);
        let contracts = db::ActiveVmWithNode::list_by_node(&self.db, &req.node_pubkey).await?;
        let (tx, rx) = mpsc::channel(6);
        tokio::spawn(async move {
            for contract in contracts {
                let _ = tx.send(Ok(contract.into())).await;
            }
        });
        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream) as Self::RegisterVmNodeStream))
    }

    async fn brain_messages(
        &self,
        req: Request<DaemonAuth>,
    ) -> Result<Response<Self::BrainMessagesStream>, Status> {
        let auth = req.into_inner();
        let pubkey = auth.pubkey.clone();
        check_sig_from_parts(
            &pubkey,
            &auth.timestamp,
            &format!("{:?}", auth.contracts),
            &auth.signature,
        )?;
        info!("Daemon {} connected to receive brain messages", pubkey);

        let (tx, rx) = mpsc::channel(6);
        // Each task below live-streams one kind of pending request for this node into
        // the shared channel (the concrete record type parameters are assumptions).
        {
            let db = self.db.clone();
            let pubkey = pubkey.clone();
            let tx = tx.clone();
            tokio::spawn(async move {
                match db::live_vmnode_msgs::<db::NewVmReq>(&db, &pubkey, tx).await {
                    Ok(()) => log::info!("live_vmnode_msgs ended for {pubkey}"),
                    Err(e) => {
                        log::warn!("live_vmnode_msgs errored for {pubkey}: {e}")
                    }
                };
            });
        }
        {
            let db = self.db.clone();
            let pubkey = pubkey.clone();
            let tx = tx.clone();
            tokio::spawn(async move {
                let _ = db::live_vmnode_msgs::<db::UpdateVmReq>(&db, &pubkey, tx.clone()).await;
            });
        }
        {
            let db = self.db.clone();
            let pubkey = pubkey.clone();
            let tx = tx.clone();
            tokio::spawn(async move {
                let _ = db::live_vmnode_msgs::<db::DeletedVm>(&db, &pubkey, tx.clone()).await;
            });
        }

        let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg.into()));
        Ok(Response::new(Box::pin(output_stream) as Self::BrainMessagesStream))
    }

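    // Daemon-to-brain stream: the first message must be an Auth message (pubkey,
    // timestamp and a signature over the contract list); every later message carries
    // the daemon's response to a brain request and is persisted through the db layer.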
    async fn daemon_messages(
        &self,
        req: Request<Streaming<VmDaemonMessage>>,
    ) -> Result<Response<Empty>, Status> {
        let mut req_stream = req.into_inner();
        let pubkey: String;
        if let Some(Ok(msg)) = req_stream.next().await {
            log::debug!("daemon_messages received the following auth message: {:?}", msg.msg);
            if let Some(vm_daemon_message::Msg::Auth(auth)) = msg.msg {
                pubkey = auth.pubkey.clone();
                check_sig_from_parts(
                    &pubkey,
                    &auth.timestamp,
                    &format!("{:?}", auth.contracts),
                    &auth.signature,
                )?;
            } else {
                return Err(Status::unauthenticated(
                    "Could not authenticate the daemon: could not extract auth signature",
                ));
            }
        } else {
            return Err(Status::unauthenticated("Could not authenticate the daemon"));
        }

        while let Some(daemon_message) = req_stream.next().await {
            match daemon_message {
                Ok(msg) => match msg.msg {
                    Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => {
                        if !new_vm_resp.error.is_empty() {
                            db::NewVmReq::submit_error(
                                &self.db,
                                &new_vm_resp.uuid,
                                new_vm_resp.error,
                            )
                            .await?;
                        } else {
                            db::upsert_record(
                                &self.db,
                                "measurement_args",
                                &new_vm_resp.uuid,
                                new_vm_resp.args.clone(),
                            )
                            .await?;
                            if let Some(args) = new_vm_resp.args {
                                db::ActiveVm::activate(&self.db, &new_vm_resp.uuid, args).await?;
                            }
                        }
                    }
                    Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => {
                        if !update_vm_resp.error.is_empty() {
                            db::UpdateVmReq::submit_error(
                                &self.db,
                                &update_vm_resp.uuid,
                                update_vm_resp.error,
                            )
                            .await?;
                        } else {
                            db::upsert_record(
                                &self.db,
                                "measurement_args",
                                &update_vm_resp.uuid,
                                update_vm_resp.args.clone(),
                            )
                            .await?;
                            db::ActiveVm::update(&self.db, &update_vm_resp.uuid).await?;
                        }
                    }
                    Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => {
                        let node_resources: db::VmNodeResources = node_resources.into();
                        node_resources.merge(&self.db, &pubkey).await?;
                    }
                    _ => {}
                },
                Err(e) => {
                    log::warn!("Daemon disconnected: {e:?}");
                }
            }
        }
        Ok(Response::new(Empty {}))
    }
}

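/// gRPC service backing the CLI: it creates and updates VM contracts and lists
/// contracts and nodes. Extending and deleting contracts are not implemented yet.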
pub struct VmCliServer {
    pub db: Arc<Surreal<Client>>,
}

impl VmCliServer {
    pub fn new(db: Arc<Surreal<Client>>) -> Self {
        Self { db }
    }
}

#[tonic::async_trait]
impl BrainVmCli for VmCliServer {
    type ListVmContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
    type ListVmNodesStream = Pin<Box<dyn Stream<Item = Result<VmNodeListResp, Status>> + Send>>;

    async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> {
        let req = check_sig_from_req(req)?;
        info!("New VM requested via CLI: {req:?}");
        if db::general::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey)
            .await?
        {
            return Err(Status::permission_denied("This operator banned you. What did you do?"));
        }
        let db_req: db::NewVmReq = req.into();
        let id = db_req.id.key().to_string();

        // Start listening for the measurement result before submitting the request,
        // so the daemon's answer cannot be missed.
        let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
        let db = self.db.clone();
        tokio::spawn(async move {
            let _ = oneshot_tx.send(db::WrappedMeasurement::listen(&db, &id, NEW_VM_REQ).await);
        });
        db_req.submit(&self.db).await?;

        match oneshot_rx.await {
            Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
                "Network timeout. Please try again later or contact the DeTEE devs team.",
            )),
            Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())),
            Err(e) => {
                log::error!("Something weird happened during CLI NewVmReq. Reached error {e:?}");
                Err(Status::unknown(
                    "Unknown error. Please try again or contact the DeTEE devs team.",
                ))
            }
        }
    }

    async fn update_vm(
        &self,
        req: Request<UpdateVmReq>,
    ) -> Result<Response<UpdateVmResp>, Status> {
        let req = check_sig_from_req(req)?;
        info!("Update VM requested via CLI: {req:?}");
        let db_req: db::UpdateVmReq = req.clone().into();
        let id = db_req.id.key().to_string();

        let mut hostname_changed = false;
        if !req.hostname.is_empty() {
            hostname_changed =
                db::ActiveVm::change_hostname(&self.db, &req.uuid, &req.hostname).await?;
        }

        let hw_change_submitted = db_req.request_hw_update(&self.db).await?;
        if hw_change_submitted.is_none() {
            return Ok(Response::new(UpdateVmResp {
                uuid: req.uuid.clone(),
                error: "VM Contract does not exist.".to_string(),
                args: None,
            }));
        }
        let hw_change_needed = hw_change_submitted.unwrap();

        if !hostname_changed && !hw_change_needed {
            return Ok(Response::new(UpdateVmResp {
                uuid: req.uuid.clone(),
                error: "No modification required".to_string(),
                args: None,
            }));
        }

        // if only the hostname got changed, return a confirmation
        if !hw_change_needed {
            return Ok(Response::new(UpdateVmResp {
                uuid: req.uuid.clone(),
                error: String::new(),
                args: None,
            }));
        }

        // if HW changes got requested, wait for the new args
        let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
        let db = self.db.clone();
        tokio::spawn(async move {
            let _ = oneshot_tx.send(db::WrappedMeasurement::listen(&db, &id, UPDATE_VM_REQ).await);
        });

        match oneshot_rx.await {
            Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
                "Network timeout. Please try again later or contact the DeTEE devs team.",
            )),
            Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())),
            Err(e) => {
                log::error!("Something weird happened during CLI VM Update. Reached error {e:?}");
                Err(Status::unknown(
                    "Unknown error. Please try again or contact the DeTEE devs team.",
                ))
            }
        }
    }

    async fn extend_vm(&self, req: Request<ExtendVmReq>) -> Result<Response<Empty>, Status> {
        let _req = check_sig_from_req(req)?;
        todo!();
        // match self
        //     .data
        //     .extend_vm_contract_time(&req.uuid, &req.admin_pubkey, req.locked_nano)
        // {
        //     Ok(()) => Ok(Response::new(Empty {})),
        //     Err(e) => Err(Status::unknown(format!("Could not extend contract: {e}"))),
        // }
    }

    async fn delete_vm(&self, req: Request<DeleteVmReq>) -> Result<Response<Empty>, Status> {
        let _req = check_sig_from_req(req)?;
        todo!();
        // match self.data.delete_vm(req).await {
        //     Ok(()) => Ok(Response::new(Empty {})),
        //     Err(e) => Err(Status::not_found(e.to_string())),
        // }
    }

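    // The listing endpoints below share one pattern: run the query up front, then
    // spawn a task that pushes each row into a small bounded channel, and hand the
    // receiver back to tonic as the response stream.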
    async fn list_vm_contracts(
        &self,
        req: Request<ListVmContractsReq>,
    ) -> Result<Response<Self::ListVmContractsStream>, Status> {
        let req = check_sig_from_req(req)?;
        info!(
            "CLI {} requested ListVmContractsStream. As operator: {}",
            req.wallet, req.as_operator
        );
        let mut contracts = Vec::new();
        if !req.uuid.is_empty() {
            if let Some(specific_contract) =
                db::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await?
            {
                if specific_contract.admin.key().to_string() == req.wallet {
                    contracts.push(specific_contract);
                }
                // TODO: allow operator to inspect contracts
            }
        } else if req.as_operator {
            contracts
                .append(&mut db::ActiveVmWithNode::list_by_operator(&self.db, &req.wallet).await?);
        } else {
            contracts
                .append(&mut db::ActiveVmWithNode::list_by_admin(&self.db, &req.wallet).await?);
        }

        let (tx, rx) = mpsc::channel(6);
        tokio::spawn(async move {
            for contract in contracts {
                let _ = tx.send(Ok(contract.into())).await;
            }
        });
        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream) as Self::ListVmContractsStream))
    }

    async fn list_vm_nodes(
        &self,
        req: Request<VmNodeFilters>,
    ) -> Result<Response<Self::ListVmNodesStream>, tonic::Status> {
        let req = check_sig_from_req(req)?;
        info!("CLI requested ListVmNodesStream: {req:?}");
        let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?;
        let (tx, rx) = mpsc::channel(6);
        tokio::spawn(async move {
            for node in nodes {
                let _ = tx.send(Ok(node.into())).await;
            }
        });
        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream) as Self::ListVmNodesStream))
    }

    async fn get_one_vm_node(
        &self,
        req: Request<VmNodeFilters>,
    ) -> Result<Response<VmNodeListResp>, Status> {
        let req = check_sig_from_req(req)?;
        info!("CLI requested GetOneVmNode: {req:?}");
        // TODO: optimize this query so that it gets only one node
        let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?;
        if let Some(node) = nodes.into_iter().next() {
            return Ok(Response::new(node.into()));
        }
        Err(Status::not_found("Could not find any node based on your search criteria"))
    }
}