diff --git a/Cargo.lock b/Cargo.lock index e45c7ec..97a4195 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,55 @@ dependencies = [ "memchr", ] +[[package]] +name = "anstream" +version = "0.6.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" + +[[package]] +name = "anstyle-parse" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c" +dependencies = [ + "windows-sys 0.59.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2109dbce0e72be3ec00bed26e6a7479ca384ad226efdd66db8fa2e3a38c83125" +dependencies = [ + "anstyle", + "windows-sys 0.59.0", +] + [[package]] name = "anyhow" version = "1.0.94" @@ -156,6 +205,8 @@ name = "brain-mock" version = "0.1.0" dependencies = [ "dashmap", + "env_logger", + "log", "prost", "prost-types", "tokio", @@ -183,6 +234,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "colorchoice" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -209,6 +266,29 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "env_filter" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcaee3d8e3cfc3fd92428d477bc97fc29ec8716d180c0d74c643bb26166660e0" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -388,6 +468,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "1.5.2" @@ -461,6 +547,12 @@ dependencies = [ "hashbrown 0.15.2", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + [[package]] name = "itertools" version = "0.13.0" @@ -1071,6 +1163,12 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "1.11.0" diff --git a/Cargo.toml b/Cargo.toml index fae1b2b..4a78dd4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,8 @@ edition = "2021" [dependencies] dashmap = "6.1.0" +env_logger = "0.11.6" +log = "0.4.22" prost = "0.13.4" prost-types = "0.13.4" tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] } diff --git a/src/data.rs b/src/data.rs index af87d1f..a11e929 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,6 +1,7 @@ #![allow(dead_code)] use crate::grpc::brain as grpc; use dashmap::DashMap; +use log::debug; use std::sync::Arc; use std::sync::RwLock; use tokio::sync::mpsc::Sender; @@ -101,6 +102,7 @@ pub struct BrainData { daemon_newvm_tx: DashMap>, } +#[derive(Debug)] enum TxType { CliContract, DaemonDeleteVm, @@ -115,15 +117,27 @@ pub struct GuardTx { impl Drop for GuardTx { fn drop(&mut self) { - match self.tx_type { - TxType::CliContract => self.brain_data.del_cli_vmcontract_tx(&self.key), - TxType::DaemonDeleteVm => self.brain_data.del_daemon_deletevm_tx(&self.key), - TxType::DaemonNewVm => self.brain_data.del_daemon_newvm_tx(&self.key), - } + debug!("Dropping {:?} for {}", self.tx_type, self.key); + // match self.tx_type { + // TxType::CliContract => self.brain_data.del_cli_vmcontract_tx(&self.key), + // TxType::DaemonDeleteVm => self.brain_data.del_daemon_deletevm_tx(&self.key), + // TxType::DaemonNewVm => self.brain_data.del_daemon_newvm_tx(&self.key), + // } } } impl BrainData { + pub fn new() -> Self { + Self { + nodes: RwLock::new(Vec::new()), + contracts: RwLock::new(Vec::new()), + tmp_vmrequests: DashMap::new(), + cli_vmcontract_tx: DashMap::new(), + daemon_deletevm_tx: DashMap::new(), + daemon_newvm_tx: DashMap::new(), + } + } + pub fn insert_node(&self, node: grpc::RegisterNodeRequest) { let mut nodes = self.nodes.write().unwrap(); for n in nodes.iter_mut() { diff --git a/src/grpc.rs b/src/grpc.rs index e3e79f4..49bbbfa 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -8,19 +8,34 @@ use crate::data::BrainData; use brain::brain_cli_service_server::BrainCliService; use brain::brain_daemon_service_server::BrainDaemonService; use brain::*; +use log::debug; +use log::info; use std::pin::Pin; use std::sync::Arc; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; -struct BrainDaemonMock { +pub struct BrainDaemonMock { data: Arc, } -struct BrainCliMock { + +impl BrainDaemonMock { + pub fn new(data: Arc) -> Self { + Self { data } + } +} + +pub struct BrainCliMock { data: Arc, } +impl BrainCliMock { + pub fn new(data: Arc) -> Self { + Self { data } + } +} + #[tonic::async_trait] impl BrainDaemonService for BrainDaemonMock { async fn register_node( @@ -37,16 +52,29 @@ impl BrainDaemonService for BrainDaemonMock { &self, req: Request, ) -> Result, Status> { + let req = req.into_inner(); + info!( + "Daemon {} connected for GetNewVMReqsStream", + req.node_pubkey + ); let (grpc_tx, grpc_rx) = mpsc::channel(6); let (data_tx, mut data_rx) = mpsc::channel(6); - let _dropper = self + self .data .clone() - .add_daemon_newvm_tx(&req.into_inner().node_pubkey, data_tx); + .add_daemon_newvm_tx(&req.node_pubkey, data_tx); + let node_pubkey = req.node_pubkey.clone(); + let data = self.data.clone(); tokio::spawn(async move { while let Some(newvmreq) = data_rx.recv().await { + debug!( + "received this newvmreq to {}: {newvmreq:?}", + req.node_pubkey + ); let _ = grpc_tx.send(Ok(newvmreq)).await; } + let _ = dropper; + data.del_cli_vmcontract_tx(&node_pubkey); }); let output_stream = ReceiverStream::new(grpc_rx); Ok(Response::new( diff --git a/src/main.rs b/src/main.rs index fe7e4f3..bf0e8a0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,29 @@ -mod grpc; mod data; -fn main() { - println!("Hello, world!"); +mod grpc; + +use data::BrainData; +use grpc::brain::brain_cli_service_server::BrainCliServiceServer; +use grpc::brain::brain_daemon_service_server::BrainDaemonServiceServer; +use grpc::BrainCliMock; +use grpc::BrainDaemonMock; +use std::sync::Arc; +use tonic::transport::Server; + +#[tokio::main] +async fn main() { + env_logger::builder() + .filter_level(log::LevelFilter::Debug) + .init(); + let data = Arc::new(BrainData::new()); + let addr = "[::1]:31337".parse().unwrap(); + + let daemon_server = BrainDaemonServiceServer::new(BrainDaemonMock::new(data.clone())); + let cli_server = BrainCliServiceServer::new(BrainCliMock::new(data.clone())); + + Server::builder() + .add_service(daemon_server) + .add_service(cli_server) + .serve(addr) + .await + .unwrap(); }