longer struct names
This commit is contained in:
		
							parent
							
								
									d233ace7a2
								
							
						
					
					
						commit
						3bc81b65d6
					
				| @ -6,8 +6,8 @@ use detee_shared::vm_proto::brain_vm_daemon_server::BrainVmDaemonServer; | ||||
| use dotenv::dotenv; | ||||
| use surreal_brain::constants::{BRAIN_GRPC_ADDR, CERT_KEY_PATH, CERT_PATH}; | ||||
| use surreal_brain::db; | ||||
| use surreal_brain::grpc; | ||||
| 
 | ||||
| use surreal_brain::grpc::general::GeneralCliServer; | ||||
| use surreal_brain::grpc::vm::{VmCliServer, VmDaemonServer}; | ||||
| use tonic::transport::{Identity, Server, ServerTlsConfig}; | ||||
| 
 | ||||
| #[tokio::main] | ||||
| @ -26,11 +26,9 @@ async fn main() { | ||||
| 
 | ||||
|     let addr = BRAIN_GRPC_ADDR.parse().unwrap(); | ||||
| 
 | ||||
|     let snp_daemon_server = | ||||
|         BrainVmDaemonServer::new(grpc::vm::BrainDaemonServer::new(db_arc.clone())); | ||||
|     let snp_cli_server = BrainVmCliServer::new(grpc::vm::BrainCliServer::new(db_arc.clone())); | ||||
|     let general_service_server = | ||||
|         BrainGeneralCliServer::new(grpc::general::BrainCliServer::new(db_arc.clone())); | ||||
|     let snp_daemon_server = BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone())); | ||||
|     let snp_cli_server = BrainVmCliServer::new(VmCliServer::new(db_arc.clone())); | ||||
|     let general_service_server = BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone())); | ||||
| 
 | ||||
|     let cert = std::fs::read_to_string(CERT_PATH).unwrap(); | ||||
|     let key = std::fs::read_to_string(CERT_KEY_PATH).unwrap(); | ||||
|  | ||||
| @ -9,7 +9,7 @@ use surrealdb::sql::Datetime; | ||||
| use surrealdb::{RecordId, Surreal}; | ||||
| 
 | ||||
