forked from ghe0/brain-to-surreal
Compare commits
4 Commits
app_data_i
...
master
Author | SHA1 | Date | |
---|---|---|---|
363724e5d7 | |||
6a99c146ce | |||
d9f4df2c3d | |||
ee75412bb0 |
@ -1,10 +1,11 @@
|
||||
DEFINE TABLE account SCHEMAFULL;
|
||||
DEFINE FIELD balance ON TABLE account TYPE int;
|
||||
DEFINE FIELD tmp_locked ON TABLE account TYPE int;
|
||||
DEFINE FIELD escrow ON TABLE account TYPE int;
|
||||
DEFINE FIELD email ON TABLE account TYPE string;
|
||||
DEFINE FIELD balance ON TABLE account TYPE int DEFAULT 0;
|
||||
DEFINE FIELD tmp_locked ON TABLE account TYPE int DEFAULT 0;
|
||||
DEFINE FIELD escrow ON TABLE account TYPE int DEFAULT 0;
|
||||
DEFINE FIELD email ON TABLE account TYPE string DEFAULT "";
|
||||
|
||||
DEFINE TABLE vm_node SCHEMAFULL;
|
||||
DEFINE FIELD operator ON TABLE vm_node TYPE record<account>;
|
||||
DEFINE FIELD country ON TABLE vm_node TYPE string;
|
||||
DEFINE FIELD region ON TABLE vm_node TYPE string;
|
||||
DEFINE FIELD city ON TABLE vm_node TYPE string;
|
||||
@ -19,24 +20,63 @@ DEFINE FIELD max_ports_per_vm ON TABLE vm_node TYPE int;
|
||||
DEFINE FIELD price ON TABLE vm_node TYPE int;
|
||||
DEFINE FIELD offline_minutes ON TABLE vm_node TYPE int;
|
||||
|
||||
DEFINE TABLE vm_contract TYPE RELATION FROM account TO vm_node SCHEMAFULL;
|
||||
DEFINE FIELD state ON TABLE vm_contract TYPE string;
|
||||
DEFINE FIELD hostname ON TABLE vm_contract TYPE string;
|
||||
DEFINE FIELD mapped_ports ON TABLE vm_contract TYPE array<[int, int]>;
|
||||
DEFINE FIELD public_ipv4 ON TABLE vm_contract TYPE string;
|
||||
DEFINE FIELD public_ipv6 ON TABLE vm_contract TYPE string;
|
||||
DEFINE FIELD disk_size_gb ON TABLE vm_contract TYPE int;
|
||||
DEFINE FIELD vcpus ON TABLE vm_contract TYPE int;
|
||||
DEFINE FIELD memory_mb ON TABLE vm_contract TYPE int;
|
||||
DEFINE FIELD dtrfs_sha ON TABLE vm_contract TYPE string;
|
||||
DEFINE FIELD kernel_sha ON TABLE vm_contract TYPE string;
|
||||
DEFINE FIELD created_at ON TABLE vm_contract TYPE datetime;
|
||||
DEFINE FIELD updated_at ON TABLE vm_contract TYPE datetime;
|
||||
DEFINE FIELD price_per_unit ON TABLE vm_contract TYPE int;
|
||||
DEFINE FIELD locked_nano ON TABLE vm_contract TYPE int;
|
||||
DEFINE FIELD collected_at ON TABLE vm_contract TYPE datetime;
|
||||
DEFINE TABLE new_vm_req TYPE RELATION FROM account TO vm_node SCHEMAFULL;
|
||||
DEFINE FIELD hostname ON TABLE new_vm_req TYPE string;
|
||||
DEFINE FIELD extra_ports ON TABLE new_vm_req TYPE array<int>;
|
||||
DEFINE FIELD public_ipv4 ON TABLE new_vm_req TYPE bool;
|
||||
DEFINE FIELD public_ipv6 ON TABLE new_vm_req TYPE bool;
|
||||
DEFINE FIELD disk_size_gb ON TABLE new_vm_req TYPE int;
|
||||
DEFINE FIELD vcpus ON TABLE new_vm_req TYPE int;
|
||||
DEFINE FIELD memory_mb ON TABLE new_vm_req TYPE int;
|
||||
DEFINE FIELD dtrfs_sha ON TABLE new_vm_req TYPE string;
|
||||
DEFINE FIELD dtrfs_url ON TABLE new_vm_req TYPE string;
|
||||
DEFINE FIELD kernel_sha ON TABLE new_vm_req TYPE string;
|
||||
DEFINE FIELD kernel_url ON TABLE new_vm_req TYPE string;
|
||||
DEFINE FIELD created_at ON TABLE new_vm_req TYPE datetime;
|
||||
DEFINE FIELD updated_at ON TABLE new_vm_req TYPE datetime;
|
||||
DEFINE FIELD price_per_unit ON TABLE new_vm_req TYPE int;
|
||||
DEFINE FIELD locked_nano ON TABLE new_vm_req TYPE int;
|
||||
|
||||
DEFINE TABLE active_vm TYPE RELATION FROM account TO vm_node SCHEMAFULL;
|
||||
DEFINE FIELD hostname ON TABLE active_vm TYPE string;
|
||||
DEFINE FIELD mapped_ports ON TABLE active_vm TYPE array<[int, int]>;
|
||||
DEFINE FIELD public_ipv4 ON TABLE active_vm TYPE string;
|
||||
DEFINE FIELD public_ipv6 ON TABLE active_vm TYPE string;
|
||||
DEFINE FIELD disk_size_gb ON TABLE active_vm TYPE int;
|
||||
DEFINE FIELD vcpus ON TABLE active_vm TYPE int;
|
||||
DEFINE FIELD memory_mb ON TABLE active_vm TYPE int;
|
||||
DEFINE FIELD dtrfs_sha ON TABLE active_vm TYPE string;
|
||||
DEFINE FIELD kernel_sha ON TABLE active_vm TYPE string;
|
||||
DEFINE FIELD created_at ON TABLE active_vm TYPE datetime;
|
||||
DEFINE FIELD price_per_unit ON TABLE active_vm TYPE int;
|
||||
DEFINE FIELD locked_nano ON TABLE active_vm TYPE int;
|
||||
DEFINE FIELD collected_at ON TABLE active_vm TYPE datetime;
|
||||
|
||||
DEFINE TABLE update_vm_req TYPE RELATION FROM account TO vm_node SCHEMAFULL;
|
||||
DEFINE FIELD vcpus ON TABLE update_vm_req TYPE int;
|
||||
DEFINE FIELD memory_mb ON TABLE update_vm_req TYPE int;
|
||||
DEFINE FIELD disk_size_gb ON TABLE update_vm_req TYPE int;
|
||||
DEFINE FIELD dtrfs_sha ON TABLE update_vm_req TYPE string;
|
||||
DEFINE FIELD dtrfs_url ON TABLE update_vm_req TYPE string;
|
||||
DEFINE FIELD kernel_sha ON TABLE update_vm_req TYPE string;
|
||||
DEFINE FIELD kernel_url ON TABLE update_vm_req TYPE string;
|
||||
|
||||
DEFINE TABLE deleted_vm TYPE RELATION FROM account TO vm_node SCHEMAFULL;
|
||||
DEFINE FIELD hostname ON TABLE deleted_vm TYPE string;
|
||||
DEFINE FIELD mapped_ports ON TABLE deleted_vm TYPE array<[int, int]>;
|
||||
DEFINE FIELD public_ipv4 ON TABLE deleted_vm TYPE string;
|
||||
DEFINE FIELD public_ipv6 ON TABLE deleted_vm TYPE string;
|
||||
DEFINE FIELD disk_size_gb ON TABLE deleted_vm TYPE int;
|
||||
DEFINE FIELD vcpus ON TABLE deleted_vm TYPE int;
|
||||
DEFINE FIELD memory_mb ON TABLE deleted_vm TYPE int;
|
||||
DEFINE FIELD dtrfs_sha ON TABLE deleted_vm TYPE string;
|
||||
DEFINE FIELD kernel_sha ON TABLE deleted_vm TYPE string;
|
||||
DEFINE FIELD created_at ON TABLE deleted_vm TYPE datetime;
|
||||
DEFINE FIELD deleted_at ON TABLE deleted_vm TYPE datetime;
|
||||
DEFINE FIELD price_per_unit ON TABLE deleted_vm TYPE int;
|
||||
|
||||
DEFINE TABLE app_node SCHEMAFULL;
|
||||
DEFINE FIELD operator ON TABLE app_node TYPE record<account>;
|
||||
DEFINE FIELD country ON TABLE app_node TYPE string;
|
||||
DEFINE FIELD region ON TABLE app_node TYPE string;
|
||||
DEFINE FIELD city ON TABLE app_node TYPE string;
|
||||
@ -49,22 +89,36 @@ DEFINE FIELD max_ports_per_app ON TABLE app_node TYPE int;
|
||||
DEFINE FIELD price ON TABLE app_node TYPE int;
|
||||
DEFINE FIELD offline_minutes ON TABLE app_node TYPE int;
|
||||
|
||||
DEFINE TABLE app_contract TYPE RELATION FROM account TO app_node SCHEMAFULL;
|
||||
DEFINE FIELD state ON TABLE app_contract TYPE string;
|
||||
DEFINE FIELD app_name ON TABLE app_contract TYPE string;
|
||||
DEFINE FIELD mapped_ports ON TABLE app_contract TYPE array<[int, int]>;
|
||||
DEFINE FIELD host_ipv4 ON TABLE app_contract TYPE string;
|
||||
DEFINE FIELD vcpus ON TABLE app_contract TYPE int;
|
||||
DEFINE FIELD memory_mb ON TABLE app_contract TYPE int;
|
||||
DEFINE FIELD disk_size_gb ON TABLE app_contract TYPE int;
|
||||
DEFINE FIELD created_at ON TABLE app_contract TYPE datetime;
|
||||
DEFINE FIELD updated_at ON TABLE app_contract TYPE datetime;
|
||||
DEFINE FIELD price_per_unit ON TABLE app_contract TYPE int;
|
||||
DEFINE FIELD locked_nano ON TABLE app_contract TYPE int;
|
||||
DEFINE FIELD collected_at ON TABLE app_contract TYPE datetime;
|
||||
DEFINE FIELD mr_enclave ON TABLE app_contract TYPE string;
|
||||
DEFINE FIELD package_url ON TABLE app_contract TYPE string;
|
||||
DEFINE FIELD hratls_pubkey ON TABLE app_contract TYPE string;
|
||||
DEFINE TABLE active_app TYPE RELATION FROM account TO app_node SCHEMAFULL;
|
||||
DEFINE FIELD app_name ON TABLE active_app TYPE string;
|
||||
DEFINE FIELD mapped_ports ON TABLE active_app TYPE array<[int, int]>;
|
||||
DEFINE FIELD host_ipv4 ON TABLE active_app TYPE string;
|
||||
DEFINE FIELD vcpus ON TABLE active_app TYPE int;
|
||||
DEFINE FIELD memory_mb ON TABLE active_app TYPE int;
|
||||
DEFINE FIELD disk_size_gb ON TABLE active_app TYPE int;
|
||||
DEFINE FIELD created_at ON TABLE active_app TYPE datetime;
|
||||
DEFINE FIELD price_per_unit ON TABLE active_app TYPE int;
|
||||
DEFINE FIELD locked_nano ON TABLE active_app TYPE int;
|
||||
DEFINE FIELD collected_at ON TABLE active_app TYPE datetime;
|
||||
DEFINE FIELD mr_enclave ON TABLE active_app TYPE string;
|
||||
DEFINE FIELD package_url ON TABLE active_app TYPE string;
|
||||
DEFINE FIELD hratls_pubkey ON TABLE active_app TYPE string;
|
||||
|
||||
DEFINE TABLE deleted_app TYPE RELATION FROM account TO app_node SCHEMAFULL;
|
||||
DEFINE FIELD app_name ON TABLE deleted_app TYPE string;
|
||||
DEFINE FIELD mapped_ports ON TABLE deleted_app TYPE array<[int, int]>;
|
||||
DEFINE FIELD host_ipv4 ON TABLE deleted_app TYPE string;
|
||||
DEFINE FIELD vcpus ON TABLE deleted_app TYPE int;
|
||||
DEFINE FIELD memory_mb ON TABLE deleted_app TYPE int;
|
||||
DEFINE FIELD disk_size_gb ON TABLE deleted_app TYPE int;
|
||||
DEFINE FIELD created_at ON TABLE deleted_app TYPE datetime;
|
||||
DEFINE FIELD deleted_at ON TABLE deleted_app TYPE datetime;
|
||||
DEFINE FIELD price_per_unit ON TABLE deleted_app TYPE int;
|
||||
DEFINE FIELD locked_nano ON TABLE deleted_app TYPE int;
|
||||
DEFINE FIELD collected_at ON TABLE deleted_app TYPE datetime;
|
||||
DEFINE FIELD mr_enclave ON TABLE deleted_app TYPE string;
|
||||
DEFINE FIELD package_url ON TABLE deleted_app TYPE string;
|
||||
DEFINE FIELD hratls_pubkey ON TABLE deleted_app TYPE string;
|
||||
|
||||
DEFINE TABLE ban TYPE RELATION FROM account TO account;
|
||||
DEFINE FIELD created_at ON TABLE ban TYPE datetime;
|
||||
@ -72,10 +126,8 @@ DEFINE FIELD created_at ON TABLE ban TYPE datetime;
|
||||
DEFINE TABLE kick TYPE RELATION FROM account TO account;
|
||||
DEFINE FIELD created_at ON TABLE kick TYPE datetime;
|
||||
DEFINE FIELD reason ON TABLE kick TYPE string;
|
||||
DEFINE FIELD contract ON TABLE kick TYPE record<vm_contract|app_contract>;
|
||||
DEFINE FIELD contract ON TABLE kick TYPE record<deleted_vm|deleted_app>;
|
||||
|
||||
DEFINE TABLE report TYPE RELATION FROM account TO vm_node|app_node;
|
||||
DEFINE FIELD created_at ON TABLE ban TYPE datetime;
|
||||
DEFINE FIELD reason ON TABLE ban TYPE string;
|
||||
|
||||
DEFINE TABLE operator TYPE RELATION FROM account TO vm_node|app_node;
|
||||
DEFINE FIELD created_at ON TABLE report TYPE datetime;
|
||||
DEFINE FIELD reason ON TABLE report TYPE string;
|
||||
|
@ -129,7 +129,7 @@ operators:
|
||||
app_nodes: []
|
||||
7V3rEuh6j8VuwMVB5PyGqWKLmjJ4fYSv6WtrTL51NZTB:
|
||||
escrow: 0
|
||||
email: ""
|
||||
email: ''
|
||||
banned_users: []
|
||||
vm_nodes: []
|
||||
app_nodes:
|
||||
@ -238,7 +238,7 @@ vm_contracts:
|
||||
node_pubkey: Du3UfPSUUZmA5thQmc9Vrxdy7UimpygcpDsQNnwRQPtu
|
||||
exposed_ports: []
|
||||
public_ipv4: 156.146.63.216
|
||||
public_ipv6: ""
|
||||
public_ipv6: ''
|
||||
disk_size_gb: 10
|
||||
vcpus: 2
|
||||
memory_mb: 3000
|
||||
@ -255,7 +255,7 @@ vm_contracts:
|
||||
node_pubkey: 7Xw3RxbP5pvfjZ8U6yA3HHVSS9YXjKH5Vkas3JRbQYd9
|
||||
exposed_ports: []
|
||||
public_ipv4: 173.234.136.154
|
||||
public_ipv6: ""
|
||||
public_ipv6: ''
|
||||
disk_size_gb: 10
|
||||
vcpus: 2
|
||||
memory_mb: 3000
|
||||
@ -272,8 +272,8 @@ vm_contracts:
|
||||
node_pubkey: 7Xw3RxbP5pvfjZ8U6yA3HHVSS9YXjKH5Vkas3JRbQYd9
|
||||
exposed_ports:
|
||||
- 38288
|
||||
public_ipv4: ""
|
||||
public_ipv6: ""
|
||||
public_ipv4: ''
|
||||
public_ipv6: ''
|
||||
disk_size_gb: 10
|
||||
vcpus: 1
|
||||
memory_mb: 1000
|
||||
@ -290,7 +290,7 @@ vm_contracts:
|
||||
node_pubkey: DgkbsrwttkZXvzxY5kDwQQoDd79GLmZ5tc7fYJUFkQQb
|
||||
exposed_ports: []
|
||||
public_ipv4: 149.22.95.2
|
||||
public_ipv6: ""
|
||||
public_ipv6: ''
|
||||
disk_size_gb: 10
|
||||
vcpus: 2
|
||||
memory_mb: 3000
|
||||
@ -307,7 +307,7 @@ vm_contracts:
|
||||
node_pubkey: Du3UfPSUUZmA5thQmc9Vrxdy7UimpygcpDsQNnwRQPtu
|
||||
exposed_ports: []
|
||||
public_ipv4: 156.146.63.217
|
||||
public_ipv6: ""
|
||||
public_ipv6: ''
|
||||
disk_size_gb: 10
|
||||
vcpus: 2
|
||||
memory_mb: 3000
|
||||
@ -324,7 +324,7 @@ vm_contracts:
|
||||
node_pubkey: DgkbsrwttkZXvzxY5kDwQQoDd79GLmZ5tc7fYJUFkQQb
|
||||
exposed_ports: []
|
||||
public_ipv4: 149.22.95.2
|
||||
public_ipv6: ""
|
||||
public_ipv6: ''
|
||||
disk_size_gb: 30
|
||||
vcpus: 1
|
||||
memory_mb: 1000
|
||||
@ -341,7 +341,7 @@ vm_contracts:
|
||||
node_pubkey: 3zRxiGRnf46vd3zAEmpaYBJocTV9oJB6yXf5GZFR1Sq4
|
||||
exposed_ports: []
|
||||
public_ipv4: 149.36.48.100
|
||||
public_ipv6: ""
|
||||
public_ipv6: ''
|
||||
disk_size_gb: 10
|
||||
vcpus: 4
|
||||
memory_mb: 4000
|
||||
@ -358,8 +358,8 @@ vm_contracts:
|
||||
node_pubkey: 3zRxiGRnf46vd3zAEmpaYBJocTV9oJB6yXf5GZFR1Sq4
|
||||
exposed_ports:
|
||||
- 46393
|
||||
public_ipv4: ""
|
||||
public_ipv6: ""
|
||||
public_ipv4: ''
|
||||
public_ipv6: ''
|
||||
disk_size_gb: 10
|
||||
vcpus: 1
|
||||
memory_mb: 1000
|
||||
@ -384,57 +384,4 @@ app_nodes:
|
||||
max_ports_per_app: 9
|
||||
price: 20000
|
||||
offline_minutes: 0
|
||||
app_contracts:
|
||||
- uuid: e3d01f25-2b2a-410b-80e3-12f44e474334
|
||||
package_url: https://registry.detee.ltd/sgx/packages/base_package_2025-04-17_11-01-08.tar.gz
|
||||
admin_pubkey: H21Shi4iE7vgfjWEQNvzmpmBMJSaiZ17PYUcdNoAoKNc
|
||||
node_pubkey: BiqoPUEoAxYxMRXUmyofoS9H1TBQgQqvLJ6MbWh88AQg
|
||||
mapped_ports:
|
||||
- - 27158
|
||||
- 34500
|
||||
- - 28667
|
||||
- 8080
|
||||
host_ipv4: 212.95.45.139
|
||||
disk_size_mb: 1000
|
||||
vcpus: 1
|
||||
memory_mb: 1000
|
||||
created_at: 2025-04-21T11:27:28.833236909Z
|
||||
updated_at: 2025-04-21T11:27:28.833237729Z
|
||||
price_per_unit: 200000
|
||||
locked_nano: 121200000
|
||||
collected_at: 2025-04-21T11:28:24.905665571Z
|
||||
hratls_pubkey: 7E0F887AA6BB9104EEC1066F454D4C2D9063D676715F55F919D3FBCEDC63240B
|
||||
public_package_mr_enclave:
|
||||
- 52
|
||||
- 183
|
||||
- 102
|
||||
- 210
|
||||
- 251
|
||||
- 219
|
||||
- 218
|
||||
- 140
|
||||
- 168
|
||||
- 118
|
||||
- 10
|
||||
- 193
|
||||
- 98
|
||||
- 240
|
||||
- 147
|
||||
- 124
|
||||
- 240
|
||||
- 189
|
||||
- 46
|
||||
- 95
|
||||
- 138
|
||||
- 172
|
||||
- 15
|
||||
- 246
|
||||
- 227
|
||||
- 114
|
||||
- 70
|
||||
- 159
|
||||
- 232
|
||||
- 212
|
||||
- 9
|
||||
- 234
|
||||
app_name: diligent-seahorse
|
||||
app_contracts: []
|
||||
|
@ -1,7 +1,7 @@
|
||||
use detee_shared::general_proto::brain_general_cli_server::BrainGeneralCliServer;
|
||||
use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCliServer;
|
||||
use surreal_brain::grpc::BrainGeneralCliMock;
|
||||
use surreal_brain::grpc::BrainVmCliMock;
|
||||
use surreal_brain::grpc::BrainGeneralCliForReal;
|
||||
use surreal_brain::grpc::BrainVmCliForReal;
|
||||
use surreal_brain::db;
|
||||
use tonic::transport::{Identity, Server, ServerTlsConfig};
|
||||
|
||||
@ -11,8 +11,8 @@ async fn main() {
|
||||
db::init().await.unwrap();
|
||||
let addr = "0.0.0.0:31337".parse().unwrap();
|
||||
|
||||
let snp_cli_server = BrainVmCliServer::new(BrainVmCliMock {});
|
||||
let general_service_server = BrainGeneralCliServer::new(BrainGeneralCliMock {});
|
||||
let snp_cli_server = BrainVmCliServer::new(BrainVmCliForReal {});
|
||||
let general_service_server = BrainGeneralCliServer::new(BrainGeneralCliForReal {});
|
||||
|
||||
let cert = std::fs::read_to_string("./tmp/brain-crt.pem").unwrap();
|
||||
let key = std::fs::read_to_string("./tmp/brain-key.pem").unwrap();
|
||||
|
462
src/db.rs
462
src/db.rs
@ -5,19 +5,25 @@ use surrealdb::{
|
||||
engine::remote::ws::{Client, Ws},
|
||||
opt::auth::Root,
|
||||
sql::Datetime,
|
||||
RecordId, Surreal,
|
||||
Notification, RecordId, Surreal,
|
||||
};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio_stream::StreamExt as _;
|
||||
|
||||
static DB: LazyLock<Surreal<Client>> = LazyLock::new(Surreal::init);
|
||||
const ACCOUNT: &str = "account";
|
||||
const OPERATOR: &str = "operator";
|
||||
const VM_CONTRACT: &str = "vm_contract";
|
||||
const VM_NODE: &str = "vm_node";
|
||||
pub const ACCOUNT: &str = "account";
|
||||
pub const VM_NODE: &str = "vm_node";
|
||||
pub const ACTIVE_VM: &str = "active_vm";
|
||||
pub const NEW_VM_REQ: &str = "new_vm_req";
|
||||
pub const UPDATE_VM_REQ: &str = "update_vm_req";
|
||||
pub const DELETED_VM: &str = "deleted_vm";
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error(transparent)]
|
||||
#[error("Internal DB error: {0}")]
|
||||
DataBase(#[from] surrealdb::Error),
|
||||
#[error("Daemon channel got closed: {0}")]
|
||||
DaemonConnection(#[from] tokio::sync::mpsc::error::SendError<DaemonNotification>),
|
||||
}
|
||||
|
||||
pub async fn init() -> surrealdb::Result<()> {
|
||||
@ -32,9 +38,7 @@ pub async fn migration0(old_data: &old_brain::BrainData) -> surrealdb::Result<()
|
||||
let accounts: Vec<Account> = old_data.into();
|
||||
let vm_nodes: Vec<VmNode> = old_data.into();
|
||||
let app_nodes: Vec<AppNode> = old_data.into();
|
||||
let vm_contracts: Vec<VmContract> = old_data.into();
|
||||
let operators: Vec<OperatorRelation> = old_data.into();
|
||||
let app_contracts: Vec<AppContract> = old_data.into();
|
||||
let vm_contracts: Vec<ActiveVm> = old_data.into();
|
||||
|
||||
init().await?;
|
||||
|
||||
@ -45,11 +49,7 @@ pub async fn migration0(old_data: &old_brain::BrainData) -> surrealdb::Result<()
|
||||
println!("Inserting app nodes...");
|
||||
let _: Vec<AppNode> = DB.insert(()).content(app_nodes).await?;
|
||||
println!("Inserting vm contracts...");
|
||||
let _: Vec<VmContract> = DB.insert("vm_contract").relation(vm_contracts).await?;
|
||||
println!("Inserting app contracts...");
|
||||
let _: Vec<AppContract> = DB.insert("app_contract").relation(app_contracts).await?;
|
||||
println!("Inserting operators...");
|
||||
let _: Vec<OperatorRelation> = DB.insert(OPERATOR).relation(operators).await?;
|
||||
let _: Vec<ActiveVm> = DB.insert("vm_contract").relation(vm_contracts).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -88,6 +88,7 @@ impl Account {
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct VmNode {
|
||||
pub id: RecordId,
|
||||
pub operator: RecordId,
|
||||
pub country: String,
|
||||
pub region: String,
|
||||
pub city: String,
|
||||
@ -103,14 +104,87 @@ pub struct VmNode {
|
||||
pub offline_minutes: u64,
|
||||
}
|
||||
|
||||
impl VmNode {
|
||||
pub async fn register(self) -> Result<(), Error> {
|
||||
let _: Option<VmNode> = DB.upsert(self.id.clone()).content(self).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct VmContract {
|
||||
pub struct VmNodeWithReports {
|
||||
pub id: RecordId,
|
||||
pub operator: RecordId,
|
||||
pub country: String,
|
||||
pub region: String,
|
||||
pub city: String,
|
||||
pub ip: String,
|
||||
pub avail_mem_mb: u32,
|
||||
pub avail_vcpus: u32,
|
||||
pub avail_storage_gbs: u32,
|
||||
pub avail_ipv4: u32,
|
||||
pub avail_ipv6: u32,
|
||||
pub avail_ports: u32,
|
||||
pub max_ports_per_vm: u32,
|
||||
pub price: u64,
|
||||
pub offline_minutes: u64,
|
||||
pub reports: Vec<Report>,
|
||||
}
|
||||
|
||||
pub enum DaemonNotification {
|
||||
Create(NewVmReq),
|
||||
Update(UpdateVmReq),
|
||||
Delete(DeletedVm),
|
||||
}
|
||||
|
||||
impl From<NewVmReq> for DaemonNotification {
|
||||
fn from(value: NewVmReq) -> Self {
|
||||
Self::Create(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<UpdateVmReq> for DaemonNotification {
|
||||
fn from(value: UpdateVmReq) -> Self {
|
||||
Self::Update(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DeletedVm> for DaemonNotification {
|
||||
fn from(value: DeletedVm) -> Self {
|
||||
Self::Delete(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct NewVmReq {
|
||||
pub id: RecordId,
|
||||
#[serde(rename = "in")]
|
||||
pub admin: RecordId,
|
||||
#[serde(rename = "out")]
|
||||
pub vm_node: RecordId,
|
||||
pub hostname: String,
|
||||
pub extra_ports: Vec<u32>,
|
||||
pub public_ipv4: bool,
|
||||
pub public_ipv6: bool,
|
||||
pub disk_size_gb: u32,
|
||||
pub vcpus: u32,
|
||||
pub memory_mb: u32,
|
||||
pub dtrfs_url: String,
|
||||
pub dtrfs_sha: String,
|
||||
pub kernel_sha: String,
|
||||
pub kernel_url: String,
|
||||
pub created_at: Datetime,
|
||||
pub price_per_unit: u64,
|
||||
pub locked_nano: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ActiveVm {
|
||||
pub id: RecordId,
|
||||
#[serde(rename = "in")]
|
||||
pub admin: RecordId,
|
||||
#[serde(rename = "out")]
|
||||
pub vm_node: RecordId,
|
||||
pub state: String,
|
||||
pub hostname: String,
|
||||
pub mapped_ports: Vec<(u32, u32)>,
|
||||
pub public_ipv4: String,
|
||||
@ -121,13 +195,124 @@ pub struct VmContract {
|
||||
pub dtrfs_sha: String,
|
||||
pub kernel_sha: String,
|
||||
pub created_at: Datetime,
|
||||
pub updated_at: Datetime,
|
||||
pub price_per_unit: u64,
|
||||
pub locked_nano: u64,
|
||||
pub collected_at: Datetime,
|
||||
}
|
||||
|
||||
impl VmContract {
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct UpdateVmReq {
|
||||
pub id: RecordId,
|
||||
#[serde(rename = "in")]
|
||||
pub admin: RecordId,
|
||||
#[serde(rename = "out")]
|
||||
pub vm_node: RecordId,
|
||||
pub disk_size_gb: u32,
|
||||
pub vcpus: u32,
|
||||
pub memory_mb: u32,
|
||||
pub dtrfs_url: String,
|
||||
pub dtrfs_sha: String,
|
||||
pub kernel_sha: String,
|
||||
pub kernel_url: String,
|
||||
pub created_at: Datetime,
|
||||
pub price_per_unit: u64,
|
||||
pub locked_nano: u64,
|
||||
}
|
||||
|
||||
pub async fn listen_for_node<T: Into<DaemonNotification> + std::marker::Unpin + for<'de> Deserialize<'de>>(
|
||||
node: &str,
|
||||
tx: Sender<DaemonNotification>,
|
||||
) -> Result<(), Error> {
|
||||
let table_name = match std::any::type_name::<T>() {
|
||||
"surreal_brain::db::NewVmReq" => NEW_VM_REQ.to_string(),
|
||||
"surreal_brain::db::UpdateVmReq" => UPDATE_VM_REQ.to_string(),
|
||||
"surreal_brain::db::DeletedVm" => DELETED_VM.to_string(),
|
||||
wat => {
|
||||
log::error!("listen_for_node: T has type {wat}");
|
||||
String::from("wat")
|
||||
},
|
||||
};
|
||||
let mut resp =
|
||||
DB.query(format!("live select * from {table_name} where out = vm_node:{node};")).await?;
|
||||
let mut live_stream = resp.stream::<Notification<T>>(0)?;
|
||||
while let Some(result) = live_stream.next().await {
|
||||
match result {
|
||||
Ok(notification) => match notification.action {
|
||||
surrealdb::Action::Create => tx.send(notification.data.into()).await?,
|
||||
_ => {}
|
||||
},
|
||||
Err(e) => {
|
||||
log::warn!("listen_for_deletion DB stream failed for {node}: {e}");
|
||||
return Err(Error::from(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct DeletedVm {
|
||||
pub id: RecordId,
|
||||
#[serde(rename = "in")]
|
||||
pub admin: RecordId,
|
||||
#[serde(rename = "out")]
|
||||
pub vm_node: RecordId,
|
||||
pub hostname: String,
|
||||
pub mapped_ports: Vec<(u32, u32)>,
|
||||
pub public_ipv4: String,
|
||||
pub public_ipv6: String,
|
||||
pub disk_size_gb: u32,
|
||||
pub vcpus: u32,
|
||||
pub memory_mb: u32,
|
||||
pub dtrfs_sha: String,
|
||||
pub kernel_sha: String,
|
||||
pub created_at: Datetime,
|
||||
pub deleted_at: Datetime,
|
||||
pub price_per_unit: u64,
|
||||
}
|
||||
|
||||
impl DeletedVm {
|
||||
pub async fn get_by_uuid(uuid: &str) -> Result<Option<Self>, Error> {
|
||||
let contract: Option<Self> =
|
||||
DB.query(format!("select * from {DELETED_VM}:{uuid};")).await?.take(0)?;
|
||||
Ok(contract)
|
||||
}
|
||||
|
||||
pub async fn list_by_admin(admin: &str) -> Result<Vec<Self>, Error> {
|
||||
let mut result =
|
||||
DB.query(format!("select * from {DELETED_VM} where in = {ACCOUNT}:{admin};")).await?;
|
||||
let contracts: Vec<Self> = result.take(0)?;
|
||||
Ok(contracts)
|
||||
}
|
||||
|
||||
pub async fn list_by_node(admin: &str) -> Result<Vec<Self>, Error> {
|
||||
let mut result =
|
||||
DB.query(format!("select * from {DELETED_VM} where out = {VM_NODE}:{admin};")).await?;
|
||||
let contracts: Vec<Self> = result.take(0)?;
|
||||
Ok(contracts)
|
||||
}
|
||||
|
||||
pub async fn list_by_operator(operator: &str) -> Result<Vec<Self>, Error> {
|
||||
let mut result = DB
|
||||
.query(format!(
|
||||
"select
|
||||
(select * from ->operator->vm_node<-{DELETED_VM}) as contracts
|
||||
from {ACCOUNT}:{operator};"
|
||||
))
|
||||
.await?;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct Wrapper {
|
||||
contracts: Vec<DeletedVm>,
|
||||
}
|
||||
|
||||
let c: Option<Wrapper> = result.take(0)?;
|
||||
match c {
|
||||
Some(c) => Ok(c.contracts),
|
||||
None => Ok(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// total hardware units of this VM
|
||||
fn total_units(&self) -> u64 {
|
||||
// TODO: Optimize this based on price of hardware.
|
||||
@ -145,8 +330,27 @@ impl VmContract {
|
||||
}
|
||||
}
|
||||
|
||||
impl ActiveVm {
|
||||
/// total hardware units of this VM
|
||||
fn total_units(&self) -> u64 {
|
||||
// TODO: Optimize this based on price of hardware.
|
||||
// I tried, but this can be done better.
|
||||
// Storage cost should also be based on tier
|
||||
(self.vcpus as u64 * 10)
|
||||
+ ((self.memory_mb + 256) as u64 / 200)
|
||||
+ (self.disk_size_gb as u64 / 10)
|
||||
+ (!self.public_ipv4.is_empty() as u64 * 10)
|
||||
}
|
||||
|
||||
/// Returns price per minute in nanoLP
|
||||
pub fn price_per_minute(&self) -> u64 {
|
||||
self.total_units() * self.price_per_unit
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct VmContractWithNode {
|
||||
pub struct ActiveVmWithNode {
|
||||
pub id: RecordId,
|
||||
#[serde(rename = "in")]
|
||||
pub admin: RecordId,
|
||||
@ -169,16 +373,24 @@ pub struct VmContractWithNode {
|
||||
pub collected_at: Datetime,
|
||||
}
|
||||
|
||||
impl VmContractWithNode {
|
||||
impl ActiveVmWithNode {
|
||||
pub async fn get_by_uuid(uuid: &str) -> Result<Option<Self>, Error> {
|
||||
let contract: Option<Self> =
|
||||
DB.query(format!("select * from {VM_CONTRACT}:{uuid} fetch out;")).await?.take(0)?;
|
||||
DB.query(format!("select * from {ACTIVE_VM}:{uuid} fetch out;")).await?.take(0)?;
|
||||
Ok(contract)
|
||||
}
|
||||
|
||||
pub async fn list_by_admin(admin: &str) -> Result<Vec<Self>, Error> {
|
||||
let mut result = DB
|
||||
.query(format!("select * from {VM_CONTRACT} where in = {ACCOUNT}:{admin} fetch out;"))
|
||||
.query(format!("select * from {ACTIVE_VM} where in = {ACCOUNT}:{admin} fetch out;"))
|
||||
.await?;
|
||||
let contracts: Vec<Self> = result.take(0)?;
|
||||
Ok(contracts)
|
||||
}
|
||||
|
||||
pub async fn list_by_node(admin: &str) -> Result<Vec<Self>, Error> {
|
||||
let mut result = DB
|
||||
.query(format!("select * from {ACTIVE_VM} where out = {VM_NODE}:{admin} fetch out;"))
|
||||
.await?;
|
||||
let contracts: Vec<Self> = result.take(0)?;
|
||||
Ok(contracts)
|
||||
@ -188,14 +400,14 @@ impl VmContractWithNode {
|
||||
let mut result = DB
|
||||
.query(format!(
|
||||
"select
|
||||
(select * from ->operator->vm_node<-vm_contract fetch out) as contracts
|
||||
(select * from ->operator->vm_node<-{ACTIVE_VM} fetch out) as contracts
|
||||
from {ACCOUNT}:{operator};"
|
||||
))
|
||||
.await?;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct Wrapper {
|
||||
contracts: Vec<VmContractWithNode>,
|
||||
contracts: Vec<ActiveVmWithNode>,
|
||||
}
|
||||
|
||||
let c: Option<Wrapper> = result.take(0)?;
|
||||
@ -224,18 +436,37 @@ impl VmContractWithNode {
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct AppNode {
|
||||
id: RecordId,
|
||||
country: String,
|
||||
region: String,
|
||||
city: String,
|
||||
ip: String,
|
||||
avail_mem_mb: u32,
|
||||
avail_vcpus: u32,
|
||||
avail_storage_gbs: u32,
|
||||
avail_ports: u32,
|
||||
max_ports_per_app: u32,
|
||||
price: u64,
|
||||
offline_minutes: u64,
|
||||
pub id: RecordId,
|
||||
pub operator: RecordId,
|
||||
pub country: String,
|
||||
pub region: String,
|
||||
pub city: String,
|
||||
pub ip: String,
|
||||
pub avail_mem_mb: u32,
|
||||
pub avail_vcpus: u32,
|
||||
pub avail_storage_gbs: u32,
|
||||
pub avail_ports: u32,
|
||||
pub max_ports_per_app: u32,
|
||||
pub price: u64,
|
||||
pub offline_minutes: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct AppNodeWithReports {
|
||||
pub id: RecordId,
|
||||
pub operator: RecordId,
|
||||
pub country: String,
|
||||
pub region: String,
|
||||
pub city: String,
|
||||
pub ip: String,
|
||||
pub avail_mem_mb: u32,
|
||||
pub avail_vcpus: u32,
|
||||
pub avail_storage_gbs: u32,
|
||||
pub avail_ports: u32,
|
||||
pub max_ports_per_app: u32,
|
||||
pub price: u64,
|
||||
pub offline_minutes: u64,
|
||||
pub reports: Vec<Report>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
@ -247,11 +478,11 @@ pub struct AppContract {
|
||||
app_node: RecordId,
|
||||
state: String,
|
||||
app_name: String,
|
||||
mapped_ports: Vec<(u32, u32)>,
|
||||
mapped_ports: Vec<(u64, u64)>,
|
||||
host_ipv4: String,
|
||||
vcpus: u32,
|
||||
memory_mb: u32,
|
||||
disk_size_gb: u32,
|
||||
vcpus: u64,
|
||||
memory_mb: u64,
|
||||
disk_size_gb: u64,
|
||||
created_at: Datetime,
|
||||
updated_at: Datetime,
|
||||
price_per_unit: u64,
|
||||
@ -291,7 +522,7 @@ pub struct Report {
|
||||
#[serde(rename = "out")]
|
||||
to_node: RecordId,
|
||||
created_at: Datetime,
|
||||
reason: String,
|
||||
pub reason: String,
|
||||
}
|
||||
|
||||
impl Report {
|
||||
@ -309,23 +540,6 @@ impl Report {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct OperatorRelation {
|
||||
#[serde(rename = "in")]
|
||||
pub account: RecordId,
|
||||
#[serde(rename = "out")]
|
||||
pub node: RecordId,
|
||||
}
|
||||
|
||||
impl OperatorRelation {
|
||||
fn new(account: &str, vm_node: &str) -> Self {
|
||||
Self {
|
||||
account: RecordId::from(("account", account.to_string())),
|
||||
node: RecordId::from(("vm_node", vm_node.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This is the operator obtained from the DB,
|
||||
/// however the relation is defined using OperatorRelation
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
@ -342,21 +556,62 @@ impl Operator {
|
||||
pub async fn list() -> Result<Vec<Self>, Error> {
|
||||
let mut result = DB
|
||||
.query(format!(
|
||||
"select *,
|
||||
in as account,
|
||||
<-account.email[0] as email,
|
||||
<-account.escrow[0] as escrow,
|
||||
count(->vm_node) as vm_nodes,
|
||||
count(->app_node) as app_nodes,
|
||||
(select in from <-account->operator->vm_node<-report).len() +
|
||||
(select in from <-account->operator->app_node<-report).len()
|
||||
as reports
|
||||
from operator group by in;"
|
||||
"array::distinct(array::flatten( [
|
||||
(select operator from vm_node group by operator).operator,
|
||||
(select operator from app_node group by operator).operator
|
||||
]));"
|
||||
))
|
||||
.await?;
|
||||
let operators: Vec<Self> = result.take(0)?;
|
||||
let operator_accounts: Vec<RecordId> = result.take(0)?;
|
||||
let mut operators: Vec<Self> = Vec::new();
|
||||
for account in operator_accounts.iter() {
|
||||
if let Some(operator) = Self::inspect(&account.key().to_string()).await? {
|
||||
operators.push(operator);
|
||||
}
|
||||
}
|
||||
Ok(operators)
|
||||
}
|
||||
|
||||
pub async fn inspect(account: &str) -> Result<Option<Self>, Error> {
|
||||
let mut result = DB
|
||||
.query(format!(
|
||||
"$vm_nodes = (select id from vm_node where operator = account:{account}).id;
|
||||
$app_nodes = (select id from app_node where operator = account:{account}).id;
|
||||
select *,
|
||||
id as account,
|
||||
email,
|
||||
escrow,
|
||||
$vm_nodes.len() as vm_nodes,
|
||||
$app_nodes.len() as app_nodes,
|
||||
(select id from report where $vm_nodes contains out).len() +
|
||||
(select id from report where $app_nodes contains out).len()
|
||||
as reports
|
||||
from account where id = account:{account};"
|
||||
))
|
||||
.await?;
|
||||
let operator: Option<Self> = result.take(2)?;
|
||||
Ok(operator)
|
||||
}
|
||||
|
||||
pub async fn inspect_nodes(
|
||||
account: &str,
|
||||
) -> Result<(Option<Self>, Vec<VmNodeWithReports>, Vec<AppNodeWithReports>), Error> {
|
||||
let operator = Self::inspect(account).await?;
|
||||
let mut result = DB
|
||||
.query(format!(
|
||||
"select *, operator, <-report.* as reports from vm_node
|
||||
where operator = account:{account};"
|
||||
))
|
||||
.query(format!(
|
||||
"select *, operator, <-report.* as reports from app_node
|
||||
where operator = account:{account};"
|
||||
))
|
||||
.await?;
|
||||
let vm_nodes: Vec<VmNodeWithReports> = result.take(0)?;
|
||||
let app_nodes: Vec<AppNodeWithReports> = result.take(1)?;
|
||||
|
||||
Ok((operator, vm_nodes, app_nodes))
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: delete all of these From implementation after migration 0 gets executed
|
||||
@ -366,7 +621,8 @@ impl From<&old_brain::BrainData> for Vec<VmNode> {
|
||||
let mut nodes = Vec::new();
|
||||
for old_node in old_data.vm_nodes.iter() {
|
||||
nodes.push(VmNode {
|
||||
id: RecordId::from(("vm_node", old_node.public_key.clone())),
|
||||
id: RecordId::from((VM_NODE, old_node.public_key.clone())),
|
||||
operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())),
|
||||
country: old_node.country.clone(),
|
||||
region: old_node.region.clone(),
|
||||
city: old_node.city.clone(),
|
||||
@ -386,7 +642,7 @@ impl From<&old_brain::BrainData> for Vec<VmNode> {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&old_brain::BrainData> for Vec<VmContract> {
|
||||
impl From<&old_brain::BrainData> for Vec<ActiveVm> {
|
||||
fn from(old_data: &old_brain::BrainData) -> Self {
|
||||
let mut contracts = Vec::new();
|
||||
for old_c in old_data.vm_contracts.iter() {
|
||||
@ -394,11 +650,10 @@ impl From<&old_brain::BrainData> for Vec<VmContract> {
|
||||
for port in old_c.exposed_ports.iter() {
|
||||
mapped_ports.push((*port, 8080 as u32));
|
||||
}
|
||||
contracts.push(VmContract {
|
||||
id: RecordId::from((VM_CONTRACT, old_c.uuid.replace("-", ""))),
|
||||
contracts.push(ActiveVm {
|
||||
id: RecordId::from((ACTIVE_VM, old_c.uuid.replace("-", ""))),
|
||||
admin: RecordId::from((ACCOUNT, old_c.admin_pubkey.clone())),
|
||||
vm_node: RecordId::from((VM_NODE, old_c.node_pubkey.clone())),
|
||||
state: "active".to_string(),
|
||||
hostname: old_c.hostname.clone(),
|
||||
mapped_ports,
|
||||
public_ipv4: old_c.public_ipv4.clone(),
|
||||
@ -411,7 +666,6 @@ impl From<&old_brain::BrainData> for Vec<VmContract> {
|
||||
price_per_unit: old_c.price_per_unit,
|
||||
locked_nano: old_c.locked_nano,
|
||||
created_at: old_c.created_at.into(),
|
||||
updated_at: old_c.updated_at.into(),
|
||||
collected_at: old_c.collected_at.into(),
|
||||
});
|
||||
}
|
||||
@ -425,6 +679,7 @@ impl From<&old_brain::BrainData> for Vec<AppNode> {
|
||||
for old_node in old_data.app_nodes.iter() {
|
||||
nodes.push(AppNode {
|
||||
id: RecordId::from(("app_node", old_node.node_pubkey.clone())),
|
||||
operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())),
|
||||
country: old_node.country.clone(),
|
||||
region: old_node.region.clone(),
|
||||
city: old_node.city.clone(),
|
||||
@ -442,48 +697,6 @@ impl From<&old_brain::BrainData> for Vec<AppNode> {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&old_brain::BrainData> for Vec<AppContract> {
|
||||
fn from(old_data: &old_brain::BrainData) -> Self {
|
||||
let mut contracts = Vec::new();
|
||||
for old_c in old_data.app_contracts.iter() {
|
||||
let mut mapped_ports = Vec::new();
|
||||
for port in old_c.mapped_ports.clone().into_iter().map(|(b, c)| (b as u32, c as u32)) {
|
||||
mapped_ports.push(port);
|
||||
}
|
||||
|
||||
let mr_enclave_hex = old_c
|
||||
.public_package_mr_enclave
|
||||
.clone()
|
||||
.unwrap_or_default()
|
||||
.iter()
|
||||
.map(|byte| format!("{:02X}", byte))
|
||||
.collect();
|
||||
|
||||
contracts.push(AppContract {
|
||||
id: RecordId::from(("app_contract", old_c.uuid.replace("-", ""))),
|
||||
admin: RecordId::from(("account", old_c.admin_pubkey.clone())),
|
||||
app_node: RecordId::from(("app_node", old_c.node_pubkey.clone())),
|
||||
state: "active".to_string(),
|
||||
mapped_ports,
|
||||
host_ipv4: old_c.host_ipv4.clone(),
|
||||
disk_size_gb: old_c.disk_size_mb * 1024,
|
||||
vcpus: old_c.vcpus,
|
||||
memory_mb: old_c.memory_mb,
|
||||
price_per_unit: old_c.price_per_unit,
|
||||
locked_nano: old_c.locked_nano,
|
||||
created_at: old_c.created_at.into(),
|
||||
updated_at: old_c.updated_at.into(),
|
||||
collected_at: old_c.collected_at.into(),
|
||||
app_name: old_c.app_name.clone(),
|
||||
mr_enclave: mr_enclave_hex,
|
||||
package_url: old_c.package_url.clone(),
|
||||
hratls_pubkey: old_c.hratls_pubkey.clone(),
|
||||
});
|
||||
}
|
||||
contracts
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&old_brain::BrainData> for Vec<Account> {
|
||||
fn from(old_data: &old_brain::BrainData) -> Self {
|
||||
let mut accounts = Vec::new();
|
||||
@ -504,18 +717,3 @@ impl From<&old_brain::BrainData> for Vec<Account> {
|
||||
accounts
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&old_brain::BrainData> for Vec<OperatorRelation> {
|
||||
fn from(old_data: &old_brain::BrainData) -> Self {
|
||||
let mut operator_entries = Vec::new();
|
||||
for operator in old_data.operators.clone() {
|
||||
for vm_node in operator.1.vm_nodes.iter() {
|
||||
operator_entries.push(OperatorRelation::new(&operator.0, vm_node));
|
||||
}
|
||||
for app_node in operator.1.app_nodes.iter() {
|
||||
operator_entries.push(OperatorRelation::new(&operator.0, app_node));
|
||||
}
|
||||
}
|
||||
operator_entries
|
||||
}
|
||||
}
|
||||
|
328
src/grpc.rs
328
src/grpc.rs
@ -1,6 +1,6 @@
|
||||
#![allow(dead_code)]
|
||||
use crate::db;
|
||||
use detee_shared::app_proto::AppContract;
|
||||
use detee_shared::app_proto::{AppContract, AppNodeListResp};
|
||||
use detee_shared::{
|
||||
common_proto::{Empty, Pubkey},
|
||||
general_proto::{
|
||||
@ -8,19 +8,20 @@ use detee_shared::{
|
||||
InspectOperatorResp, KickReq, KickResp, ListOperatorsResp, RegOperatorReq, ReportNodeReq,
|
||||
SlashReq,
|
||||
},
|
||||
vm_proto::{brain_vm_cli_server::BrainVmCli, ListVmContractsReq, *},
|
||||
vm_proto::{
|
||||
brain_vm_cli_server::BrainVmCli, brain_vm_daemon_server::BrainVmDaemon, ListVmContractsReq,
|
||||
*,
|
||||
},
|
||||
};
|
||||
|
||||
use log::info;
|
||||
use std::pin::Pin;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
// use tokio::sync::mpsc;
|
||||
// use tokio_stream::{wrappers::ReceiverStream, Stream};
|
||||
use tokio_stream::Stream;
|
||||
use tonic::{Request, Response, Status};
|
||||
use tokio_stream::{Stream, StreamExt};
|
||||
use tonic::{Request, Response, Status, Streaming};
|
||||
|
||||
pub struct BrainGeneralCliMock {}
|
||||
pub struct BrainGeneralCliForReal {}
|
||||
|
||||
impl From<db::Account> for AccountBalance {
|
||||
fn from(account: db::Account) -> Self {
|
||||
@ -28,8 +29,74 @@ impl From<db::Account> for AccountBalance {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<db::VmContractWithNode> for VmContract {
|
||||
fn from(db_c: db::VmContractWithNode) -> Self {
|
||||
impl From<db::NewVmReq> for NewVmReq {
|
||||
fn from(new_vm_req: db::NewVmReq) -> Self {
|
||||
Self {
|
||||
uuid: new_vm_req.id.key().to_string(),
|
||||
hostname: new_vm_req.hostname,
|
||||
admin_pubkey: new_vm_req.admin.key().to_string(),
|
||||
node_pubkey: new_vm_req.vm_node.key().to_string(),
|
||||
extra_ports: new_vm_req.extra_ports,
|
||||
public_ipv4: new_vm_req.public_ipv4,
|
||||
public_ipv6: new_vm_req.public_ipv6,
|
||||
disk_size_gb: new_vm_req.disk_size_gb,
|
||||
vcpus: new_vm_req.vcpus,
|
||||
memory_mb: new_vm_req.memory_mb,
|
||||
kernel_url: new_vm_req.kernel_url,
|
||||
kernel_sha: new_vm_req.kernel_sha,
|
||||
dtrfs_url: new_vm_req.dtrfs_url,
|
||||
dtrfs_sha: new_vm_req.dtrfs_sha,
|
||||
price_per_unit: new_vm_req.price_per_unit,
|
||||
locked_nano: new_vm_req.locked_nano,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<db::UpdateVmReq> for UpdateVmReq {
|
||||
fn from(update_vm_req: db::UpdateVmReq) -> Self {
|
||||
Self {
|
||||
uuid: update_vm_req.id.key().to_string(),
|
||||
// daemon does not care about VM hostname
|
||||
hostname: String::new(),
|
||||
admin_pubkey: update_vm_req.admin.key().to_string(),
|
||||
disk_size_gb: update_vm_req.disk_size_gb,
|
||||
vcpus: update_vm_req.vcpus,
|
||||
memory_mb: update_vm_req.memory_mb,
|
||||
kernel_url: update_vm_req.kernel_url,
|
||||
kernel_sha: update_vm_req.kernel_sha,
|
||||
dtrfs_url: update_vm_req.dtrfs_url,
|
||||
dtrfs_sha: update_vm_req.dtrfs_sha,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<db::DeletedVm> for DeleteVmReq {
|
||||
fn from(delete_vm_req: db::DeletedVm) -> Self {
|
||||
Self {
|
||||
uuid: delete_vm_req.id.key().to_string(),
|
||||
admin_pubkey: delete_vm_req.admin.key().to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<db::DaemonNotification> for BrainVmMessage {
|
||||
fn from(notification: db::DaemonNotification) -> Self {
|
||||
match notification {
|
||||
db::DaemonNotification::Create(new_vm_req) => {
|
||||
BrainVmMessage { msg: Some(brain_vm_message::Msg::NewVmReq(new_vm_req.into())) }
|
||||
}
|
||||
db::DaemonNotification::Update(update_vm_req) => BrainVmMessage {
|
||||
msg: Some(brain_vm_message::Msg::UpdateVmReq(update_vm_req.into())),
|
||||
},
|
||||
db::DaemonNotification::Delete(deleted_vm) => {
|
||||
BrainVmMessage { msg: Some(brain_vm_message::Msg::DeleteVm(deleted_vm.into())) }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<db::ActiveVmWithNode> for VmContract {
|
||||
fn from(db_c: db::ActiveVmWithNode) -> Self {
|
||||
let mut exposed_ports = Vec::new();
|
||||
for port in db_c.mapped_ports.iter() {
|
||||
exposed_ports.push(port.0);
|
||||
@ -84,8 +151,179 @@ impl From<db::Operator> for ListOperatorsResp {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<db::VmNodeWithReports> for VmNodeListResp {
|
||||
fn from(vm_node: db::VmNodeWithReports) -> Self {
|
||||
Self {
|
||||
operator: vm_node.operator.key().to_string(),
|
||||
node_pubkey: vm_node.id.key().to_string(),
|
||||
country: vm_node.country,
|
||||
region: vm_node.region,
|
||||
city: vm_node.city,
|
||||
ip: vm_node.ip,
|
||||
reports: vm_node.reports.iter().map(|n| n.reason.clone()).collect(),
|
||||
price: vm_node.price,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<db::AppNodeWithReports> for AppNodeListResp {
|
||||
fn from(app_node: db::AppNodeWithReports) -> Self {
|
||||
Self {
|
||||
operator: app_node.operator.key().to_string(),
|
||||
node_pubkey: app_node.id.key().to_string(),
|
||||
country: app_node.country,
|
||||
region: app_node.region,
|
||||
city: app_node.city,
|
||||
ip: app_node.ip,
|
||||
reports: app_node.reports.iter().map(|n| n.reason.clone()).collect(),
|
||||
price: app_node.price,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct BrainVmDaemonForReal {}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl BrainGeneralCli for BrainGeneralCliMock {
|
||||
impl BrainVmDaemon for BrainVmDaemonForReal {
|
||||
type RegisterVmNodeStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
|
||||
async fn register_vm_node(
|
||||
&self,
|
||||
req: Request<RegisterVmNodeReq>,
|
||||
) -> Result<Response<Self::RegisterVmNodeStream>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
info!("Starting registration process for {:?}", req);
|
||||
db::VmNode {
|
||||
id: surrealdb::RecordId::from((db::VM_NODE, req.node_pubkey.clone())),
|
||||
operator: surrealdb::RecordId::from((db::ACCOUNT, req.operator_wallet)),
|
||||
country: req.country,
|
||||
region: req.region,
|
||||
city: req.city,
|
||||
ip: req.main_ip,
|
||||
price: req.price,
|
||||
avail_mem_mb: 0,
|
||||
avail_vcpus: 0,
|
||||
avail_storage_gbs: 0,
|
||||
avail_ipv4: 0,
|
||||
avail_ipv6: 0,
|
||||
avail_ports: 0,
|
||||
max_ports_per_vm: 0,
|
||||
offline_minutes: 0,
|
||||
}
|
||||
.register()
|
||||
.await?;
|
||||
|
||||
info!("Sending existing contracts to {}", req.node_pubkey);
|
||||
let contracts = db::ActiveVmWithNode::list_by_node(&req.node_pubkey).await?;
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
tokio::spawn(async move {
|
||||
for contract in contracts {
|
||||
let _ = tx.send(Ok(contract.into())).await;
|
||||
}
|
||||
});
|
||||
let output_stream = ReceiverStream::new(rx);
|
||||
Ok(Response::new(Box::pin(output_stream) as Self::RegisterVmNodeStream))
|
||||
}
|
||||
|
||||
type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainVmMessage, Status>> + Send>>;
|
||||
async fn brain_messages(
|
||||
&self,
|
||||
req: Request<DaemonStreamAuth>,
|
||||
) -> Result<Response<Self::BrainMessagesStream>, Status> {
|
||||
let auth = req.into_inner();
|
||||
let pubkey = auth.pubkey.clone();
|
||||
check_sig_from_parts(
|
||||
&pubkey,
|
||||
&auth.timestamp,
|
||||
&format!("{:?}", auth.contracts),
|
||||
&auth.signature,
|
||||
)?;
|
||||
info!("Daemon {} connected to receive brain messages", pubkey);
|
||||
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
{
|
||||
let pubkey = pubkey.clone();
|
||||
let tx = tx.clone();
|
||||
tokio::spawn(async move {
|
||||
match db::listen_for_node::<db::DeletedVm>(&pubkey, tx).await {
|
||||
Ok(()) => log::info!("db::VmContract::listen_for_node ended for {pubkey}"),
|
||||
Err(e) => {
|
||||
log::warn!("db::VmContract::listen_for_node errored for {pubkey}: {e}")
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
{
|
||||
let pubkey = pubkey.clone();
|
||||
let tx = tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = db::listen_for_node::<db::NewVmReq>(&pubkey, tx.clone()).await;
|
||||
});
|
||||
}
|
||||
{
|
||||
let pubkey = pubkey.clone();
|
||||
let tx = tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = db::listen_for_node::<db::UpdateVmReq>(&pubkey, tx.clone()).await;
|
||||
});
|
||||
}
|
||||
|
||||
let output_stream = ReceiverStream::new(rx).map(|msg| Ok(msg.into()));
|
||||
Ok(Response::new(Box::pin(output_stream) as Self::BrainMessagesStream))
|
||||
}
|
||||
|
||||
async fn daemon_messages(
|
||||
&self,
|
||||
_req: Request<Streaming<VmDaemonMessage>>,
|
||||
) -> Result<Response<Empty>, Status> {
|
||||
todo!();
|
||||
// let mut req_stream = req.into_inner();
|
||||
// let pubkey: String;
|
||||
// if let Some(Ok(msg)) = req_stream.next().await {
|
||||
// log::debug!("demon_messages received the following auth message: {:?}", msg.msg);
|
||||
// if let Some(vm_daemon_message::Msg::Auth(auth)) = msg.msg {
|
||||
// pubkey = auth.pubkey.clone();
|
||||
// check_sig_from_parts(
|
||||
// &pubkey,
|
||||
// &auth.timestamp,
|
||||
// &format!("{:?}", auth.contracts),
|
||||
// &auth.signature,
|
||||
// )?;
|
||||
// } else {
|
||||
// return Err(Status::unauthenticated(
|
||||
// "Could not authenticate the daemon: could not extract auth signature",
|
||||
// ));
|
||||
// }
|
||||
// } else {
|
||||
// return Err(Status::unauthenticated("Could not authenticate the daemon"));
|
||||
// }
|
||||
|
||||
// // info!("Received a message from daemon {pubkey}: {daemon_message:?}");
|
||||
// while let Some(daemon_message) = req_stream.next().await {
|
||||
// match daemon_message {
|
||||
// Ok(msg) => match msg.msg {
|
||||
// Some(vm_daemon_message::Msg::NewVmResp(new_vm_resp)) => {
|
||||
// self.data.submit_newvm_resp(new_vm_resp).await;
|
||||
// }
|
||||
// Some(vm_daemon_message::Msg::UpdateVmResp(update_vm_resp)) => {
|
||||
// self.data.submit_updatevm_resp(update_vm_resp).await;
|
||||
// }
|
||||
// Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => {
|
||||
// self.data.submit_node_resources(node_resources);
|
||||
// }
|
||||
// _ => {}
|
||||
// },
|
||||
// Err(e) => {
|
||||
// log::warn!("Daemon disconnected: {e:?}");
|
||||
// self.data.del_daemon_tx(&pubkey);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// Ok(Response::new(Empty {}))
|
||||
}
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl BrainGeneralCli for BrainGeneralCliForReal {
|
||||
type ListAccountsStream = Pin<Box<dyn Stream<Item = Result<Account, Status>> + Send>>;
|
||||
type ListAllAppContractsStream =
|
||||
Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>;
|
||||
@ -100,7 +338,7 @@ impl BrainGeneralCli for BrainGeneralCliMock {
|
||||
|
||||
async fn report_node(&self, req: Request<ReportNodeReq>) -> Result<Response<Empty>, Status> {
|
||||
let req = check_sig_from_req(req)?;
|
||||
let (account, node) = match db::VmContractWithNode::get_by_uuid(&req.contract).await? {
|
||||
let (account, node) = match db::ActiveVmWithNode::get_by_uuid(&req.contract).await? {
|
||||
Some(vm_contract)
|
||||
if vm_contract.admin.key().to_string() == req.admin_pubkey
|
||||
&& vm_contract.vm_node.id.key().to_string() == req.node_pubkey =>
|
||||
@ -134,13 +372,16 @@ impl BrainGeneralCli for BrainGeneralCliMock {
|
||||
|
||||
async fn inspect_operator(
|
||||
&self,
|
||||
_req: Request<Pubkey>,
|
||||
req: Request<Pubkey>,
|
||||
) -> Result<Response<InspectOperatorResp>, Status> {
|
||||
todo!();
|
||||
// match self.data.inspect_operator(&req.into_inner().pubkey) {
|
||||
// Some(op) => Ok(Response::new(op.into())),
|
||||
// None => Err(Status::not_found("The wallet you specified is not an operator")),
|
||||
// }
|
||||
match db::Operator::inspect_nodes(&req.into_inner().pubkey).await? {
|
||||
(Some(op), vm_nodes, app_nodes) => Ok(Response::new(InspectOperatorResp {
|
||||
operator: Some(op.into()),
|
||||
vm_nodes: vm_nodes.into_iter().map(|n| n.into()).collect(),
|
||||
app_nodes: app_nodes.into_iter().map(|n| n.into()).collect(),
|
||||
})),
|
||||
(None, _, _) => Err(Status::not_found("The wallet you specified is not an operator")),
|
||||
}
|
||||
}
|
||||
|
||||
async fn register_operator(
|
||||
@ -244,10 +485,10 @@ impl BrainGeneralCli for BrainGeneralCliMock {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BrainVmCliMock {}
|
||||
pub struct BrainVmCliForReal {}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl BrainVmCli for BrainVmCliMock {
|
||||
impl BrainVmCli for BrainVmCliForReal {
|
||||
type ListVmContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>;
|
||||
type ListVmNodesStream = Pin<Box<dyn Stream<Item = Result<VmNodeListResp, Status>> + Send>>;
|
||||
|
||||
@ -329,7 +570,7 @@ impl BrainVmCli for BrainVmCliMock {
|
||||
);
|
||||
let mut contracts = Vec::new();
|
||||
if !req.uuid.is_empty() {
|
||||
if let Some(specific_contract) = db::VmContractWithNode::get_by_uuid(&req.uuid).await? {
|
||||
if let Some(specific_contract) = db::ActiveVmWithNode::get_by_uuid(&req.uuid).await? {
|
||||
if specific_contract.admin.key().to_string() == req.wallet {
|
||||
contracts.push(specific_contract.into());
|
||||
}
|
||||
@ -337,12 +578,11 @@ impl BrainVmCli for BrainVmCliMock {
|
||||
}
|
||||
} else {
|
||||
if req.as_operator {
|
||||
contracts.append(
|
||||
&mut db::VmContractWithNode::list_by_operator(&req.wallet).await?.into(),
|
||||
);
|
||||
contracts
|
||||
.append(&mut db::ActiveVmWithNode::list_by_operator(&req.wallet).await?.into());
|
||||
} else {
|
||||
contracts
|
||||
.append(&mut db::VmContractWithNode::list_by_admin(&req.wallet).await?.into());
|
||||
.append(&mut db::ActiveVmWithNode::list_by_admin(&req.wallet).await?.into());
|
||||
}
|
||||
}
|
||||
let (tx, rx) = mpsc::channel(6);
|
||||
@ -499,6 +739,46 @@ fn check_sig_from_req<T: std::fmt::Debug + PubkeyGetter>(req: Request<T>) -> Res
|
||||
Ok(req)
|
||||
}
|
||||
|
||||
fn check_sig_from_parts(pubkey: &str, time: &str, msg: &str, sig: &str) -> Result<(), Status> {
|
||||
let now = chrono::Utc::now();
|
||||
let parsed_time = chrono::DateTime::parse_from_rfc3339(time)
|
||||
.map_err(|_| Status::unauthenticated("Coult not parse timestamp"))?;
|
||||
let seconds_elapsed = now.signed_duration_since(parsed_time).num_seconds();
|
||||
if seconds_elapsed > 4 || seconds_elapsed < -4 {
|
||||
return Err(Status::unauthenticated(format!(
|
||||
"Date is not within 4 sec of the time of the server: CLI {} vs Server {}",
|
||||
parsed_time, now
|
||||
)));
|
||||
}
|
||||
|
||||
let signature = bs58::decode(sig)
|
||||
.into_vec()
|
||||
.map_err(|_| Status::unauthenticated("signature is not a bs58 string"))?;
|
||||
let signature = ed25519_dalek::Signature::from_bytes(
|
||||
signature
|
||||
.as_slice()
|
||||
.try_into()
|
||||
.map_err(|_| Status::unauthenticated("could not parse ed25519 signature"))?,
|
||||
);
|
||||
|
||||
let pubkey = ed25519_dalek::VerifyingKey::from_bytes(
|
||||
&bs58::decode(&pubkey)
|
||||
.into_vec()
|
||||
.map_err(|_| Status::unauthenticated("pubkey is not a bs58 string"))?
|
||||
.try_into()
|
||||
.map_err(|_| Status::unauthenticated("pubkey does not have the correct size."))?,
|
||||
)
|
||||
.map_err(|_| Status::unauthenticated("could not parse ed25519 pubkey"))?;
|
||||
|
||||
let msg = time.to_string() + msg;
|
||||
use ed25519_dalek::Verifier;
|
||||
pubkey
|
||||
.verify(msg.as_bytes(), &signature)
|
||||
.map_err(|_| Status::unauthenticated("the signature is not valid"))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
const ADMIN_ACCOUNTS: &[&str] = &[
|
||||
"x52w7jARC5erhWWK65VZmjdGXzBK6ZDgfv1A283d8XK",
|
||||
"FHuecMbeC1PfjkW2JKyoicJAuiU7khgQT16QUB3Q1XdL",
|
||||
|
Loading…
Reference in New Issue
Block a user