detee-cli/src/snp/grpc.rs

232 lines
8.1 KiB
Rust

// SPDX-License-Identifier: Apache-2.0
pub mod proto {
pub use detee_shared::general_proto::*;
pub use detee_shared::vm_proto::*;
}
use crate::call_with_follow_redirect;
use crate::config::Config;
use crate::utils::{self, sign_request};
use log::{debug, info, warn};
use proto::brain_vm_cli_client::BrainVmCliClient;
use proto::{
DeleteVmReq, ExtendVmReq, ListVmContractsReq, NewVmReq, NewVmResp, UpdateVmReq, UpdateVmResp,
VmContract, VmNodeFilters, VmNodeListResp,
};
use tokio_stream::StreamExt;
use tonic::metadata::errors::InvalidMetadataValue;
use tonic::transport::Channel;
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Failed to connect to the brain: {0}")]
BrainConnection(#[from] tonic::transport::Error),
#[error("Received error from brain: status: {}, message: {}",
_0.code().to_string(), _0.message())]
ResponseStatus(#[from] tonic::Status),
#[error(transparent)]
ConfigError(#[from] crate::config::Error),
#[error("Could not find contract {0}")]
VmContractNotFound(String),
#[error(transparent)]
InternalParsingError(#[from] InvalidMetadataValue),
#[error("The Root CA file got corrupted.")]
CorruptedRootCa(#[from] std::io::Error),
#[error("Internal app error: could not parse Brain URL")]
CorruptedBrainUrl,
#[error("Max redirects exceeded: {0}")]
MaxRedirectsExceeded(String),
#[error("Redirect error: {0}")]
RedirectError(String),
#[error(transparent)]
InternalError(#[from] utils::Error),
}
impl crate::HumanOutput for VmContract {
fn human_cli_print(&self) {
println!(
"The VM {} has the ID {}, and it runs on the node {}",
self.hostname, self.vm_id, self.node_pubkey
);
if self.vm_public_ipv4.is_empty() {
println!(
"The VM has no public IPv4. The ports mapped from the host to the VM are: {:?}",
self.mapped_ports
);
} else {
println!("The Public IPv4 address of the VM is: {}", self.vm_public_ipv4);
}
if self.vm_public_ipv6.is_empty() {
println!("The VM does not have a public IPv6 address.");
} else {
println!("The Public IPv6 address of the VM is: {}", self.vm_public_ipv6);
}
println!(
"The VM has {} vCPUS, {}MB of memory and a disk of {} GB.",
self.vcpus, self.memory_mb, self.disk_size_gb
);
println!("You have locked {} nanocredits in the contract, that get collected at a rate of {} nanocredits per minute.",
self.locked_nano, self.nano_per_minute);
}
}
impl crate::HumanOutput for VmNodeListResp {
fn human_cli_print(&self) {
println!("The pubkey of this node is {} and the IP is {}", self.node_pubkey, self.ip);
println!("It belongs to the operator {}", self.operator);
println!(
"This node is located in the city {}, within the region of {}, in {}",
self.city, self.region, self.country
);
if self.offers.len() > 0 {
println!("The node is offering the following offers:");
}
for offer in self.offers.iter() {
println!(
" - {} vcpus, {} MiB of memory, {} MiB of storage at a price per unit of {}",
offer.vcpus, offer.memory_mib, offer.disk_mib, offer.price
);
}
}
}
async fn client() -> Result<BrainVmCliClient<Channel>, Error> {
let default_brain_url = Config::get_brain_info().0;
debug!("brain_url: {default_brain_url}");
Ok(BrainVmCliClient::new(Config::connect_brain_channel(default_brain_url).await?))
}
async fn client_from_endpoint(
reconnect_endpoint: String,
) -> Result<BrainVmCliClient<Channel>, Error> {
Ok(BrainVmCliClient::new(Config::connect_brain_channel(reconnect_endpoint).await?))
}
pub async fn get_node_list(req: VmNodeFilters) -> Result<Vec<VmNodeListResp>, Error> {
debug!("Getting nodes from brain...");
let mut client = client().await?;
let mut nodes = Vec::new();
let mut grpc_stream = client.list_vm_nodes(sign_request(req)?).await?.into_inner();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(node) => {
debug!("Received node from brain: {node:?}");
nodes.push(node);
}
Err(e) => {
warn!("Received error instead of node list: {e:?}");
}
}
}
debug!("Brain terminated list_nodes stream.");
Ok(nodes)
}
pub async fn get_one_node(req: VmNodeFilters) -> Result<VmNodeListResp, Error> {
let mut client = client().await?;
let response = client.get_one_vm_node(sign_request(req)?).await?;
Ok(response.into_inner())
}
pub async fn create_vm(req: NewVmReq) -> Result<NewVmResp, Error> {
debug!("Sending NewVmReq to brain: {req:?}");
let client = client().await?;
match call_with_follow_redirect!(client, req, new_vm).await {
Ok(resp) => Ok(resp.into_inner()),
Err(e) => Err(e.into()),
}
}
pub async fn list_contracts(req: ListVmContractsReq) -> Result<Vec<VmContract>, Error> {
debug!("Getting contracts from brain...");
let mut client = client().await?;
let mut contracts = Vec::new();
let mut grpc_stream = client.list_vm_contracts(sign_request(req)?).await?.into_inner();
while let Some(stream_update) = grpc_stream.next().await {
match stream_update {
Ok(c) => {
info!("Received contract from brain: {c:?}");
contracts.push(c);
}
Err(e) => {
warn!("Received error instead of contracts: {e:?}");
}
}
}
debug!("Brain terminated list_contracts stream.");
Ok(contracts)
}
pub async fn delete_vm(vm_id: &str) -> Result<(), Error> {
let client = client().await?;
let req = DeleteVmReq { vm_id: vm_id.to_string(), admin_pubkey: Config::get_detee_wallet()? };
match call_with_follow_redirect!(client, req, delete_vm).await {
Ok(confirmation) => {
log::debug!("VM deletion confirmation: {confirmation:?}");
}
Err(e) => {
log::error!("Could not delete vm: {e:?}");
return Err(e.into());
}
};
Ok(())
}
pub async fn extend_vm(vm_id: String, admin_pubkey: String, locked_nano: u64) -> Result<(), Error> {
let mut client = client().await?;
let req = ExtendVmReq { admin_pubkey, vm_id, locked_nano };
let result = client.extend_vm(sign_request(req)?).await;
match result {
Ok(confirmation) => {
log::debug!("VM contract extension confirmation: {confirmation:?}");
log::info!(
"VM contract got updated. It now has {} credits locked for the VM.",
locked_nano as f64 / 1_000_000_000.0
);
}
Err(e) => {
log::debug!("Got error from brain: {:?}", e);
log::error!("Could not extend VM contract: {}", e.message());
return Err(e.into());
}
};
Ok(())
}
pub async fn update_vm(req: UpdateVmReq) -> Result<UpdateVmResp, Error> {
info!("Updating VM {req:?}");
let client = client().await?;
match call_with_follow_redirect!(client, req, update_vm).await {
Ok(resp) => {
let resp = resp.into_inner();
if resp.error.is_empty() {
info!("Got VM update response: {resp:?}");
Ok(resp)
} else {
debug!("Got VM update error: {:?}", resp);
Ok(resp)
}
}
Err(e) => {
log::error!("Could not update vm: {e:?}");
Err(e.into())
}
}
}
pub async fn get_contract_by_id(vm_id: &str) -> Result<VmContract, Error> {
let req = ListVmContractsReq {
wallet: Config::get_detee_wallet()?,
vm_id: vm_id.to_string(),
..Default::default()
};
let contracts = list_contracts(req).await?;
if contracts.is_empty() {
log::error!("Could not find any contract by ID {vm_id}");
return Err(Error::VmContractNotFound(vm_id.to_string()));
}
Ok(contracts[0].clone())
}