#![allow(dead_code)]

// SPDX-License-Identifier: Apache-2.0
// SPDX-License-Identifier: Unlicense

use crate::constants::{ACCOUNT, DEFAULT_ENDPOINT, NEW_VM_REQ, UPDATE_VM_REQ, VM_NODE};
use crate::db::prelude as db;
use crate::grpc::{check_sig_from_parts, check_sig_from_req};
use detee_shared::common_proto::Empty;
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCli;
use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemon;
use detee_shared::vm_proto::{ListVmContractsReq, *};
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;

use log::info;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};

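/// Daemon-facing gRPC service, backed by a shared SurrealDB connection.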
pub struct VmDaemonServer {
    pub db: Arc<Surreal<Client>>,
}

impl VmDaemonServer {
    pub fn new(db: Arc<Surreal<Client>>) -> Self {
        Self { db }
    }
}

#[tonic::async_trait]
impl BrainVmDaemon for VmDaemonServer {
    type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainVmMessage, Status>> + Send>>;
    type RegisterVmNodeStream = Pin<Box<dyn Stream<Item = Result<DeleteVmReq, Status>> + Send>>;

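    /// Registers (or re-registers) a VM node after verifying its signature,
    /// then streams back delete requests for VMs that were removed while the
    /// node was offline.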
    async fn register_vm_node(
        &self,
        req: Request<RegisterVmNodeReq>,
    ) -> Result<Response<Self::RegisterVmNodeStream>, Status> {
        let req = check_sig_from_req(req)?;
        info!("Starting registration process for {:?}", req);
        db::VmNode {
            id: surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone())),
            operator: surrealdb::RecordId::from((ACCOUNT, req.operator_wallet)),
            pub_sub_node: std::env::var("BRAIN_PUBLIC_ENDPOINT")
                .unwrap_or(DEFAULT_ENDPOINT.to_string()),
            country: req.country,
            region: req.region,
            city: req.city,
            ip: req.main_ip,
            avail_ipv4: 0,
            avail_ipv6: 0,
            avail_ports: 0,
            max_ports_per_vm: 0,
            disconnected_at: surrealdb::sql::Datetime::default(),
            connected_at: surrealdb::sql::Datetime::default(),
            offers: Vec::new(),
        }
        .register(&self.db)
        .await?;

        info!("Sending deleted contracts to {}", req.node_pubkey);
        let deleted_vms = db::DeletedVm::list_by_node(&self.db, &req.node_pubkey).await?;
        let (tx, rx) = mpsc::channel(6);
        tokio::spawn(async move {
            for deleted_vm in deleted_vms {
                let _ = tx.send(Ok(deleted_vm.into())).await;
            }
        });
        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream) as Self::RegisterVmNodeStream))
    }

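    /// Authenticates the daemon, marks its node online and streams brain
    /// messages to it: pending VM deletions, new VM requests and update
    /// requests, forwarded from SurrealDB live queries.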
    async fn brain_messages(
        &self,
        req: Request<DaemonStreamAuth>,
    ) -> Result<Response<Self::BrainMessagesStream>, Status> {
        let auth = req.into_inner();
        let pubkey = auth.pubkey.clone();
        check_sig_from_parts(
            &pubkey,
            &auth.timestamp,
            &format!("{:?}", auth.contracts),
            &auth.signature,
        )?;
        info!("Daemon {} connected to receive brain messages", pubkey);
        let _ = db::VmNode::set_online(&self.db, &pubkey).await;

        let (tx, rx) = mpsc::channel(6);
        {
            let db = self.db.clone();
            let pubkey = pubkey.clone();
            let tx = tx.clone();
            tokio::spawn(async move {
                match db::live_vmnode_msgs::<db::DeletedVm>(&db, &pubkey, tx).await {
                    Ok(()) => log::info!("live_vmnode_msgs ended for {pubkey}"),
                    Err(e) => {
                        log::warn!("live_vmnode_msgs errored for {pubkey}: {e}")
                    }
                };
            });
        }
        {
            let db = self.db.clone();
            let pubkey = pubkey.clone();
            let tx = tx.clone();
            tokio::spawn(async move {
                let _ = db::live_vmnode_msgs::<db::NewVmReq>(&db, &pubkey, tx.clone()).await;
            });
        }
        {
            let db = self.db.clone();
            let pubkey = pubkey.clone();
            let tx = tx.clone();
            tokio::spawn(async move {
                let _ = db::live_vmnode_msgs::<db::UpdateVmReq>(&db, &pubkey, tx.clone()).await;
            });
        }

        let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg.into()));
        Ok(Response::new(Box::pin(output_stream) as Self::BrainMessagesStream))
    }

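    /// Consumes the daemon's message stream. The first message must be an
    /// `Auth` payload with a valid signature; subsequent messages carry VM
    /// creation/update results and node resource reports. A stream error
    /// marks the node offline.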
    async fn daemon_messages(
        &self,
        req: Request<Streaming<VmDaemonMessage>>,
    ) -> Result<Response<Empty>, Status> {
        let mut req_stream = req.into_inner();
        let pubkey: String;
        if let Some(Ok(msg)) = req_stream.next().await {
            log::debug!("daemon_messages received the following auth message: {:?}", msg.msg);
            if let Some(vm_daemon_message::Msg::Auth(auth)) = msg.msg {
                pubkey = auth.pubkey.clone();
                check_sig_from_parts(
                    &pubkey,
                    &auth.timestamp,
                    &format!("{:?}", auth.contracts),
                    &auth.signature,
                )?;
            } else {
                return Err(Status::unauthenticated(
                    "Could not authenticate the daemon: could not extract auth signature",
                ));
            }
        } else {
            return Err(Status::unauthenticated("Could not authenticate the daemon"));
        }

        while let Some(daemon_message) = req_stream.next().await {
            match daemon_message {
                Ok(msg) => match msg.msg {
                    Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => {
                        if !new_vm_resp.error.is_empty() {
                            db::NewVmReq::submit_error(
                                &self.db,
                                &new_vm_resp.vm_id,
                                new_vm_resp.error,
                            )
                            .await?;
                        } else {
                            db::upsert_record(
                                &self.db,
                                "measurement_args",
                                &new_vm_resp.vm_id,
                                new_vm_resp.args.clone(),
                            )
                            .await?;
                            if let Some(args) = new_vm_resp.args {
                                db::ActiveVm::activate(&self.db, &new_vm_resp.vm_id, args).await?;
                            }
                        }
                    }
                    Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => {
                        if !update_vm_resp.error.is_empty() {
                            db::UpdateVmReq::submit_error(
                                &self.db,
                                &update_vm_resp.vm_id,
                                update_vm_resp.error,
                            )
                            .await?;
                        } else {
                            db::upsert_record(
                                &self.db,
                                "measurement_args",
                                &update_vm_resp.vm_id,
                                update_vm_resp.args.clone(),
                            )
                            .await?;
                            db::ActiveVm::update(&self.db, &update_vm_resp.vm_id).await?;
                        }
                    }
                    Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => {
                        let node_resources: db::VmNodeResources = node_resources.into();
                        node_resources.merge(&self.db, &pubkey).await?;
                    }
                    _ => {}
                },
                Err(e) => {
                    log::warn!("Daemon disconnected for {pubkey}: {e:?}");
                    let _ = db::VmNode::set_offline(&self.db, &pubkey).await;
                }
            }
        }
        Ok(Response::new(Empty {}))
    }
}

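/// CLI-facing gRPC service, backed by the same shared SurrealDB connection.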
pub struct VmCliServer {
    pub db: Arc<Surreal<Client>>,
}

impl VmCliServer {
    pub fn new(db: Arc<Surreal<Client>>) -> Self {
        Self { db }
    }
}

#[tonic::async_trait]
impl BrainVmCli for VmCliServer {
    type ListVmContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
    type ListVmNodesStream = Pin<Box<dyn Stream<Item = Result<VmNodeListResp, Status>> + Send>>;

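    /// Creates a new VM contract: verifies the request signature, returns a
    /// redirect error when the target node is served by a different pub/sub
    /// endpoint, rejects requests that lock fewer than 100 `locked_nano` or
    /// come from accounts banned by the node's operator, then submits the
    /// request and waits for the daemon's measurement before replying.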
    async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> {
        let req = check_sig_from_req(req)?;
        let id = surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone()));
        if let Some(redirect) = db::check_pubsub_node(&self.db, id).await? {
            log::info!("redirect: {redirect}");
            return Err(redirect);
        }

        // TODO: make it at least 1 hour
        if req.locked_nano < 100 {
            log::error!("locking less than 100 nano lps: {}", req.locked_nano);
            return Err(Status::unknown("lock at least 100 nano lps"));
        }
        info!("New VM requested via CLI: {req:?}");
        if db::Account::is_banned_by_node(&self.db, &req.admin_pubkey, &req.node_pubkey).await? {
            return Err(Status::permission_denied("This operator banned you. What did you do?"));
        }

        let db_req: db::NewVmReq = req.into();
        let id = db_req.id.key().to_string();

        let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
        let db = self.db.clone();
        tokio::spawn(async move {
            let _ = oneshot_tx.send(db::WrappedMeasurement::listen(&db, &id, NEW_VM_REQ).await);
        });
        db_req.submit(&self.db).await?;

        match oneshot_rx.await {
            Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
                "Network timeout. Please try again later or contact the DeTEE devs team.",
            )),
            Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())),
            Err(e) => {
                log::error!("Something weird happened during CLI NewVmReq. Reached error {e:?}");
                Err(Status::unknown(
                    "Unknown error. Please try again or contact the DeTEE devs team.",
                ))
            }
        }
    }

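    /// Updates an existing VM contract. Hostname changes are applied
    /// immediately; hardware changes are submitted to the daemon and the
    /// handler waits for the resulting measurement args before replying.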
    async fn update_vm(&self, req: Request<UpdateVmReq>) -> Result<Response<UpdateVmResp>, Status> {
        let req = check_sig_from_req(req)?;
        info!("Update VM requested via CLI: {req:?}");

        let db_req: db::UpdateVmReq = req.clone().into();

println!("The node is {}", db_req.vm_node);
|
|
|
|
        // TODO: vm_node is not known at this point. It is populated by `request_hw_update`.
        // As such, the pubsub node cannot be checked at this stage. This code should be moved;
        // however, we are not working on the redirect mechanism at the moment.
        //
        // if let Some(redirect) = db::check_pubsub_node(&self.db, db_req.vm_node.clone()).await? {
        //     log::info!("redirect: {redirect}");
        //     return Err(redirect);
        // }

        let id = db_req.id.key().to_string();

        let mut hostname_changed = false;
        if !req.hostname.is_empty() {
            hostname_changed =
                db::ActiveVm::change_hostname(&self.db, &req.vm_id, &req.hostname).await?;
        }

        let hw_change_submitted = db_req.request_hw_update(&self.db).await?;
        let Some(hw_change_needed) = hw_change_submitted else {
            return Ok(Response::new(UpdateVmResp {
                vm_id: req.vm_id.clone(),
                error: "VM Contract does not exist.".to_string(),
                args: None,
            }));
        };

        if !hostname_changed && !hw_change_needed {
            return Ok(Response::new(UpdateVmResp {
                vm_id: req.vm_id.clone(),
                error: "No modification required".to_string(),
                args: None,
            }));
        }

        // if only the hostname got changed, return a confirmation
        if !hw_change_needed {
            return Ok(Response::new(UpdateVmResp {
                vm_id: req.vm_id.clone(),
                error: String::new(),
                args: None,
            }));
        }

        // if HW changes got requested, wait for the new args
        let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
        let db = self.db.clone();
        tokio::spawn(async move {
            let _ = oneshot_tx.send(db::WrappedMeasurement::listen(&db, &id, UPDATE_VM_REQ).await);
        });

        match oneshot_rx.await {
            Ok(Err(db::Error::TimeOut(_))) => Err(Status::deadline_exceeded(
                "Network timeout. Please try again later or contact the DeTEE devs team.",
            )),
            Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())),
            Err(e) => {
                log::error!("Something weird happened during CLI VM Update. Reached error {e:?}");
                Err(Status::unknown(
                    "Unknown error. Please try again or contact the DeTEE devs team.",
                ))
            }
        }
    }

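    /// Extends a VM contract with additional locked funds. Known business
    /// errors (missing contract, access denied, insufficient funds) map to
    /// `failed_precondition`; anything else is reported as `unknown`.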
    async fn extend_vm(&self, req: Request<ExtendVmReq>) -> Result<Response<Empty>, Status> {
        let req = check_sig_from_req(req)?;
        match db::ActiveVm::extend_time(&self.db, &req.vm_id, &req.admin_pubkey, req.locked_nano)
            .await
        {
            Ok(()) => Ok(Response::new(Empty {})),
            Err(e)
                if matches!(
                    e,
                    db::Error::ContractNotFound
                        | db::Error::AccessDenied
                        | db::Error::InsufficientFunds
                ) =>
            {
                Err(Status::failed_precondition(e.to_string()))
            }
            Err(e) => {
                log::error!("Error extending VM contract {}: {e}", &req.vm_id);
                Err(Status::unknown(format!("Could not extend contract: {e}")))
            }
        }
    }

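    /// Deletes a VM contract after checking the signature, redirecting the
    /// caller if the VM's node is served by a different pub/sub endpoint.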
    async fn delete_vm(&self, req: Request<DeleteVmReq>) -> Result<Response<Empty>, Status> {
        let req = check_sig_from_req(req)?;
        let vm_node = db::ActiveVm::get_by_id(&self.db, &req.vm_id)
            .await?
            .ok_or(Status::permission_denied("Unauthorized"))?
            .vm_node;

        if let Some(redirect) = db::check_pubsub_node(&self.db, vm_node).await? {
            log::info!("redirect: {redirect}");
            return Err(redirect);
        }
        match db::ActiveVm::delete(&self.db, &req.admin_pubkey, &req.vm_id).await {
            Ok(()) => Ok(Response::new(Empty {})),
            Err(db::Error::AccessDenied) => Err(Status::permission_denied("Unauthorized")),
            Err(e) => {
                log::error!("Error deleting VM contract {}: {e}", &req.vm_id);
                Err(Status::unknown(
                    "Unknown error. Please try again or contact the DeTEE devs team.",
                ))
            }
        }
    }

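    /// Streams VM contracts visible to the caller: a single contract when
    /// `vm_id` is set, otherwise every contract where the wallet is the admin
    /// or (with `as_operator`) the node operator.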
    async fn list_vm_contracts(
        &self,
        req: Request<ListVmContractsReq>,
    ) -> Result<Response<Self::ListVmContractsStream>, Status> {
        let req = check_sig_from_req(req)?;
        info!(
            "CLI {} requested ListVmContractsStream. As operator: {}",
            req.wallet, req.as_operator
        );
        let mut contracts = Vec::new();
        if !req.vm_id.is_empty() {
            if let Some(specific_contract) =
                db::ActiveVmWithNode::get_by_id(&self.db, &req.vm_id).await?
            {
                if specific_contract.admin.key().to_string() == req.wallet
                    || specific_contract.vm_node.operator.key().to_string() == req.wallet
                {
                    contracts.push(specific_contract);
                }
            }
        } else if req.as_operator {
            contracts
                .append(&mut db::ActiveVmWithNode::list_by_operator(&self.db, &req.wallet).await?);
        } else {
            contracts
                .append(&mut db::ActiveVmWithNode::list_by_admin(&self.db, &req.wallet).await?);
        }
        let (tx, rx) = mpsc::channel(6);
        tokio::spawn(async move {
            for contract in contracts {
                let _ = tx.send(Ok(contract.into())).await;
            }
        });
        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream) as Self::ListVmContractsStream))
    }

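    /// Streams all VM nodes matching the supplied filters.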
    async fn list_vm_nodes(
        &self,
        req: Request<VmNodeFilters>,
    ) -> Result<Response<Self::ListVmNodesStream>, tonic::Status> {
        let req = check_sig_from_req(req)?;
        info!("CLI requested ListVmNodesStream: {req:?}");
        let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?;
        let (tx, rx) = mpsc::channel(6);
        tokio::spawn(async move {
            for node in nodes {
                let _ = tx.send(Ok(node.into())).await;
            }
        });
        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream) as Self::ListVmNodesStream))
    }

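    /// Returns the first VM node matching the supplied filters, or `not_found`
    /// when nothing matches.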
    async fn get_one_vm_node(
        &self,
        req: Request<VmNodeFilters>,
    ) -> Result<Response<VmNodeListResp>, Status> {
        let req = check_sig_from_req(req)?;
        info!("CLI requested GetOneVmNode: {req:?}");
        // TODO: optimize this query so that it gets only one node
        let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?;
        if let Some(node) = nodes.into_iter().next() {
            return Ok(Response::new(node.into()));
        }
        Err(Status::not_found("Could not find any node based on your search criteria"))
    }
}