added code to create new VM

compiles but not tested yet
This commit is contained in:
ghe0 2025-04-26 22:13:42 +03:00
parent 363724e5d7
commit 6f9cb36bea
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
5 changed files with 256 additions and 72 deletions

3
Cargo.lock generated

@ -1000,7 +1000,7 @@ dependencies = [
[[package]]
name = "detee-shared"
version = "0.1.0"
source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain#fb38352e1b47837b14f32d8df5ae7f6b17202aae"
source = "git+ssh://git@gitea.detee.cloud/testnet/proto?branch=surreal_brain#d0d4622c52efdf74ed6582fbac23a6159986ade3"
dependencies = [
"bincode 2.0.1",
"prost",
@ -3779,6 +3779,7 @@ dependencies = [
"env_logger",
"futures",
"log",
"nanoid",
"serde",
"serde_json",
"serde_yaml",

@ -20,6 +20,7 @@ tokio-stream = "0.1.17"
log = "0.4.27"
env_logger = "0.11.8"
thiserror = "2.0.12"
nanoid = "0.4.0"
[profile.release]
lto = true

@ -36,6 +36,7 @@ DEFINE FIELD created_at ON TABLE new_vm_req TYPE datetime;
DEFINE FIELD updated_at ON TABLE new_vm_req TYPE datetime;
DEFINE FIELD price_per_unit ON TABLE new_vm_req TYPE int;
DEFINE FIELD locked_nano ON TABLE new_vm_req TYPE int;
DEFINE FIELD error ON TABLE new_vm_req TYPE string;
DEFINE TABLE active_vm TYPE RELATION FROM account TO vm_node SCHEMAFULL;
DEFINE FIELD hostname ON TABLE active_vm TYPE string;

134
src/db.rs

@ -18,6 +18,13 @@ pub const NEW_VM_REQ: &str = "new_vm_req";
pub const UPDATE_VM_REQ: &str = "update_vm_req";
pub const DELETED_VM: &str = "deleted_vm";
pub const ID_ALPHABET: [char; 62] = [
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B',
'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U',
'V', 'W', 'X', 'Y', 'Z',
];
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Internal DB error: {0}")]
@ -34,6 +41,17 @@ pub async fn init() -> surrealdb::Result<()> {
Ok(())
}
pub async fn upsert_record<SomeRecord: Serialize + 'static>(
table: &str,
id: &str,
my_record: SomeRecord,
) -> Result<(), Error> {
#[derive(Deserialize)]
struct Wrapper {}
let _: Option<Wrapper> = DB.create((table, id)).content(my_record).await?;
Ok(())
}
pub async fn migration0(old_data: &old_brain::BrainData) -> surrealdb::Result<()> {
let accounts: Vec<Account> = old_data.into();
let vm_nodes: Vec<VmNode> = old_data.into();
@ -85,6 +103,21 @@ impl Account {
}
}
impl Account {
pub async fn is_banned_by_node(user: &str, node: &str) -> Result<bool, Error> {
let ban: Option<Self> = DB
.query(format!(
"(select operator->ban[0] as ban
from vm_node:{node}
where operator->ban->account contains account:{user}
).ban;"
))
.await?
.take(0)?;
Ok(ban.is_some())
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct VmNode {
pub id: RecordId,
@ -104,6 +137,24 @@ pub struct VmNode {
pub offline_minutes: u64,
}
#[derive(Serialize)]
pub struct VmNodeResources {
pub avail_mem_mb: u32,
pub avail_vcpus: u32,
pub avail_storage_gbs: u32,
pub avail_ipv4: u32,
pub avail_ipv6: u32,
pub avail_ports: u32,
pub max_ports_per_vm: u32,
}
impl VmNodeResources {
pub async fn merge(self, node_id: &str) -> Result<(), Error> {
let _: Option<VmNode> = DB.update((VM_NODE, node_id)).merge(self).await?;
Ok(())
}
}
impl VmNode {
pub async fn register(self) -> Result<(), Error> {
let _: Option<VmNode> = DB.upsert(self.id.clone()).content(self).await?;
@ -176,6 +227,82 @@ pub struct NewVmReq {
pub created_at: Datetime,
pub price_per_unit: u64,
pub locked_nano: u64,
pub error: String,
}
impl NewVmReq {
pub async fn submit_error(id: &str, error: String) -> Result<(), Error> {
#[derive(Serialize)]
struct NewVmError {
error: String,
}
let _: Option<VmNode> = DB.update((NEW_VM_REQ, id)).merge(NewVmError { error }).await?;
Ok(())
}
pub async fn submit(self) -> Result<(), Error> {
let _: Option<Self> = DB.create(self.id.clone()).content(self).await?;
Ok(())
}
}
/// first string is the vm_id
pub enum NewVmResp {
// TODO: find a more elegant way to do this than importing gRPC in the DB module
// https://en.wikipedia.org/wiki/Dependency_inversion_principle
Args(String, detee_shared::snp::pb::vm_proto::MeasurementArgs),
Error(String, String),
}
impl NewVmResp {
pub async fn listen(vm_id: &str) -> Result<NewVmResp, Error> {
let mut resp = DB
.query(format!("live select * from {NEW_VM_REQ} where id = {NEW_VM_REQ}:{vm_id};"))
.query(format!(
"live select * from measurement_args where id = measurement_args:{vm_id};"
))
.await?;
let mut live_stream1 = resp.stream::<Notification<NewVmReq>>(0)?;
let mut live_stream2 =
resp.stream::<Notification<detee_shared::snp::pb::vm_proto::MeasurementArgs>>(1)?;
loop {
tokio::select! {
new_vm_req_notif = live_stream1.next() => {
if let Some(new_vm_req_notif) = new_vm_req_notif {
match new_vm_req_notif {
Ok(new_vm_req_notif) => {
match new_vm_req_notif.action {
surrealdb::Action::Update => {
if !new_vm_req_notif.data.error.is_empty() {
return Ok(Self::Error(vm_id.to_string(), new_vm_req_notif.data.error));
}
},
_ => {}
};
},
Err(e) => return Err(e.into()),
}
}
}
args_notif = live_stream2.next() => {
if let Some(args_notif) = args_notif {
match args_notif {
Ok(args_notif) => {
match args_notif.action {
surrealdb::Action::Create => {
return Ok(Self::Args(vm_id.to_string(), args_notif.data));
},
_ => {}
};
},
Err(e) => return Err(e.into()),
}
}
}
}
}
}
}
#[derive(Debug, Serialize, Deserialize)]
@ -219,7 +346,9 @@ pub struct UpdateVmReq {
pub locked_nano: u64,
}
pub async fn listen_for_node<T: Into<DaemonNotification> + std::marker::Unpin + for<'de> Deserialize<'de>>(
pub async fn listen_for_node<
T: Into<DaemonNotification> + std::marker::Unpin + for<'de> Deserialize<'de>,
>(
node: &str,
tx: Sender<DaemonNotification>,
) -> Result<(), Error> {
@ -230,7 +359,7 @@ pub async fn listen_for_node<T: Into<DaemonNotification> + std::marker::Unpin +
wat => {
log::error!("listen_for_node: T has type {wat}");
String::from("wat")
},
}
};
let mut resp =
DB.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?;
@ -346,7 +475,6 @@ impl ActiveVm {
pub fn price_per_minute(&self) -> u64 {
self.total_units() * self.price_per_unit
}
}
#[derive(Debug, Serialize, Deserialize)]

@ -13,9 +13,11 @@ use detee_shared::{
*,
},
};
use nanoid::nanoid;
use log::info;
use std::pin::Pin;
use surrealdb::RecordId;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{Stream, StreamExt};
@ -29,6 +31,31 @@ impl From<db::Account> for AccountBalance {
}
}
impl From<NewVmReq> for db::NewVmReq {
fn from(new_vm_req: NewVmReq) -> Self {
Self {
id: RecordId::from((db::NEW_VM_REQ, nanoid!(40, &db::ID_ALPHABET))),
hostname: new_vm_req.hostname,
admin: RecordId::from((db::ACCOUNT, new_vm_req.admin_pubkey)),
vm_node: RecordId::from((db::VM_NODE, new_vm_req.node_pubkey)),
extra_ports: new_vm_req.extra_ports,
public_ipv4: new_vm_req.public_ipv4,
public_ipv6: new_vm_req.public_ipv6,
disk_size_gb: new_vm_req.disk_size_gb,
vcpus: new_vm_req.vcpus,
memory_mb: new_vm_req.memory_mb,
kernel_url: new_vm_req.kernel_url,
kernel_sha: new_vm_req.kernel_sha,
dtrfs_url: new_vm_req.dtrfs_url,
dtrfs_sha: new_vm_req.dtrfs_sha,
price_per_unit: new_vm_req.price_per_unit,
locked_nano: new_vm_req.locked_nano,
created_at: surrealdb::sql::Datetime::default(),
error: String::new(),
}
}
}
impl From<db::NewVmReq> for NewVmReq {
fn from(new_vm_req: db::NewVmReq) -> Self {
Self {
@ -52,6 +79,19 @@ impl From<db::NewVmReq> for NewVmReq {
}
}
impl From<db::NewVmResp> for NewVmResp {
fn from(resp: db::NewVmResp) -> Self {
match resp {
// TODO: This will require a small architecture change to pass MeasurementArgs from
// Daemon to CLI
db::NewVmResp::Args(uuid, args) => {
NewVmResp { uuid, error: String::new(), args: Some(args) }
}
db::NewVmResp::Error(uuid, error) => NewVmResp { uuid, error, args: None },
}
}
}
impl From<db::UpdateVmReq> for UpdateVmReq {
fn from(update_vm_req: db::UpdateVmReq) -> Self {
Self {
@ -181,6 +221,20 @@ impl From<db::AppNodeWithReports> for AppNodeListResp {
}
}
impl From<VmNodeResources> for db::VmNodeResources {
fn from(res: VmNodeResources) -> Self {
Self {
avail_mem_mb: res.avail_memory_mb,
avail_vcpus: res.avail_vcpus,
avail_storage_gbs: res.avail_storage_gb,
avail_ipv4: res.avail_ipv4,
avail_ipv6: res.avail_ipv6,
avail_ports: res.avail_ports,
max_ports_per_vm: res.max_ports_per_vm,
}
}
}
struct BrainVmDaemonForReal {}
#[tonic::async_trait]
@ -273,52 +327,54 @@ impl BrainVmDaemon for BrainVmDaemonForReal {
async fn daemon_messages(
&self,
_req: Request<Streaming<VmDaemonMessage>>,
req: Request<Streaming<VmDaemonMessage>>,
) -> Result<Response<Empty>, Status> {
todo!();
// let mut req_stream = req.into_inner();
// let pubkey: String;
// if let Some(Ok(msg)) = req_stream.next().await {
// log::debug!("demon_messages received the following auth message: {:?}", msg.msg);
// if let Some(vm_daemon_message::Msg::Auth(auth)) = msg.msg {
// pubkey = auth.pubkey.clone();
// check_sig_from_parts(
// &pubkey,
// &auth.timestamp,
// &format!("{:?}", auth.contracts),
// &auth.signature,
// )?;
// } else {
// return Err(Status::unauthenticated(
// "Could not authenticate the daemon: could not extract auth signature",
// ));
// }
// } else {
// return Err(Status::unauthenticated("Could not authenticate the daemon"));
// }
let mut req_stream = req.into_inner();
let pubkey: String;
if let Some(Ok(msg)) = req_stream.next().await {
log::debug!("demon_messages received the following auth message: {:?}", msg.msg);
if let Some(vm_daemon_message::Msg::Auth(auth)) = msg.msg {
pubkey = auth.pubkey.clone();
check_sig_from_parts(
&pubkey,
&auth.timestamp,
&format!("{:?}", auth.contracts),
&auth.signature,
)?;
} else {
return Err(Status::unauthenticated(
"Could not authenticate the daemon: could not extract auth signature",
));
}
} else {
return Err(Status::unauthenticated("Could not authenticate the daemon"));
}
// // info!("Received a message from daemon {pubkey}: {daemon_message:?}");
// while let Some(daemon_message) = req_stream.next().await {
// match daemon_message {
// Ok(msg) => match msg.msg {
// Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => {
// self.data.submit_newvm_resp(new_vm_resp).await;
// }
// Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => {
while let Some(daemon_message) = req_stream.next().await {
match daemon_message {
Ok(msg) => match msg.msg {
Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => {
if !new_vm_resp.error.is_empty() {
} else {
db::upsert_record("measurement_args", &new_vm_resp.uuid, new_vm_resp.args).await?;
}
}
Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => {
todo!();
// self.data.submit_updatevm_resp(update_vm_resp).await;
// }
// Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => {
// self.data.submit_node_resources(node_resources);
// }
// _ => {}
// },
// Err(e) => {
// log::warn!("Daemon disconnected: {e:?}");
// self.data.del_daemon_tx(&pubkey);
// }
// }
// }
// Ok(Response::new(Empty {}))
}
Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => {
let node_resources: db::VmNodeResources = node_resources.into();
node_resources.merge(&pubkey).await?;
}
_ => {}
},
Err(e) => {
log::warn!("Daemon disconnected: {e:?}");
}
}
}
Ok(Response::new(Empty {}))
}
}
@ -495,30 +551,27 @@ impl BrainVmCli for BrainVmCliForReal {
async fn new_vm(&self, req: Request<NewVmReq>) -> Result<Response<NewVmResp>, Status> {
let req = check_sig_from_req(req)?;
info!("New VM requested via CLI: {req:?}");
todo!();
// if self
// .data
// .is_user_banned_by_node(&req.admin_pubkey, &req.node_pubkey)
// {
// return Err(Status::permission_denied(
// "This operator banned you. What did you do?",
// ));
// }
// let admin_pubkey = req.admin_pubkey.clone();
// let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
// self.data.submit_newvm_req(req, oneshot_tx).await;
// match oneshot_rx.await {
// Ok(response) => {
// info!("Sending VM confirmation to {admin_pubkey}: {response:?}");
// Ok(Response::new(response))
// }
// Err(e) => {
// log::error!("Something weird happened. Reached error {e:?}");
// Err(Status::unknown(
// "Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
// ))
// }
// }
if db::Account::is_banned_by_node(&req.admin_pubkey, &req.node_pubkey).await? {
return Err(Status::permission_denied("This operator banned you. What did you do?"));
}
let new_vm_req: db::NewVmReq = req.into();
let id = new_vm_req.id.key().to_string();
let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
let _ = oneshot_tx.send(db::NewVmResp::listen(&id).await);
});
new_vm_req.submit().await?;
match oneshot_rx.await {
Ok(new_vm_resp) => Ok(Response::new(new_vm_resp?.into())),
Err(e) => {
log::error!("Something weird happened. Reached error {e:?}");
Err(Status::unknown(
"Request failed due to unknown error. Please try again or contact the DeTEE devs team.",
))
}
}
}
async fn update_vm(&self, req: Request<UpdateVmReq>) -> Result<Response<UpdateVmResp>, Status> {