brain/src/grpc/vm.rs
ghe0 1ca66f3bc3
switch from LP to credits
As part of open-sourcing the software product, we should consider that
"loyalty points" is not the best terminology. Switching to "credits"
makes sense from several points of view.

At the same time, this change enables an architectural move towards
slots. Slots allow daemon resources to be booked based on the HW ratio
configured in the daemon config.
2025-06-25 03:51:02 +03:00
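
To illustrate the slot idea, here is a minimal hypothetical sketch; none of
these names (DaemonConfig, Hardware, available_slots) come from the codebase.
They only show how a daemon's hardware and a configured ratio could translate
into bookable slots:

// Hypothetical sketch only; not part of the actual daemon config.
struct DaemonConfig {
    // How much hardware one slot represents.
    vcpus_per_slot: u32,
    mem_mib_per_slot: u32,
}

struct Hardware {
    vcpus: u32,
    mem_mib: u32,
}

// A node can offer as many slots as its scarcest resource allows.
fn available_slots(hw: &Hardware, cfg: &DaemonConfig) -> u32 {
    (hw.vcpus / cfg.vcpus_per_slot).min(hw.mem_mib / cfg.mem_mib_per_slot)
}

fn main() {
    let cfg = DaemonConfig { vcpus_per_slot: 2, mem_mib_per_slot: 4096 };
    let hw = Hardware { vcpus: 16, mem_mib: 32768 };
    // 16 / 2 = 8 slots by CPU, 32768 / 4096 = 8 slots by memory => 8 slots
    println!("bookable slots: {}", available_slots(&hw, &cfg));
}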

// SPDX-License-Identifier: Apache-2.0
#![allow(dead_code)]
use crate::constants::{ACCOUNT, DEFAULT_ENDPOINT, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE};
use crate::db::prelude as db;
use crate::grpc::{check_sig_from_parts, check_sig_from_req};
use detee_shared::common_proto::Empty;
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCli;
use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemon;
use detee_shared::vm_proto::{ListVmContractsReq, *};
use log::info;
use std::pin::Pin;
use std::sync::Arc;
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};
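/// Daemon-facing gRPC service. Wraps the shared SurrealDB handle that all
/// request handlers operate on.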
pub struct VmDaemonServer {
pub db: Arc<Surreal<Client>>,
}
impl VmDaemonServer {
pub fn new(db: Arc<Surreal<Client>>) -> Self {
Self { db }
}
}
#[tonic::async_trait]
impl BrainVmDaemon for VmDaemonServer {
type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainVmMessage, Status>> + Send>>;
type RegisterVmNodeStream = Pin<Box<dyn Stream<Item = Result<DeleteVmReq, Status>> + Send>>;
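    /// Verifies the request signature, registers the VM node record (with
    /// availability counters zeroed until the daemon reports its resources),
    /// then streams back the contracts deleted while the node was offline so
    /// the daemon can clean up local state.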
async fn register_vm_node(
&self,
req: Request<RegisterVmNodeReq>,
) -> Result<Response<Self::RegisterVmNodeStream>, Status> {
let req = check_sig_from_req(req)?;
info!("Starting registration process for {:?}", req);
db::VmNode {
id: surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone())),
operator: surrealdb::RecordId::from((ACCOUNT, req.operator_wallet)),
            pub_sub_node: std::env::var("BRAIN_PUBLIC_ENDPOINT")
                .unwrap_or_else(|_| DEFAULT_ENDPOINT.to_string()),
country: req.country,
region: req.region,
city: req.city,
ip: req.main_ip,
price: req.price,
avail_mem_mib: 0,
avail_vcpus: 0,
avail_storage_mib: 0,
avail_ipv4: 0,
avail_ipv6: 0,
avail_ports: 0,
max_ports_per_vm: 0,
disconnected_at: surrealdb::sql::Datetime::default(),
connected_at: surrealdb::sql::Datetime::default(),
}
.register(&self.db)
.await?;
info!("Sending deleted contracts to {}", req.node_pubkey);
let deleted_vms = db::DeletedVm::list_by_node(&self.db, &req.node_pubkey).await?;
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for deleted_vm in deleted_vms {
let _ = tx.send(Ok(deleted_vm.into())).await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream) as Self::RegisterVmNodeStream))
}
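    /// Opens the brain-to-daemon message stream. The daemon authenticates
    /// with a signed (pubkey, timestamp, contracts) tuple; on success the
    /// node is marked online and three background tasks forward live
    /// DeletedVm, NewVmReq and UpdateVmReq events into one outbound stream.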
async fn brain_messages(
&self,
req: Request<DaemonStreamAuth>,
) -> Result<Response<Self::BrainMessagesStream>, Status> {
let auth = req.into_inner();
let pubkey = auth.pubkey.clone();
check_sig_from_parts(
&pubkey,
&auth.timestamp,
&format!("{:?}", auth.contracts),
&auth.signature,
)?;
info!("Daemon {} connected to receive brain messages", pubkey);
let _ = db::VmNode::set_online(&self.db, &pubkey).await;
let (tx, rx) = mpsc::channel(6);
{
let db = self.db.clone();
let pubkey = pubkey.clone();
let tx = tx.clone();
tokio::spawn(async move {
match db::live_vmnode_msgs::<db::DeletedVm>(&db, &pubkey, tx).await {
Ok(()) => log::info!("live_vmnode_msgs ended for {pubkey}"),
Err(e) => {
log::warn!("live_vmnode_msgs errored for {pubkey}: {e}")
}
};
});
}
{
let db = self.db.clone();
let pubkey = pubkey.clone();
let tx = tx.clone();
tokio::spawn(async move {
let _ = db::live_vmnode_msgs::<db::NewVmReq>(&db, &pubkey, tx.clone()).await;
});
}
{
let db = self.db.clone();
let pubkey = pubkey.clone();
let tx = tx.clone();
tokio::spawn(async move {
let _ = db::live_vmnode_msgs::<db::UpdateVmReq>(&db, &pubkey, tx.clone()).await;
});
}
let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg.into()));
Ok(Response::new(Box::pin(output_stream) as Self::BrainMessagesStream))
}
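    /// Consumes the daemon-to-brain message stream. The first message must
    /// be an Auth frame with a valid signature; afterwards each message
    /// either records a VM creation/update result or merges a node resource
    /// report. A stream error marks the node offline.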
async fn daemon_messages(
&self,
req: Request<Streaming<VmDaemonMessage>>,
) -> Result<Response<Empty>, Status> {
let mut req_stream = req.into_inner();
let pubkey: String;
if let Some(Ok(msg)) = req_stream.next().await {
log::debug!("demon_messages received the following auth message: {:?}", msg.msg);
if let Some(vm_daemon_message::Msg::Auth(auth)) = msg.msg {
pubkey = auth.pubkey.clone();
check_sig_from_parts(
&pubkey,
&auth.timestamp,
&format!("{:?}", auth.contracts),
&auth.signature,
)?;
} else {
return Err(Status::unauthenticated(
"Could not authenticate the daemon: could not extract auth signature",
));
}
} else {
return Err(Status::unauthenticated("Could not authenticate the daemon"));
}
while let Some(daemon_message) = req_stream.next().await {
match daemon_message {
Ok(msg) => match msg.msg {
Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => {
if !new_vm_resp.error.is_empty() {
db::NewVmReq::submit_error(
&self.db,
&new_vm_resp.uuid,
new_vm_resp.error,
)
.await?;
} else {
db::upsert_record(
&self.db,
"measurement_args",
&new_vm_resp.uuid,
new_vm_resp.args.clone(),
)
.await?;
if let Some(args) = new_vm_resp.args {
db::ActiveVm::activate(&self.db, &new_vm_resp.uuid, args).await?;
}
}
}
Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => {
if !update_vm_resp.error.is_empty() {
db::UpdateVmReq::submit_error(
&self.db,
&update_vm_resp.uuid,
update_vm_resp.error,
)
.await?;
} else {
db::upsert_record(
&self.db,
"measurement_args",
&update_vm_resp.uuid,
update_vm_resp.args.clone(),
)
.await?;
db::ActiveVm::update(&self.db, &update_vm_resp.uuid).await?;
}
}
Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => {
let node_resources: db::VmNodeResources = node_resources.into();
node_resources.merge(&self.db, &pubkey).await?;
}
_ => {}
},
Err(e) => {
log::warn!("Daemon disconnected for {pubkey}: {e:?}");
let _ = db::VmNode::set_offline(&self.db, &pubkey).await;
}
}
}
Ok(Response::new(Empty {}))
}
}
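/// CLI-facing gRPC service, backed by the same shared SurrealDB handle as
/// the daemon service.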
pub struct VmCliServer {
pub db: Arc<Surreal<Client>>,
}
impl VmCliServer {
pub fn new(db: Arc<Surreal<Client>>) -> Self {
Self { db }
}
}
#[tonic::async_trait]
impl BrainVmCli for VmCliServer {
type ListVmContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
type ListVmNodesStream = Pin<Box<dyn Stream<Item = Result<VmNodeListResp, Status>> + Send>>;
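    /// Creates a new VM contract: checks for a pub/sub redirect, enforces
    /// the minimum lock, rejects admins banned by the node operator, submits
    /// the request, then waits on a oneshot channel for the measurement args
    /// (or a timeout) produced by the daemon.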
async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> {
let req = check_sig_from_req(req)?;
let id = surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone()));
if let Some(redirect) = db::check_pubsub_node(&self.db, id).await? {
log::info!("redirect: {redirect}");
return Err(redirect);
}
        // TODO: make it at least 1 hour
        if req.locked_nano < 100 {
            log::error!("locking less than 100 nano credits: {}", req.locked_nano);
            return Err(Status::unknown("lock at least 100 nano credits"));
        }
info!("New VM requested via CLI: {req:?}");
if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
return Err(Status::permission_denied("This operator banned you. What did you do?"));
}
let db_req: db::NewVmReq = req.into();
let id = db_req.id.key().to_string();
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
let db = self.db.clone();
tokio::spawn(async move {
let _ = oneshot_tx.send(db::WrappedMeasurement::listen(&db, &id, NEW_VM_REQ).await);
});
db_req.submit(&self.db).await?;
match oneshot_rx.await {
Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
"Network timeout. Please try again later or contact the DeTEE devs team.",
)),
Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())),
Err(e) => {
log::error!("Something weird happened during CLI NewVmReq. Reached error {e:?}");
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
}
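    /// Updates a VM contract. A hostname change is applied directly; a
    /// hardware change is submitted to the daemon and the handler waits for
    /// the resulting measurement args. Returns early if the contract does
    /// not exist or nothing needs to change.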
async fn update_vm(&self, req: Request<UpdateVmReq>) -> Result<Response<UpdateVmResp>, Status> {
let req = check_sig_from_req(req)?;
info!("Update VM requested via CLI: {req:?}");
let db_req: db::UpdateVmReq = req.clone().into();
println!("The node is {}", db_req.vm_node);
// TODO: vm_node is not known at this point. It is populated by `request_hw_update`.
// As such, the pubsub node cannot be checked at this stage. This code should be moved,
// however we are not working on the redirect mechanic at the moment
//
// if let Some(redirect) = db::check_pubsub_node(&self.db, db_req.vm_node.clone()).await? {
// log::info!("redirect: {redirect}");
// return Err(redirect);
// }
let id = db_req.id.key().to_string();
let mut hostname_changed = false;
if !req.hostname.is_empty() {
hostname_changed =
db::ActiveVm::change_hostname(&self.db, &req.uuid, &req.hostname).await?;
}
let hw_change_submitted = db_req.request_hw_update(&self.db).await?;
if hw_change_submitted.is_none() {
return Ok(Response::new(UpdateVmResp {
uuid: req.uuid.clone(),
error: "VM Contract does not exist.".to_string(),
args: None,
}));
}
let hw_change_needed = hw_change_submitted.unwrap();
if !hostname_changed && !hw_change_needed {
return Ok(Response::new(UpdateVmResp {
uuid: req.uuid.clone(),
error: "No modification required".to_string(),
args: None,
}));
}
        // if only the hostname was changed, return a confirmation
if !hw_change_needed {
return Ok(Response::new(UpdateVmResp {
uuid: req.uuid.clone(),
error: String::new(),
args: None,
}));
}
        // if HW changes were requested, wait for the new args
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
let db = self.db.clone();
tokio::spawn(async move {
let _ = oneshot_tx.send(db::WrappedMeasurement::listen(&db, &id, UPDATE_VM_REQ).await);
});
match oneshot_rx.await {
Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
"Network timeout. Please try again later or contact the DeTEE devs team.",
)),
Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())),
Err(e) => {
log::error!("Something weird happened during CLI VM Update. Reached error {e:?}");
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
}
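    /// Extends a VM contract by locking additional funds. Expected failures
    /// (missing contract, access denied, insufficient funds) map to
    /// FAILED_PRECONDITION; anything else is logged and returned as unknown.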
async fn extend_vm(&self, req: Request<ExtendVmReq>) -> Result<Response<Empty>, Status> {
let req = check_sig_from_req(req)?;
match db::ActiveVm::extend_time(&self.db, &req.uuid, &req.admin_pubkey, req.locked_nano)
.await
{
Ok(()) => Ok(Response::new(Empty {})),
Err(e)
if matches!(
e,
db::Error::ContractNotFound
| db::Error::AccessDenied
| db::Error::InsufficientFunds
) =>
{
Err(Status::failed_precondition(e.to_string()))
}
Err(e) => {
log::error!("Error extending VM contract {}: {e}", &req.uuid);
Err(Status::unknown(format!("Could not extend contract: {e}")))
}
}
}
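    /// Deletes a VM contract after resolving its node for a possible
    /// pub/sub redirect; only the contract admin may delete it.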
async fn delete_vm(&self, req: Request<DeleteVmReq>) -> Result<Response<Empty>, Status> {
let req = check_sig_from_req(req)?;
let vm_node = db::ActiveVm::get_by_uuid(&self.db, &req.uuid)
.await?
.ok_or(Status::permission_denied("Unauthorized"))?
.vm_node;
if let Some(redirect) = db::check_pubsub_node(&self.db, vm_node).await? {
log::info!("redirect: {redirect}");
return Err(redirect);
}
match db::ActiveVm::delete(&self.db, &req.admin_pubkey, &req.uuid).await {
Ok(()) => Ok(Response::new(Empty {})),
Err(db::Error::AccessDenied) => Err(Status::permission_denied("Unauthorized")),
Err(e) => {
log::error!("Error deleting VM contract {}: {e}", &req.uuid);
Err(Status::unknown(
"Unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
}
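    /// Streams VM contracts: a single contract when a UUID is given (and the
    /// caller is its admin or operator), otherwise all contracts for the
    /// caller as operator or as admin.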
async fn list_vm_contracts(
&self,
req: Request<ListVmContractsReq>,
) -> Result<Response<Self::ListVmContractsStream>, Status> {
let req = check_sig_from_req(req)?;
info!(
"CLI {} requested ListVMVmContractsStream. As operator: {}",
req.wallet, req.as_operator
);
let mut contracts = Vec::new();
if !req.uuid.is_empty() {
if let Some(specific_contract) =
db::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await?
{
if specific_contract.admin.key().to_string() == req.wallet
|| specific_contract.vm_node.operator.key().to_string() == req.wallet
{
contracts.push(specific_contract);
}
}
} else if req.as_operator {
contracts
.append(&mut db::ActiveVmWithNode::list_by_operator(&self.db, &req.wallet).await?);
} else {
contracts
.append(&mut db::ActiveVmWithNode::list_by_admin(&self.db, &req.wallet).await?);
}
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for contract in contracts {
let _ = tx.send(Ok(contract.into())).await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream) as Self::ListVmContractsStream))
}
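    /// Streams all VM nodes matching the supplied filters.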
async fn list_vm_nodes(
&self,
req: Request<VmNodeFilters>,
) -> Result<Response<Self::ListVmNodesStream>, tonic::Status> {
let req = check_sig_from_req(req)?;
info!("CLI requested ListVmNodesStream: {req:?}");
let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?;
let (tx, rx) = mpsc::channel(6);
tokio::spawn(async move {
for node in nodes {
let _ = tx.send(Ok(node.into())).await;
}
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream) as Self::ListVmNodesStream))
}
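    /// Returns the first VM node matching the filters, or NOT_FOUND if the
    /// search criteria match nothing.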
async fn get_one_vm_node(
&self,
req: Request<VmNodeFilters>,
) -> Result<Response<VmNodeListResp>, Status> {
let req = check_sig_from_req(req)?;
info!("Unknown CLI requested ListVmNodesStream: {req:?}");
// TODO: optimize this query so that it gets only one node
let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?;
if let Some(node) = nodes.into_iter().next() {
return Ok(Response::new(node.into()));
}
Err(Status::not_found("Could not find any node based on your search criteria"))
}
}