diff --git a/src/bin/brain.rs b/src/bin/brain.rs index a39b13f..6cf2e26 100644 --- a/src/bin/brain.rs +++ b/src/bin/brain.rs @@ -6,8 +6,8 @@ use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; use dotenv::dotenv; use surreal_brain::constants::{BRAIN_GRPC_ADDR, CERT_KEY_PATH, CERT_PATH}; use surreal_brain::db; -use surreal_brain::grpc; - +use surreal_brain::grpc::general::GeneralCliServer; +use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer}; use tonic::transport::{Identity, Server, ServerTlsConfig}; #[tokio::main] @@ -26,11 +26,9 @@ async fn main() { let addr = BRAIN_GRPC_ADDR.parse().unwrap(); - let snp_daemon_server = - BrainVmDaemonServer::new(grpc::vm::BrainDaemonServer::new(db_arc.clone())); - let snp_cli_server = BrainVmCliServer::new(grpc::vm::BrainCliServer::new(db_arc.clone())); - let general_service_server = - BrainGeneralCliServer::new(grpc::general::BrainCliServer::new(db_arc.clone())); + let snp_daemon_server = BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone())); + let snp_cli_server = BrainVmCliServer::new(VmCliServer::new(db_arc.clone())); + let general_service_server = BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone())); let cert = std::fs::read_to_string(CERT_PATH).unwrap(); let key = std::fs::read_to_string(CERT_KEY_PATH).unwrap(); diff --git a/src/db/app.rs b/src/db/app.rs index 324a079..c49bb0b 100644 --- a/src/db/app.rs +++ b/src/db/app.rs @@ -9,7 +9,7 @@ use surrealdb::sql::Datetime; use surrealdb::{RecordId, Surreal}; #[derive(Debug, Serialize, Deserialize)] -pub struct Node { +pub struct AppNode { pub id: RecordId, pub operator: RecordId, pub country: String, @@ -26,7 +26,7 @@ pub struct Node { } #[derive(Debug, Serialize, Deserialize)] -pub struct NodeWithReports { +pub struct AppNodeWithReports { pub id: RecordId, pub operator: RecordId, pub country: String, @@ -71,7 +71,7 @@ pub struct ActiveAppWithNode { #[serde(rename = "in")] pub admin: RecordId, #[serde(rename = "out")] - pub app_node: Node, + pub app_node: AppNode, pub app_name: String, pub mapped_ports: Vec<(u64, u64)>, pub host_ipv4: String, @@ -95,11 +95,11 @@ impl ActiveAppWithNode { } } -impl From<&old_brain::BrainData> for Vec { +impl From<&old_brain::BrainData> for Vec { fn from(old_data: &old_brain::BrainData) -> Self { let mut nodes = Vec::new(); for old_node in old_data.app_nodes.iter() { - nodes.push(Node { + nodes.push(AppNode { id: RecordId::from(("app_node", old_node.node_pubkey.clone())), operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())), country: old_node.country.clone(), diff --git a/src/db/general.rs b/src/db/general.rs index a7360ce..940de21 100644 --- a/src/db/general.rs +++ b/src/db/general.rs @@ -1,6 +1,5 @@ use crate::constants::ACCOUNT; -use crate::db::app; -use crate::db::vm; +use crate::db::prelude::*; use super::Error; use crate::old_brain; @@ -193,7 +192,7 @@ impl Operator { pub async fn inspect_nodes( db: &Surreal, account: &str, - ) -> Result<(Option, Vec, Vec), Error> { + ) -> Result<(Option, Vec, Vec), Error> { let operator = Self::inspect(db, account).await?; let mut result = db .query(format!( @@ -205,8 +204,8 @@ impl Operator { where operator = account:{account};" )) .await?; - let vm_nodes: Vec = result.take(0)?; - let app_nodes: Vec = result.take(1)?; + let vm_nodes: Vec = result.take(0)?; + let app_nodes: Vec = result.take(1)?; Ok((operator, vm_nodes, app_nodes)) } diff --git a/src/db/mod.rs b/src/db/mod.rs index 706f387..670f6d7 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -4,6 +4,7 @@ pub mod vm; use crate::constants::{DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ}; use crate::old_brain; +use prelude::*; use serde::{Deserialize, Serialize}; use surrealdb::engine::remote::ws::{Client, Ws}; use surrealdb::opt::auth::Root; @@ -16,13 +17,20 @@ pub enum Error { #[error("Internal DB error: {0}")] DataBase(#[from] surrealdb::Error), #[error("Daemon channel got closed: {0}")] - VmDaemonConnection(#[from] tokio::sync::mpsc::error::SendError), + VmDaemonConnection(#[from] tokio::sync::mpsc::error::SendError), #[error(transparent)] StdIo(#[from] std::io::Error), #[error(transparent)] TimeOut(#[from] tokio::time::error::Elapsed), } +pub mod prelude { + pub use super::app::*; + pub use super::general::*; + pub use super::vm::*; + pub use super::*; +} + pub async fn db_connection( db_address: &str, username: &str, @@ -42,22 +50,22 @@ pub async fn migration0( db: &Surreal, old_data: &old_brain::BrainData, ) -> Result<(), Error> { - let accounts: Vec = old_data.into(); - let vm_nodes: Vec = old_data.into(); - let app_nodes: Vec = old_data.into(); - let vm_contracts: Vec = old_data.into(); + let accounts: Vec = old_data.into(); + let vm_nodes: Vec = old_data.into(); + let app_nodes: Vec = old_data.into(); + let vm_contracts: Vec = old_data.into(); let schema = std::fs::read_to_string(crate::constants::DB_SCHEMA_FILE)?; db.query(schema).await?; println!("Inserting accounts..."); - let _: Vec = db.insert(()).content(accounts).await?; + let _: Vec = db.insert(()).content(accounts).await?; println!("Inserting vm nodes..."); - let _: Vec = db.insert(()).content(vm_nodes).await?; + let _: Vec = db.insert(()).content(vm_nodes).await?; println!("Inserting app nodes..."); - let _: Vec = db.insert(()).content(app_nodes).await?; + let _: Vec = db.insert(()).content(app_nodes).await?; println!("Inserting vm contracts..."); - let _: Vec = db.insert("vm_contract").relation(vm_contracts).await?; + let _: Vec = db.insert("vm_contract").relation(vm_contracts).await?; Ok(()) } @@ -75,11 +83,11 @@ pub async fn upsert_record( } pub async fn listen_for_vm_node< - T: Into + std::marker::Unpin + for<'de> Deserialize<'de>, + T: Into + std::marker::Unpin + for<'de> Deserialize<'de>, >( db: &Surreal, node: &str, - tx: Sender, + tx: Sender, ) -> Result<(), Error> { let table_name = match std::any::type_name::() { "surreal_brain::db::NewVmReq" => NEW_VM_REQ.to_string(), diff --git a/src/db/vm.rs b/src/db/vm.rs index 13fee49..5524d38 100644 --- a/src/db/vm.rs +++ b/src/db/vm.rs @@ -12,7 +12,7 @@ use surrealdb::{Notification, RecordId, Surreal}; use tokio_stream::StreamExt as _; #[derive(Debug, Serialize, Deserialize)] -pub struct Node { +pub struct VmNode { pub id: RecordId, pub operator: RecordId, pub country: String, @@ -31,7 +31,7 @@ pub struct Node { } #[derive(Serialize)] -pub struct NodeResources { +pub struct VmNodeResources { pub avail_mem_mb: u32, pub avail_vcpus: u32, pub avail_storage_gbs: u32, @@ -41,22 +41,22 @@ pub struct NodeResources { pub max_ports_per_vm: u32, } -impl NodeResources { +impl VmNodeResources { pub async fn merge(self, db: &Surreal, node_id: &str) -> Result<(), Error> { - let _: Option = db.update((VM_NODE, node_id)).merge(self).await?; + let _: Option = db.update((VM_NODE, node_id)).merge(self).await?; Ok(()) } } -impl Node { +impl VmNode { pub async fn register(self, db: &Surreal) -> Result<(), Error> { - let _: Option = db.upsert(self.id.clone()).content(self).await?; + let _: Option = db.upsert(self.id.clone()).content(self).await?; Ok(()) } } #[derive(Debug, Serialize, Deserialize)] -pub struct NodeWithReports { +pub struct VmNodeWithReports { pub id: RecordId, pub operator: RecordId, pub country: String, @@ -75,7 +75,7 @@ pub struct NodeWithReports { pub reports: Vec, } -impl NodeWithReports { +impl VmNodeWithReports { // TODO: find a more elegant way to do this than importing gRPC in the DB module // https://en.wikipedia.org/wiki/Dependency_inversion_principle pub async fn find_by_filters( @@ -118,25 +118,25 @@ impl NodeWithReports { } } -pub enum DaemonNotification { +pub enum VmDaemonNotification { Create(NewVmReq), Update(UpdateVmReq), Delete(DeletedVm), } -impl From for DaemonNotification { +impl From for VmDaemonNotification { fn from(value: NewVmReq) -> Self { Self::Create(value) } } -impl From for DaemonNotification { +impl From for VmDaemonNotification { fn from(value: UpdateVmReq) -> Self { Self::Update(value) } } -impl From for DaemonNotification { +impl From for VmDaemonNotification { fn from(value: DeletedVm) -> Self { Self::Delete(value) } @@ -455,7 +455,7 @@ pub struct ActiveVmWithNode { #[serde(rename = "in")] pub admin: RecordId, #[serde(rename = "out")] - pub vm_node: Node, + pub vm_node: VmNode, pub hostname: String, pub mapped_ports: Vec<(u32, u32)>, pub public_ipv4: String, @@ -537,11 +537,11 @@ impl ActiveVmWithNode { // TODO: delete all of these From implementation after migration 0 gets executed -impl From<&old_brain::BrainData> for Vec { +impl From<&old_brain::BrainData> for Vec { fn from(old_data: &old_brain::BrainData) -> Self { let mut nodes = Vec::new(); for old_node in old_data.vm_nodes.iter() { - nodes.push(Node { + nodes.push(VmNode { id: RecordId::from((VM_NODE, old_node.public_key.clone())), operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())), country: old_node.country.clone(), diff --git a/src/grpc/general.rs b/src/grpc/general.rs index 6f15431..9dc6244 100644 --- a/src/grpc/general.rs +++ b/src/grpc/general.rs @@ -18,18 +18,18 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_stream::Stream; use tonic::{Request, Response, Status}; -pub struct BrainCliServer { +pub struct GeneralCliServer { pub db: Arc>, } -impl BrainCliServer { +impl GeneralCliServer { pub fn new(db: Arc>) -> Self { Self { db } } } #[tonic::async_trait] -impl BrainGeneralCli for BrainCliServer { +impl BrainGeneralCli for GeneralCliServer { type ListAccountsStream = Pin> + Send>>; type ListAllAppContractsStream = Pin> + Send>>; diff --git a/src/grpc/types.rs b/src/grpc/types.rs index 932418b..0b22164 100644 --- a/src/grpc/types.rs +++ b/src/grpc/types.rs @@ -1,5 +1,5 @@ use crate::constants::{ACCOUNT, ID_ALPHABET, NEW_VM_REQ, VM_NODE}; -use crate::db; +use crate::db::prelude as db; use detee_shared::app_proto::AppNodeListResp; use detee_shared::general_proto::{AccountBalance, ListOperatorsResp}; use detee_shared::vm_proto::*; @@ -7,13 +7,13 @@ use nanoid::nanoid; use surrealdb::RecordId; -impl From for AccountBalance { - fn from(account: db::general::Account) -> Self { +impl From for AccountBalance { + fn from(account: db::Account) -> Self { AccountBalance { balance: account.balance, tmp_locked: account.tmp_locked } } } -impl From for db::vm::NewVmReq { +impl From for db::NewVmReq { fn from(new_vm_req: NewVmReq) -> Self { Self { id: RecordId::from((NEW_VM_REQ, nanoid!(40, &ID_ALPHABET))), @@ -38,8 +38,8 @@ impl From for db::vm::NewVmReq { } } -impl From for NewVmReq { - fn from(new_vm_req: db::vm::NewVmReq) -> Self { +impl From for NewVmReq { + fn from(new_vm_req: db::NewVmReq) -> Self { Self { uuid: new_vm_req.id.key().to_string(), hostname: new_vm_req.hostname, @@ -61,21 +61,21 @@ impl From for NewVmReq { } } -impl From for NewVmResp { - fn from(resp: db::vm::NewVmResp) -> Self { +impl From for NewVmResp { + fn from(resp: db::NewVmResp) -> Self { match resp { // TODO: This will require a small architecture change to pass MeasurementArgs from // Daemon to CLI - db::vm::NewVmResp::Args(uuid, args) => { + db::NewVmResp::Args(uuid, args) => { NewVmResp { uuid, error: String::new(), args: Some(args) } } - db::vm::NewVmResp::Error(uuid, error) => NewVmResp { uuid, error, args: None }, + db::NewVmResp::Error(uuid, error) => NewVmResp { uuid, error, args: None }, } } } -impl From for UpdateVmReq { - fn from(update_vm_req: db::vm::UpdateVmReq) -> Self { +impl From for UpdateVmReq { + fn from(update_vm_req: db::UpdateVmReq) -> Self { Self { uuid: update_vm_req.id.key().to_string(), // daemon does not care about VM hostname @@ -92,8 +92,8 @@ impl From for UpdateVmReq { } } -impl From for DeleteVmReq { - fn from(delete_vm_req: db::vm::DeletedVm) -> Self { +impl From for DeleteVmReq { + fn from(delete_vm_req: db::DeletedVm) -> Self { Self { uuid: delete_vm_req.id.key().to_string(), admin_pubkey: delete_vm_req.admin.key().to_string(), @@ -101,24 +101,24 @@ impl From for DeleteVmReq { } } -impl From for BrainVmMessage { - fn from(notification: db::vm::DaemonNotification) -> Self { +impl From for BrainVmMessage { + fn from(notification: db::VmDaemonNotification) -> Self { match notification { - db::vm::DaemonNotification::Create(new_vm_req) => { + db::VmDaemonNotification::Create(new_vm_req) => { BrainVmMessage { msg: Some(brain_vm_message::Msg::NewVmReq(new_vm_req.into())) } } - db::vm::DaemonNotification::Update(update_vm_req) => BrainVmMessage { + db::VmDaemonNotification::Update(update_vm_req) => BrainVmMessage { msg: Some(brain_vm_message::Msg::UpdateVmReq(update_vm_req.into())), }, - db::vm::DaemonNotification::Delete(deleted_vm) => { + db::VmDaemonNotification::Delete(deleted_vm) => { BrainVmMessage { msg: Some(brain_vm_message::Msg::DeleteVm(deleted_vm.into())) } } } } } -impl From for VmContract { - fn from(db_c: db::vm::ActiveVmWithNode) -> Self { +impl From for VmContract { + fn from(db_c: db::ActiveVmWithNode) -> Self { let mut exposed_ports = Vec::new(); for port in db_c.mapped_ports.iter() { exposed_ports.push(port.0); @@ -163,8 +163,8 @@ impl From for tonic::Status { } } -impl From for ListOperatorsResp { - fn from(db_o: db::general::Operator) -> Self { +impl From for ListOperatorsResp { + fn from(db_o: db::Operator) -> Self { ListOperatorsResp { pubkey: db_o.account.key().to_string(), escrow: db_o.escrow, @@ -176,8 +176,8 @@ impl From for ListOperatorsResp { } } -impl From for VmNodeListResp { - fn from(vm_node: db::vm::NodeWithReports) -> Self { +impl From for VmNodeListResp { + fn from(vm_node: db::VmNodeWithReports) -> Self { Self { operator: vm_node.operator.key().to_string(), node_pubkey: vm_node.id.key().to_string(), @@ -191,8 +191,8 @@ impl From for VmNodeListResp { } } -impl From for AppNodeListResp { - fn from(app_node: db::app::NodeWithReports) -> Self { +impl From for AppNodeListResp { + fn from(app_node: db::AppNodeWithReports) -> Self { Self { operator: app_node.operator.key().to_string(), node_pubkey: app_node.id.key().to_string(), @@ -206,7 +206,7 @@ impl From for AppNodeListResp { } } -impl From for db::vm::NodeResources { +impl From for db::VmNodeResources { fn from(res: VmNodeResources) -> Self { Self { avail_mem_mb: res.avail_memory_mb, diff --git a/src/grpc/vm.rs b/src/grpc/vm.rs index d0a83b3..961eed1 100644 --- a/src/grpc/vm.rs +++ b/src/grpc/vm.rs @@ -1,6 +1,6 @@ #![allow(dead_code)] use crate::constants::{ACCOUNT, VM_NODE}; -use crate::db; +use crate::db::prelude as db; use crate::grpc::{check_sig_from_parts, check_sig_from_req}; use detee_shared::common_proto::Empty; use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCli; @@ -17,18 +17,18 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_stream::{Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; -pub struct BrainDaemonServer { +pub struct VmDaemonServer { pub db: Arc>, } -impl BrainDaemonServer { +impl VmDaemonServer { pub fn new(db: Arc>) -> Self { Self { db } } } #[tonic::async_trait] -impl BrainVmDaemon for BrainDaemonServer { +impl BrainVmDaemon for VmDaemonServer { type BrainMessagesStream = Pin> + Send>>; type RegisterVmNodeStream = Pin> + Send>>; @@ -38,7 +38,7 @@ impl BrainVmDaemon for BrainDaemonServer { ) -> Result, Status> { let req = check_sig_from_req(req)?; info!("Starting registration process for {:?}", req); - db::vm::Node { + db::VmNode { id: surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone())), operator: surrealdb::RecordId::from((ACCOUNT, req.operator_wallet)), country: req.country, @@ -59,7 +59,7 @@ impl BrainVmDaemon for BrainDaemonServer { .await?; info!("Sending existing contracts to {}", req.node_pubkey); - let contracts = db::vm::ActiveVmWithNode::list_by_node(&self.db, &req.node_pubkey).await?; + let contracts = db::ActiveVmWithNode::list_by_node(&self.db, &req.node_pubkey).await?; let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for contract in contracts { @@ -90,7 +90,7 @@ impl BrainVmDaemon for BrainDaemonServer { let pubkey = pubkey.clone(); let tx = tx.clone(); tokio::spawn(async move { - match db::listen_for_vm_node::(&db, &pubkey, tx).await { + match db::listen_for_vm_node::(&db, &pubkey, tx).await { Ok(()) => log::info!("db::VmContract::listen_for_node ended for {pubkey}"), Err(e) => { log::warn!("db::VmContract::listen_for_node errored for {pubkey}: {e}") @@ -103,7 +103,7 @@ impl BrainVmDaemon for BrainDaemonServer { let pubkey = pubkey.clone(); let tx = tx.clone(); tokio::spawn(async move { - let _ = db::listen_for_vm_node::(&db, &pubkey, tx.clone()).await; + let _ = db::listen_for_vm_node::(&db, &pubkey, tx.clone()).await; }); } { @@ -111,7 +111,7 @@ impl BrainVmDaemon for BrainDaemonServer { let pubkey = pubkey.clone(); let tx = tx.clone(); tokio::spawn(async move { - let _ = db::listen_for_vm_node::(&db, &pubkey, tx.clone()).await; + let _ = db::listen_for_vm_node::(&db, &pubkey, tx.clone()).await; }); } @@ -151,7 +151,7 @@ impl BrainVmDaemon for BrainDaemonServer { // TODO: move new_vm_req to active_vm // also handle failure properly if !new_vm_resp.error.is_empty() { - db::vm::NewVmReq::submit_error( + db::NewVmReq::submit_error( &self.db, &new_vm_resp.uuid, new_vm_resp.error, @@ -166,7 +166,7 @@ impl BrainVmDaemon for BrainDaemonServer { ) .await?; if let Some(args) = new_vm_resp.args { - db::vm::ActiveVm::activate(&self.db, &new_vm_resp.uuid, args).await?; + db::ActiveVm::activate(&self.db, &new_vm_resp.uuid, args).await?; } } } @@ -175,7 +175,7 @@ impl BrainVmDaemon for BrainDaemonServer { // self.data.submit_updatevm_resp(update_vm_resp).await; } Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => { - let node_resources: db::vm::NodeResources = node_resources.into(); + let node_resources: db::VmNodeResources = node_resources.into(); node_resources.merge(&self.db, &pubkey).await?; } _ => {} @@ -189,18 +189,18 @@ impl BrainVmDaemon for BrainDaemonServer { } } -pub struct BrainCliServer { +pub struct VmCliServer { pub db: Arc>, } -impl BrainCliServer { +impl VmCliServer { pub fn new(db: Arc>) -> Self { Self { db } } } #[tonic::async_trait] -impl BrainVmCli for BrainCliServer { +impl BrainVmCli for VmCliServer { type ListVmContractsStream = Pin> + Send>>; type ListVmNodesStream = Pin> + Send>>; @@ -211,14 +211,14 @@ impl BrainVmCli for BrainCliServer { return Err(Status::permission_denied("This operator banned you. What did you do?")); } - let new_vm_req: db::vm::NewVmReq = req.into(); + let new_vm_req: db::NewVmReq = req.into(); let id = new_vm_req.id.key().to_string(); let db = self.db.clone(); let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); tokio::spawn(async move { - let _ = oneshot_tx.send(db::vm::NewVmResp::listen(&db, &id).await); + let _ = oneshot_tx.send(db::NewVmResp::listen(&db, &id).await); }); new_vm_req.submit(&self.db).await?; @@ -284,7 +284,7 @@ impl BrainVmCli for BrainCliServer { let mut contracts = Vec::new(); if !req.uuid.is_empty() { if let Some(specific_contract) = - db::vm::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await? + db::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await? { if specific_contract.admin.key().to_string() == req.wallet { contracts.push(specific_contract); @@ -293,10 +293,10 @@ impl BrainVmCli for BrainCliServer { } } else if req.as_operator { contracts - .append(&mut db::vm::ActiveVmWithNode::list_by_operator(&self.db, &req.wallet).await?); + .append(&mut db::ActiveVmWithNode::list_by_operator(&self.db, &req.wallet).await?); } else { contracts - .append(&mut db::vm::ActiveVmWithNode::list_by_admin(&self.db, &req.wallet).await?); + .append(&mut db::ActiveVmWithNode::list_by_admin(&self.db, &req.wallet).await?); } let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { @@ -314,7 +314,7 @@ impl BrainVmCli for BrainCliServer { ) -> Result, tonic::Status> { let req = check_sig_from_req(req)?; info!("CLI requested ListVmNodesStream: {req:?}"); - let nodes = db::vm::NodeWithReports::find_by_filters(&self.db, req).await?; + let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?; let (tx, rx) = mpsc::channel(6); tokio::spawn(async move { for node in nodes { @@ -332,7 +332,7 @@ impl BrainVmCli for BrainCliServer { let req = check_sig_from_req(req)?; info!("Unknown CLI requested ListVmNodesStream: {req:?}"); // TODO: optimize this query so that it gets only one node - let nodes = db::vm::NodeWithReports::find_by_filters(&self.db, req).await?; + let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?; if let Some(node) = nodes.into_iter().next() { return Ok(Response::new(node.into())); } diff --git a/tests/common/prepare_test_env.rs b/tests/common/prepare_test_env.rs index b8ffb71..d3e8b06 100644 --- a/tests/common/prepare_test_env.rs +++ b/tests/common/prepare_test_env.rs @@ -5,7 +5,10 @@ use dotenv::dotenv; use hyper_util::rt::TokioIo; use std::net::SocketAddr; use std::sync::Arc; -use surreal_brain::grpc; +use surreal_brain::grpc::{ + general::GeneralCliServer, + vm::{VmCliServer, VmDaemonServer}, +}; use surrealdb::engine::remote::ws::Client; use surrealdb::Surreal; use tokio::io::DuplexStream; @@ -57,11 +60,9 @@ pub async fn run_service_in_background() -> SocketAddr { let db_arc = Arc::new(db); Server::builder() - .add_service(BrainGeneralCliServer::new(grpc::general::BrainCliServer::new( - db_arc.clone(), - ))) - .add_service(BrainVmCliServer::new(grpc::vm::BrainCliServer::new(db_arc.clone()))) - .add_service(BrainVmDaemonServer::new(grpc::vm::BrainDaemonServer::new(db_arc.clone()))) + .add_service(BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone()))) + .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) + .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) .await .unwrap(); @@ -89,9 +90,9 @@ pub async fn run_service_for_stream_server() -> DuplexStream { let db_arc = Arc::new(db); tonic::transport::Server::builder() - .add_service(BrainGeneralCliServer::new(grpc::general::BrainCliServer::new(db_arc.clone()))) - .add_service(BrainVmCliServer::new(grpc::vm::BrainCliServer::new(db_arc.clone()))) - .add_service(BrainVmDaemonServer::new(grpc::vm::BrainDaemonServer::new(db_arc.clone()))) + .add_service(BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone()))) + .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) + .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await }); diff --git a/tests/common/vm_cli_utils.rs b/tests/common/vm_cli_utils.rs index e48a66c..55b0853 100644 --- a/tests/common/vm_cli_utils.rs +++ b/tests/common/vm_cli_utils.rs @@ -2,7 +2,7 @@ use super::test_utils::Key; use detee_shared::vm_proto; use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ}; -use surreal_brain::db; +use surreal_brain::db::prelude as db; use surrealdb::engine::remote::ws::Client; use surrealdb::Surreal; use tonic::transport::Channel; @@ -32,14 +32,14 @@ pub async fn create_new_vm( // wait for update db tokio::time::sleep(tokio::time::Duration::from_millis(700)).await; - let vm_req_db: Option = + let vm_req_db: Option = db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await.unwrap(); if let Some(new_vm_req) = vm_req_db { panic!("New VM request found in DB: {:?}", new_vm_req); } - let active_vm_op: Option = + let active_vm_op: Option = db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await.unwrap(); let active_vm = active_vm_op.unwrap(); diff --git a/tests/grpc_test.rs b/tests/grpc_test.rs index aa4b512..33d6ce5 100644 --- a/tests/grpc_test.rs +++ b/tests/grpc_test.rs @@ -11,7 +11,7 @@ use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; use detee_shared::vm_proto::ListVmContractsReq; use futures::StreamExt; use surreal_brain::constants::VM_NODE; -use surreal_brain::db; +use surreal_brain::db::vm::VmNodeWithReports; mod common; @@ -88,7 +88,7 @@ async fn test_report_node() { .unwrap() .into_inner(); - let vm_nodes: Vec = db + let vm_nodes: Vec = db .query(format!( "SELECT *, <-report.* as reports FROM {VM_NODE} WHERE id = {VM_NODE}:{daemon_key};" ))