| #[derive(Debug, Serialize, Deserialize)] | ||||
| pub struct Node { | ||||
| pub struct AppNode { | ||||
|     pub id: RecordId, | ||||
|     pub operator: RecordId, | ||||
|     pub country: String, | ||||
| @ -26,7 +26,7 @@ pub struct Node { | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug, Serialize, Deserialize)] | ||||
| pub struct NodeWithReports { | ||||
| pub struct AppNodeWithReports { | ||||
|     pub id: RecordId, | ||||
|     pub operator: RecordId, | ||||
|     pub country: String, | ||||
| @ -71,7 +71,7 @@ pub struct ActiveAppWithNode { | ||||
|     #[serde(rename = "in")] | ||||
|     pub admin: RecordId, | ||||
|     #[serde(rename = "out")] | ||||
|     pub app_node: Node, | ||||
|     pub app_node: AppNode, | ||||
|     pub app_name: String, | ||||
|     pub mapped_ports: Vec<(u64, u64)>, | ||||
|     pub host_ipv4: String, | ||||
| @ -95,11 +95,11 @@ impl ActiveAppWithNode { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<&old_brain::BrainData> for Vec<Node> { | ||||
| impl From<&old_brain::BrainData> for Vec<AppNode> { | ||||
|     fn from(old_data: &old_brain::BrainData) -> Self { | ||||
|         let mut nodes = Vec::new(); | ||||
|         for old_node in old_data.app_nodes.iter() { | ||||
|             nodes.push(Node { | ||||
|             nodes.push(AppNode { | ||||
|                 id: RecordId::from(("app_node", old_node.node_pubkey.clone())), | ||||
|                 operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())), | ||||
|                 country: old_node.country.clone(), | ||||
|  | ||||
| @ -1,6 +1,5 @@ | ||||
| use crate::constants::ACCOUNT; | ||||
| use crate::db::app; | ||||
| use crate::db::vm; | ||||
| use crate::db::prelude::*; | ||||
| 
 | ||||
| use super::Error; | ||||
| use crate::old_brain; | ||||
| @ -193,7 +192,7 @@ impl Operator { | ||||
|     pub async fn inspect_nodes( | ||||
|         db: &Surreal<Client>, | ||||
|         account: &str, | ||||
|     ) -> Result<(Option<Self>, Vec<vm::NodeWithReports>, Vec<app::NodeWithReports>), Error> { | ||||
|     ) -> Result<(Option<Self>, Vec<VmNodeWithReports>, Vec<AppNodeWithReports>), Error> { | ||||
|         let operator = Self::inspect(db, account).await?; | ||||
|         let mut result = db | ||||
|             .query(format!( | ||||
| @ -205,8 +204,8 @@ impl Operator { | ||||
|                  where operator = account:{account};" | ||||
|             )) | ||||
|             .await?; | ||||
|         let vm_nodes: Vec<vm::NodeWithReports> = result.take(0)?; | ||||
|         let app_nodes: Vec<app::NodeWithReports> = result.take(1)?; | ||||
|         let vm_nodes: Vec<VmNodeWithReports> = result.take(0)?; | ||||
|         let app_nodes: Vec<AppNodeWithReports> = result.take(1)?; | ||||
| 
 | ||||
|         Ok((operator, vm_nodes, app_nodes)) | ||||
|     } | ||||
|  | ||||
| @ -4,6 +4,7 @@ pub mod vm; | ||||
| 
 | ||||
| use crate::constants::{DELETED_VM, NEW_VM_REQ, UPDATE_VM_REQ}; | ||||
| use crate::old_brain; | ||||
| use prelude::*; | ||||
| use serde::{Deserialize, Serialize}; | ||||
| use surrealdb::engine::remote::ws::{Client, Ws}; | ||||
| use surrealdb::opt::auth::Root; | ||||
| @ -16,13 +17,20 @@ pub enum Error { | ||||
|     #[error("Internal DB error: {0}")] | ||||
|     DataBase(#[from] surrealdb::Error), | ||||
|     #[error("Daemon channel got closed: {0}")] | ||||
|     VmDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<vm::DaemonNotification>), | ||||
|     VmDaemonConnection(#[from] tokio::sync::mpsc::error::SendError<VmDaemonNotification>), | ||||
|     #[error(transparent)] | ||||
|     StdIo(#[from] std::io::Error), | ||||
|     #[error(transparent)] | ||||
|     TimeOut(#[from] tokio::time::error::Elapsed), | ||||
| } | ||||
| 
 | ||||
| pub mod prelude { | ||||
|     pub use super::app::*; | ||||
|     pub use super::general::*; | ||||
|     pub use super::vm::*; | ||||
|     pub use super::*; | ||||
| } | ||||
| 
 | ||||
| pub async fn db_connection( | ||||
|     db_address: &str, | ||||
|     username: &str, | ||||
| @ -42,22 +50,22 @@ pub async fn migration0( | ||||
|     db: &Surreal<Client>, | ||||
|     old_data: &old_brain::BrainData, | ||||
| ) -> Result<(), Error> { | ||||
|     let accounts: Vec<crate::db::general::Account> = old_data.into(); | ||||
|     let vm_nodes: Vec<crate::db::vm::Node> = old_data.into(); | ||||
|     let app_nodes: Vec<crate::db::app::Node> = old_data.into(); | ||||
|     let vm_contracts: Vec<crate::db::vm::ActiveVm> = old_data.into(); | ||||
|     let accounts: Vec<Account> = old_data.into(); | ||||
|     let vm_nodes: Vec<VmNode> = old_data.into(); | ||||
|     let app_nodes: Vec<AppNode> = old_data.into(); | ||||
|     let vm_contracts: Vec<ActiveVm> = old_data.into(); | ||||
| 
 | ||||
|     let schema = std::fs::read_to_string(crate::constants::DB_SCHEMA_FILE)?; | ||||
|     db.query(schema).await?; | ||||
| 
 | ||||
|     println!("Inserting accounts..."); | ||||
|     let _: Vec<crate::db::general::Account> = db.insert(()).content(accounts).await?; | ||||
|     let _: Vec<Account> = db.insert(()).content(accounts).await?; | ||||
|     println!("Inserting vm nodes..."); | ||||
|     let _: Vec<crate::db::vm::Node> = db.insert(()).content(vm_nodes).await?; | ||||
|     let _: Vec<VmNode> = db.insert(()).content(vm_nodes).await?; | ||||
|     println!("Inserting app nodes..."); | ||||
|     let _: Vec<crate::db::app::Node> = db.insert(()).content(app_nodes).await?; | ||||
|     let _: Vec<AppNode> = db.insert(()).content(app_nodes).await?; | ||||
|     println!("Inserting vm contracts..."); | ||||
|     let _: Vec<crate::db::vm::ActiveVm> = db.insert("vm_contract").relation(vm_contracts).await?; | ||||
|     let _: Vec<ActiveVm> = db.insert("vm_contract").relation(vm_contracts).await?; | ||||
| 
 | ||||
|     Ok(()) | ||||
| } | ||||
| @ -75,11 +83,11 @@ pub async fn upsert_record<SomeRecord: Serialize + 'static>( | ||||
| } | ||||
| 
 | ||||
| pub async fn listen_for_vm_node< | ||||
|     T: Into<vm::DaemonNotification> + std::marker::Unpin + for<'de> Deserialize<'de>, | ||||
|     T: Into<vm::VmDaemonNotification> + std::marker::Unpin + for<'de> Deserialize<'de>, | ||||
| >( | ||||
|     db: &Surreal<Client>, | ||||
|     node: &str, | ||||
|     tx: Sender<vm::DaemonNotification>, | ||||
|     tx: Sender<vm::VmDaemonNotification>, | ||||
| ) -> Result<(), Error> { | ||||
|     let table_name = match std::any::type_name::<T>() { | ||||
|         "surreal_brain::db::NewVmReq" => NEW_VM_REQ.to_string(), | ||||
|  | ||||
							
								
								
									
										30
									
								
								src/db/vm.rs
									
									
									
									
									
								
							
							
								
								
								
								
								
									
									
								
							
						
						
									
										30
									
								
								src/db/vm.rs
									
									
									
									
									
								
							| @ -12,7 +12,7 @@ use surrealdb::{Notification, RecordId, Surreal}; | ||||
| use tokio_stream::StreamExt as _; | ||||
| 
 | ||||
| #[derive(Debug, Serialize, Deserialize)] | ||||
| pub struct Node { | ||||
| pub struct VmNode { | ||||
|     pub id: RecordId, | ||||
|     pub operator: RecordId, | ||||
|     pub country: String, | ||||
| @ -31,7 +31,7 @@ pub struct Node { | ||||
| } | ||||
| 
 | ||||
| #[derive(Serialize)] | ||||
| pub struct NodeResources { | ||||
| pub struct VmNodeResources { | ||||
|     pub avail_mem_mb: u32, | ||||
|     pub avail_vcpus: u32, | ||||
|     pub avail_storage_gbs: u32, | ||||
| @ -41,22 +41,22 @@ pub struct NodeResources { | ||||
|     pub max_ports_per_vm: u32, | ||||
| } | ||||
| 
 | ||||
| impl NodeResources { | ||||
| impl VmNodeResources { | ||||
|     pub async fn merge(self, db: &Surreal<Client>, node_id: &str) -> Result<(), Error> { | ||||
|         let _: Option<Node> = db.update((VM_NODE, node_id)).merge(self).await?; | ||||
|         let _: Option<VmNode> = db.update((VM_NODE, node_id)).merge(self).await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl Node { | ||||
| impl VmNode { | ||||
|     pub async fn register(self, db: &Surreal<Client>) -> Result<(), Error> { | ||||
|         let _: Option<Node> = db.upsert(self.id.clone()).content(self).await?; | ||||
|         let _: Option<VmNode> = db.upsert(self.id.clone()).content(self).await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug, Serialize, Deserialize)] | ||||
| pub struct NodeWithReports { | ||||
| pub struct VmNodeWithReports { | ||||
|     pub id: RecordId, | ||||
|     pub operator: RecordId, | ||||
|     pub country: String, | ||||
| @ -75,7 +75,7 @@ pub struct NodeWithReports { | ||||
|     pub reports: Vec<Report>, | ||||
| } | ||||
| 
 | ||||
| impl NodeWithReports { | ||||
| impl VmNodeWithReports { | ||||
|     // TODO: find a more elegant way to do this than importing gRPC in the DB module
 | ||||
|     // https://en.wikipedia.org/wiki/Dependency_inversion_principle
 | ||||
|     pub async fn find_by_filters( | ||||
| @ -118,25 +118,25 @@ impl NodeWithReports { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| pub enum DaemonNotification { | ||||
| pub enum VmDaemonNotification { | ||||
|     Create(NewVmReq), | ||||
|     Update(UpdateVmReq), | ||||
|     Delete(DeletedVm), | ||||
| } | ||||
| 
 | ||||
| impl From<NewVmReq> for DaemonNotification { | ||||
| impl From<NewVmReq> for VmDaemonNotification { | ||||
|     fn from(value: NewVmReq) -> Self { | ||||
|         Self::Create(value) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<UpdateVmReq> for DaemonNotification { | ||||
| impl From<UpdateVmReq> for VmDaemonNotification { | ||||
|     fn from(value: UpdateVmReq) -> Self { | ||||
|         Self::Update(value) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<DeletedVm> for DaemonNotification { | ||||
| impl From<DeletedVm> for VmDaemonNotification { | ||||
|     fn from(value: DeletedVm) -> Self { | ||||
|         Self::Delete(value) | ||||
|     } | ||||
| @ -455,7 +455,7 @@ pub struct ActiveVmWithNode { | ||||
|     #[serde(rename = "in")] | ||||
|     pub admin: RecordId, | ||||
|     #[serde(rename = "out")] | ||||
|     pub vm_node: Node, | ||||
|     pub vm_node: VmNode, | ||||
|     pub hostname: String, | ||||
|     pub mapped_ports: Vec<(u32, u32)>, | ||||
|     pub public_ipv4: String, | ||||
| @ -537,11 +537,11 @@ impl ActiveVmWithNode { | ||||
| 
 | ||||
| // TODO: delete all of these From implementation after migration 0 gets executed
 | ||||
| 
 | ||||
| impl From<&old_brain::BrainData> for Vec<Node> { | ||||
| impl From<&old_brain::BrainData> for Vec<VmNode> { | ||||
|     fn from(old_data: &old_brain::BrainData) -> Self { | ||||
|         let mut nodes = Vec::new(); | ||||
|         for old_node in old_data.vm_nodes.iter() { | ||||
|             nodes.push(Node { | ||||
|             nodes.push(VmNode { | ||||
|                 id: RecordId::from((VM_NODE, old_node.public_key.clone())), | ||||
|                 operator: RecordId::from((ACCOUNT, old_node.operator_wallet.clone())), | ||||
|                 country: old_node.country.clone(), | ||||
|  | ||||
| @ -18,18 +18,18 @@ use tokio_stream::wrappers::ReceiverStream; | ||||
| use tokio_stream::Stream; | ||||
| use tonic::{Request, Response, Status}; | ||||
| 
 | ||||
| pub struct BrainCliServer { | ||||
| pub struct GeneralCliServer { | ||||
|     pub db: Arc<Surreal<Client>>, | ||||
| } | ||||
| 
 | ||||
| impl BrainCliServer { | ||||
| impl GeneralCliServer { | ||||
|     pub fn new(db: Arc<Surreal<Client>>) -> Self { | ||||
|         Self { db } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[tonic::async_trait] | ||||
| impl BrainGeneralCli for BrainCliServer { | ||||
| impl BrainGeneralCli for GeneralCliServer { | ||||
|     type ListAccountsStream = Pin<Box<dyn Stream<Item = Result<Account, Status>> + Send>>; | ||||
|     type ListAllAppContractsStream = | ||||
|         Pin<Box<dyn Stream<Item = Result<AppContract, Status>> + Send>>; | ||||
|  | ||||
| @ -1,5 +1,5 @@ | ||||
| use crate::constants::{ACCOUNT, ID_ALPHABET, NEW_VM_REQ, VM_NODE}; | ||||
| use crate::db; | ||||
| use crate::db::prelude as db; | ||||
| use detee_shared::app_proto::AppNodeListResp; | ||||
| use detee_shared::general_proto::{AccountBalance, ListOperatorsResp}; | ||||
| use detee_shared::vm_proto::*; | ||||
| @ -7,13 +7,13 @@ use nanoid::nanoid; | ||||
| 
 | ||||
| use surrealdb::RecordId; | ||||
| 
 | ||||
| impl From<db::general::Account> for AccountBalance { | ||||
|     fn from(account: db::general::Account) -> Self { | ||||
| impl From<db::Account> for AccountBalance { | ||||
|     fn from(account: db::Account) -> Self { | ||||
|         AccountBalance { balance: account.balance, tmp_locked: account.tmp_locked } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<NewVmReq> for db::vm::NewVmReq { | ||||
| impl From<NewVmReq> for db::NewVmReq { | ||||
|     fn from(new_vm_req: NewVmReq) -> Self { | ||||
|         Self { | ||||
|             id: RecordId::from((NEW_VM_REQ, nanoid!(40, &ID_ALPHABET))), | ||||
| @ -38,8 +38,8 @@ impl From<NewVmReq> for db::vm::NewVmReq { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<db::vm::NewVmReq> for NewVmReq { | ||||
|     fn from(new_vm_req: db::vm::NewVmReq) -> Self { | ||||
| impl From<db::NewVmReq> for NewVmReq { | ||||
|     fn from(new_vm_req: db::NewVmReq) -> Self { | ||||
|         Self { | ||||
|             uuid: new_vm_req.id.key().to_string(), | ||||
|             hostname: new_vm_req.hostname, | ||||
| @ -61,21 +61,21 @@ impl From<db::vm::NewVmReq> for NewVmReq { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<db::vm::NewVmResp> for NewVmResp { | ||||
|     fn from(resp: db::vm::NewVmResp) -> Self { | ||||
| impl From<db::NewVmResp> for NewVmResp { | ||||
|     fn from(resp: db::NewVmResp) -> Self { | ||||
|         match resp { | ||||
|             // TODO: This will require a small architecture change to pass MeasurementArgs from
 | ||||
|             // Daemon to CLI
 | ||||
|             db::vm::NewVmResp::Args(uuid, args) => { | ||||
|             db::NewVmResp::Args(uuid, args) => { | ||||
|                 NewVmResp { uuid, error: String::new(), args: Some(args) } | ||||
|             } | ||||
|             db::vm::NewVmResp::Error(uuid, error) => NewVmResp { uuid, error, args: None }, | ||||
|             db::NewVmResp::Error(uuid, error) => NewVmResp { uuid, error, args: None }, | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<db::vm::UpdateVmReq> for UpdateVmReq { | ||||
|     fn from(update_vm_req: db::vm::UpdateVmReq) -> Self { | ||||
| impl From<db::UpdateVmReq> for UpdateVmReq { | ||||
|     fn from(update_vm_req: db::UpdateVmReq) -> Self { | ||||
|         Self { | ||||
|             uuid: update_vm_req.id.key().to_string(), | ||||
|             // daemon does not care about VM hostname
 | ||||
| @ -92,8 +92,8 @@ impl From<db::vm::UpdateVmReq> for UpdateVmReq { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<db::vm::DeletedVm> for DeleteVmReq { | ||||
|     fn from(delete_vm_req: db::vm::DeletedVm) -> Self { | ||||
| impl From<db::DeletedVm> for DeleteVmReq { | ||||
|     fn from(delete_vm_req: db::DeletedVm) -> Self { | ||||
|         Self { | ||||
|             uuid: delete_vm_req.id.key().to_string(), | ||||
|             admin_pubkey: delete_vm_req.admin.key().to_string(), | ||||
| @ -101,24 +101,24 @@ impl From<db::vm::DeletedVm> for DeleteVmReq { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<db::vm::DaemonNotification> for BrainVmMessage { | ||||
|     fn from(notification: db::vm::DaemonNotification) -> Self { | ||||
| impl From<db::VmDaemonNotification> for BrainVmMessage { | ||||
|     fn from(notification: db::VmDaemonNotification) -> Self { | ||||
|         match notification { | ||||
|             db::vm::DaemonNotification::Create(new_vm_req) => { | ||||
|             db::VmDaemonNotification::Create(new_vm_req) => { | ||||
|                 BrainVmMessage { msg: Some(brain_vm_message::Msg::NewVmReq(new_vm_req.into())) } | ||||
|             } | ||||
|             db::vm::DaemonNotification::Update(update_vm_req) => BrainVmMessage { | ||||
|             db::VmDaemonNotification::Update(update_vm_req) => BrainVmMessage { | ||||
|                 msg: Some(brain_vm_message::Msg::UpdateVmReq(update_vm_req.into())), | ||||
|             }, | ||||
|             db::vm::DaemonNotification::Delete(deleted_vm) => { | ||||
|             db::VmDaemonNotification::Delete(deleted_vm) => { | ||||
|                 BrainVmMessage { msg: Some(brain_vm_message::Msg::DeleteVm(deleted_vm.into())) } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<db::vm::ActiveVmWithNode> for VmContract { | ||||
|     fn from(db_c: db::vm::ActiveVmWithNode) -> Self { | ||||
| impl From<db::ActiveVmWithNode> for VmContract { | ||||
|     fn from(db_c: db::ActiveVmWithNode) -> Self { | ||||
|         let mut exposed_ports = Vec::new(); | ||||
|         for port in db_c.mapped_ports.iter() { | ||||
|             exposed_ports.push(port.0); | ||||
| @ -163,8 +163,8 @@ impl From<db::Error> for tonic::Status { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<db::general::Operator> for ListOperatorsResp { | ||||
|     fn from(db_o: db::general::Operator) -> Self { | ||||
| impl From<db::Operator> for ListOperatorsResp { | ||||
|     fn from(db_o: db::Operator) -> Self { | ||||
|         ListOperatorsResp { | ||||
|             pubkey: db_o.account.key().to_string(), | ||||
|             escrow: db_o.escrow, | ||||
| @ -176,8 +176,8 @@ impl From<db::general::Operator> for ListOperatorsResp { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<db::vm::NodeWithReports> for VmNodeListResp { | ||||
|     fn from(vm_node: db::vm::NodeWithReports) -> Self { | ||||
| impl From<db::VmNodeWithReports> for VmNodeListResp { | ||||
|     fn from(vm_node: db::VmNodeWithReports) -> Self { | ||||
|         Self { | ||||
|             operator: vm_node.operator.key().to_string(), | ||||
|             node_pubkey: vm_node.id.key().to_string(), | ||||
| @ -191,8 +191,8 @@ impl From<db::vm::NodeWithReports> for VmNodeListResp { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<db::app::NodeWithReports> for AppNodeListResp { | ||||
|     fn from(app_node: db::app::NodeWithReports) -> Self { | ||||
| impl From<db::AppNodeWithReports> for AppNodeListResp { | ||||
|     fn from(app_node: db::AppNodeWithReports) -> Self { | ||||
|         Self { | ||||
|             operator: app_node.operator.key().to_string(), | ||||
|             node_pubkey: app_node.id.key().to_string(), | ||||
| @ -206,7 +206,7 @@ impl From<db::app::NodeWithReports> for AppNodeListResp { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<VmNodeResources> for db::vm::NodeResources { | ||||
| impl From<VmNodeResources> for db::VmNodeResources { | ||||
|     fn from(res: VmNodeResources) -> Self { | ||||
|         Self { | ||||
|             avail_mem_mb: res.avail_memory_mb, | ||||
|  | ||||
| @ -1,6 +1,6 @@ | ||||
| #![allow(dead_code)] | ||||
| use crate::constants::{ACCOUNT, VM_NODE}; | ||||
| use crate::db; | ||||
| use crate::db::prelude as db; | ||||
| use crate::grpc::{check_sig_from_parts, check_sig_from_req}; | ||||
| use detee_shared::common_proto::Empty; | ||||
| use detee_shared::vm_proto::brain_vm_cli_server::BrainVmCli; | ||||
| @ -17,18 +17,18 @@ use tokio_stream::wrappers::ReceiverStream; | ||||
| use tokio_stream::{Stream, StreamExt}; | ||||
| use tonic::{Request, Response, Status, Streaming}; | ||||
| 
 | ||||
| pub struct BrainDaemonServer { | ||||
| pub struct VmDaemonServer { | ||||
|     pub db: Arc<Surreal<Client>>, | ||||
| } | ||||
| 
 | ||||
| impl BrainDaemonServer { | ||||
| impl VmDaemonServer { | ||||
|     pub fn new(db: Arc<Surreal<Client>>) -> Self { | ||||
|         Self { db } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[tonic::async_trait] | ||||
| impl BrainVmDaemon for BrainDaemonServer { | ||||
| impl BrainVmDaemon for VmDaemonServer { | ||||
|     type BrainMessagesStream = Pin<Box<dyn Stream<Item = Result<BrainVmMessage, Status>> + Send>>; | ||||
|     type RegisterVmNodeStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>; | ||||
| 
 | ||||
| @ -38,7 +38,7 @@ impl BrainVmDaemon for BrainDaemonServer { | ||||
|     ) -> Result<Response<Self::RegisterVmNodeStream>, Status> { | ||||
|         let req = check_sig_from_req(req)?; | ||||
|         info!("Starting registration process for {:?}", req); | ||||
|         db::vm::Node { | ||||
|         db::VmNode { | ||||
|             id: surrealdb::RecordId::from((VM_NODE, req.node_pubkey.clone())), | ||||
|             operator: surrealdb::RecordId::from((ACCOUNT, req.operator_wallet)), | ||||
|             country: req.country, | ||||
| @ -59,7 +59,7 @@ impl BrainVmDaemon for BrainDaemonServer { | ||||
|         .await?; | ||||
| 
 | ||||
|         info!("Sending existing contracts to {}", req.node_pubkey); | ||||
|         let contracts = db::vm::ActiveVmWithNode::list_by_node(&self.db, &req.node_pubkey).await?; | ||||
|         let contracts = db::ActiveVmWithNode::list_by_node(&self.db, &req.node_pubkey).await?; | ||||
|         let (tx, rx) = mpsc::channel(6); | ||||
|         tokio::spawn(async move { | ||||
|             for contract in contracts { | ||||
| @ -90,7 +90,7 @@ impl BrainVmDaemon for BrainDaemonServer { | ||||
|             let pubkey = pubkey.clone(); | ||||
|             let tx = tx.clone(); | ||||
|             tokio::spawn(async move { | ||||
|                 match db::listen_for_vm_node::<db::vm::DeletedVm>(&db, &pubkey, tx).await { | ||||
|                 match db::listen_for_vm_node::<db::DeletedVm>(&db, &pubkey, tx).await { | ||||
|                     Ok(()) => log::info!("db::VmContract::listen_for_node ended for {pubkey}"), | ||||
|                     Err(e) => { | ||||
|                         log::warn!("db::VmContract::listen_for_node errored for {pubkey}: {e}") | ||||
| @ -103,7 +103,7 @@ impl BrainVmDaemon for BrainDaemonServer { | ||||
|             let pubkey = pubkey.clone(); | ||||
|             let tx = tx.clone(); | ||||
|             tokio::spawn(async move { | ||||
|                 let _ = db::listen_for_vm_node::<db::vm::NewVmReq>(&db, &pubkey, tx.clone()).await; | ||||
|                 let _ = db::listen_for_vm_node::<db::NewVmReq>(&db, &pubkey, tx.clone()).await; | ||||
|             }); | ||||
|         } | ||||
|         { | ||||
| @ -111,7 +111,7 @@ impl BrainVmDaemon for BrainDaemonServer { | ||||
|             let pubkey = pubkey.clone(); | ||||
|             let tx = tx.clone(); | ||||
|             tokio::spawn(async move { | ||||
|                 let _ = db::listen_for_vm_node::<db::vm::UpdateVmReq>(&db, &pubkey, tx.clone()).await; | ||||
|                 let _ = db::listen_for_vm_node::<db::UpdateVmReq>(&db, &pubkey, tx.clone()).await; | ||||
|             }); | ||||
|         } | ||||
| 
 | ||||
| @ -151,7 +151,7 @@ impl BrainVmDaemon for BrainDaemonServer { | ||||
|                         // TODO: move new_vm_req to active_vm
 | ||||
|                         // also handle failure properly
 | ||||
|                         if !new_vm_resp.error.is_empty() { | ||||
|                             db::vm::NewVmReq::submit_error( | ||||
|                             db::NewVmReq::submit_error( | ||||
|                                 &self.db, | ||||
|                                 &new_vm_resp.uuid, | ||||
|                                 new_vm_resp.error, | ||||
| @ -166,7 +166,7 @@ impl BrainVmDaemon for BrainDaemonServer { | ||||
|                             ) | ||||
|                             .await?; | ||||
|                             if let Some(args) = new_vm_resp.args { | ||||
|                                 db::vm::ActiveVm::activate(&self.db, &new_vm_resp.uuid, args).await?; | ||||
|                                 db::ActiveVm::activate(&self.db, &new_vm_resp.uuid, args).await?; | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
| @ -175,7 +175,7 @@ impl BrainVmDaemon for BrainDaemonServer { | ||||
|                         // self.data.submit_updatevm_resp(update_vm_resp).await;
 | ||||
|                     } | ||||
|                     Some(vm_daemon_message::Msg::VmNodeResources(node_resources)) => { | ||||
|                         let node_resources: db::vm::NodeResources = node_resources.into(); | ||||
|                         let node_resources: db::VmNodeResources = node_resources.into(); | ||||
|                         node_resources.merge(&self.db, &pubkey).await?; | ||||
|                     } | ||||
|                     _ => {} | ||||
| @ -189,18 +189,18 @@ impl BrainVmDaemon for BrainDaemonServer { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| pub struct BrainCliServer { | ||||
| pub struct VmCliServer { | ||||
|     pub db: Arc<Surreal<Client>>, | ||||
| } | ||||
| 
 | ||||
| impl BrainCliServer { | ||||
| impl VmCliServer { | ||||
|     pub fn new(db: Arc<Surreal<Client>>) -> Self { | ||||
|         Self { db } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[tonic::async_trait] | ||||
| impl BrainVmCli for BrainCliServer { | ||||
| impl BrainVmCli for VmCliServer { | ||||
|     type ListVmContractsStream = Pin<Box<dyn Stream<Item = Result<VmContract, Status>> + Send>>; | ||||
|     type ListVmNodesStream = Pin<Box<dyn Stream<Item = Result<VmNodeListResp, Status>> + Send>>; | ||||
| 
 | ||||
| @ -211,14 +211,14 @@ impl BrainVmCli for BrainCliServer { | ||||
|             return Err(Status::permission_denied("This operator banned you. What did you do?")); | ||||
|         } | ||||
| 
 | ||||
|         let new_vm_req: db::vm::NewVmReq = req.into(); | ||||
|         let new_vm_req: db::NewVmReq = req.into(); | ||||
|         let id = new_vm_req.id.key().to_string(); | ||||
| 
 | ||||
|         let db = self.db.clone(); | ||||
| 
 | ||||
|         let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel(); | ||||
|         tokio::spawn(async move { | ||||
|             let _ = oneshot_tx.send(db::vm::NewVmResp::listen(&db, &id).await); | ||||
|             let _ = oneshot_tx.send(db::NewVmResp::listen(&db, &id).await); | ||||
|         }); | ||||
|         new_vm_req.submit(&self.db).await?; | ||||
| 
 | ||||
| @ -284,7 +284,7 @@ impl BrainVmCli for BrainCliServer { | ||||
|         let mut contracts = Vec::new(); | ||||
|         if !req.uuid.is_empty() { | ||||
|             if let Some(specific_contract) = | ||||
|                 db::vm::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await? | ||||
|                 db::ActiveVmWithNode::get_by_uuid(&self.db, &req.uuid).await? | ||||
|             { | ||||
|                 if specific_contract.admin.key().to_string() == req.wallet { | ||||
|                     contracts.push(specific_contract); | ||||
| @ -293,10 +293,10 @@ impl BrainVmCli for BrainCliServer { | ||||
|             } | ||||
|         } else if req.as_operator { | ||||
|             contracts | ||||
|                 .append(&mut db::vm::ActiveVmWithNode::list_by_operator(&self.db, &req.wallet).await?); | ||||
|                 .append(&mut db::ActiveVmWithNode::list_by_operator(&self.db, &req.wallet).await?); | ||||
|         } else { | ||||
|             contracts | ||||
|                 .append(&mut db::vm::ActiveVmWithNode::list_by_admin(&self.db, &req.wallet).await?); | ||||
|                 .append(&mut db::ActiveVmWithNode::list_by_admin(&self.db, &req.wallet).await?); | ||||
|         } | ||||
|         let (tx, rx) = mpsc::channel(6); | ||||
|         tokio::spawn(async move { | ||||
| @ -314,7 +314,7 @@ impl BrainVmCli for BrainCliServer { | ||||
|     ) -> Result<Response<Self::ListVmNodesStream>, tonic::Status> { | ||||
|         let req = check_sig_from_req(req)?; | ||||
|         info!("CLI requested ListVmNodesStream: {req:?}"); | ||||
|         let nodes = db::vm::NodeWithReports::find_by_filters(&self.db, req).await?; | ||||
|         let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?; | ||||
|         let (tx, rx) = mpsc::channel(6); | ||||
|         tokio::spawn(async move { | ||||
|             for node in nodes { | ||||
| @ -332,7 +332,7 @@ impl BrainVmCli for BrainCliServer { | ||||
|         let req = check_sig_from_req(req)?; | ||||
|         info!("Unknown CLI requested ListVmNodesStream: {req:?}"); | ||||
|         // TODO: optimize this query so that it gets only one node
 | ||||
|         let nodes = db::vm::NodeWithReports::find_by_filters(&self.db, req).await?; | ||||
|         let nodes = db::VmNodeWithReports::find_by_filters(&self.db, req).await?; | ||||
|         if let Some(node) = nodes.into_iter().next() { | ||||
|             return Ok(Response::new(node.into())); | ||||
|         } | ||||
|  | ||||
| @ -5,7 +5,10 @@ use dotenv::dotenv; | ||||
| use hyper_util::rt::TokioIo; | ||||
| use std::net::SocketAddr; | ||||
| use std::sync::Arc; | ||||
| use surreal_brain::grpc; | ||||
| use surreal_brain::grpc::{ | ||||
|     general::GeneralCliServer, | ||||
|     vm::{VmCliServer, VmDaemonServer}, | ||||
| }; | ||||
| use surrealdb::engine::remote::ws::Client; | ||||
| use surrealdb::Surreal; | ||||
| use tokio::io::DuplexStream; | ||||
| @ -57,11 +60,9 @@ pub async fn run_service_in_background() -> SocketAddr { | ||||
|         let db_arc = Arc::new(db); | ||||
| 
 | ||||
|         Server::builder() | ||||
|             .add_service(BrainGeneralCliServer::new(grpc::general::BrainCliServer::new( | ||||
|                 db_arc.clone(), | ||||
|             ))) | ||||
|             .add_service(BrainVmCliServer::new(grpc::vm::BrainCliServer::new(db_arc.clone()))) | ||||
|             .add_service(BrainVmDaemonServer::new(grpc::vm::BrainDaemonServer::new(db_arc.clone()))) | ||||
|             .add_service(BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone()))) | ||||
|             .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) | ||||
|             .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) | ||||
|             .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) | ||||
|             .await | ||||
|             .unwrap(); | ||||
| @ -89,9 +90,9 @@ pub async fn run_service_for_stream_server() -> DuplexStream { | ||||
|         let db_arc = Arc::new(db); | ||||
| 
 | ||||
|         tonic::transport::Server::builder() | ||||
|             .add_service(BrainGeneralCliServer::new(grpc::general::BrainCliServer::new(db_arc.clone()))) | ||||
|             .add_service(BrainVmCliServer::new(grpc::vm::BrainCliServer::new(db_arc.clone()))) | ||||
|             .add_service(BrainVmDaemonServer::new(grpc::vm::BrainDaemonServer::new(db_arc.clone()))) | ||||
|             .add_service(BrainGeneralCliServer::new(GeneralCliServer::new(db_arc.clone()))) | ||||
|             .add_service(BrainVmCliServer::new(VmCliServer::new(db_arc.clone()))) | ||||
|             .add_service(BrainVmDaemonServer::new(VmDaemonServer::new(db_arc.clone()))) | ||||
|             .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) | ||||
|             .await | ||||
|     }); | ||||
|  | ||||
| @ -2,7 +2,7 @@ use super::test_utils::Key; | ||||
| use detee_shared::vm_proto; | ||||
| use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | ||||
| use surreal_brain::constants::{ACTIVE_VM, NEW_VM_REQ}; | ||||
| use surreal_brain::db; | ||||
| use surreal_brain::db::prelude as db; | ||||
| use surrealdb::engine::remote::ws::Client; | ||||
| use surrealdb::Surreal; | ||||
| use tonic::transport::Channel; | ||||
| @ -32,14 +32,14 @@ pub async fn create_new_vm( | ||||
|     // wait for update db
 | ||||
|     tokio::time::sleep(tokio::time::Duration::from_millis(700)).await; | ||||
| 
 | ||||
|     let vm_req_db: Option<db::vm::NewVmReq> = | ||||
|     let vm_req_db: Option<db::NewVmReq> = | ||||
|         db.select((NEW_VM_REQ, new_vm_resp.uuid.clone())).await.unwrap(); | ||||
| 
 | ||||
|     if let Some(new_vm_req) = vm_req_db { | ||||
|         panic!("New VM request found in DB: {:?}", new_vm_req); | ||||
|     } | ||||
| 
 | ||||
|     let active_vm_op: Option<db::vm::ActiveVm> = | ||||
|     let active_vm_op: Option<db::ActiveVm> = | ||||
|         db.select((ACTIVE_VM, new_vm_resp.uuid.clone())).await.unwrap(); | ||||
|     let active_vm = active_vm_op.unwrap(); | ||||
| 
 | ||||
|  | ||||
| @ -11,7 +11,7 @@ use detee_shared::vm_proto::brain_vm_cli_client::BrainVmCliClient; | ||||
| use detee_shared::vm_proto::ListVmContractsReq; | ||||
| use futures::StreamExt; | ||||
| use surreal_brain::constants::VM_NODE; | ||||
| use surreal_brain::db; | ||||
| use surreal_brain::db::vm::VmNodeWithReports; | ||||
| 
 | ||||
| mod common; | ||||
| 
 | ||||
| @ -88,7 +88,7 @@ async fn test_report_node() { | ||||
|         .unwrap() | ||||
|         .into_inner(); | ||||
| 
 | ||||
|     let vm_nodes: Vec<db::vm::NodeWithReports> = db | ||||
|     let vm_nodes: Vec<VmNodeWithReports> = db | ||||
|         .query(format!( | ||||
|             "SELECT *, <-report.* as reports FROM {VM_NODE} WHERE id = {VM_NODE}:{daemon_key};" | ||||
|         )) | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user