diff --git a/Cargo.lock b/Cargo.lock index c36903f..17b8fbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -209,21 +209,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "futures" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - [[package]] name = "futures-channel" version = "0.3.31" @@ -231,7 +216,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", - "futures-sink", ] [[package]] @@ -240,34 +224,6 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" -[[package]] -name = "futures-executor" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-io" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" - -[[package]] -name = "futures-macro" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "futures-sink" version = "0.3.31" @@ -286,16 +242,10 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ - "futures-channel", "futures-core", - "futures-io", - "futures-macro", - "futures-sink", "futures-task", - "memchr", "pin-project-lite", "pin-utils", - "slab", ] [[package]] @@ -775,7 +725,7 @@ checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" name = "rust_test_grpc" version = "0.1.0" dependencies = [ - "futures", + "async-stream", "prost", "prost-types", "serde", diff --git a/Cargo.toml b/Cargo.toml index d21d5f0..e52b76a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ name = "client" path = "src/client.rs" [dependencies] -futures = "0.3" +async-stream = "0.3.6" prost = "0.13" prost-types = "0.13" serde = { version = "1.0", features = ["derive"] } diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..a484977 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,3 @@ +reorder_impl_items = true +use_small_heuristics = "Max" +imports_granularity = "Crate" diff --git a/src/server.rs b/src/server.rs index dd0bf92..7a50bc3 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,31 +1,31 @@ -use futures::Stream; -use std::pin::Pin; -use tokio_stream::{wrappers::ReceiverStream, StreamExt}; -use tonic::{transport::Server, Request, Response, Status}; mod grpc; +use grpc::dummy; +use std::pin::Pin; +use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; +use tonic::async_trait; +use tonic::{transport::Server, Request, Response, Status}; -// Import the generated server trait and messages. -use grpc::dummy::brain_daemon_server::{BrainDaemon, BrainDaemonServer}; -use grpc::dummy::{Empty, SomeRequest, SomeResponse, StreamRequest, StreamResponse}; +use dummy::{ + test_service_server::{TestService, TestServiceServer}, + Empty, SomeRequest, SomeResponse, StreamRequest, StreamResponse, +}; -/// Our service implementation #[derive(Debug, Default)] -pub struct MyBrainDaemon; +pub struct MyTestService; -#[tonic::async_trait] -impl BrainDaemon for MyBrainDaemon { - /// Unary RPC: (SomeRequest) -> SomeResponse +#[async_trait] +impl TestService for MyTestService { async fn get_something( &self, request: Request, ) -> Result, Status> { - // Extract the request message - let req = request.into_inner(); - println!("Got a request: {:?}", req.what_client_sends); + println!("Got a request from the client: {:?}", request.get_ref().what_client_sends); - // Build the response let reply = SomeResponse { - what_server_sends: format!("Server says: Hello, {}!", req.what_client_sends), + what_server_sends: format!( + "Hello from server, you said: {}", + request.get_ref().what_client_sends + ), }; Ok(Response::new(reply)) @@ -37,41 +37,39 @@ impl BrainDaemon for MyBrainDaemon { ) -> Result, Status> { let mut stream = request.into_inner(); - // Process the incoming stream - while let Some(item) = stream.next().await { - let message = item?; - println!("Received stream message: {}", message.what_client_sends); - // Do something with the message ... + println!("Client started streaming..."); + + while let Some(result) = stream.next().await { + match result { + Ok(msg) => { + println!("Client streamed message: {}", msg.what_client_sends); + } + Err(e) => { + println!("Stream error: {}", e); + return Err(Status::internal(format!("Error reading stream: {}", e))); + } + } } - // Return an Empty to the client + println!("Client finished streaming."); + Ok(Response::new(Empty {})) } - /// Server streaming RPC: (Empty) -> (stream StreamResponse) - type ServerMessagesStream = - Pin> + Send + 'static>>; + type ServerMessagesStream = Pin> + Send>>; async fn server_messages( &self, _request: Request, ) -> Result, Status> { - // Create a channel (or use any stream you like) let (tx, rx) = tokio::sync::mpsc::channel(4); - // Spawn a task that sends data to the client tokio::spawn(async move { - let messages = vec![ - "Hello from the server!", - "Here's another message!", - "And one more message!", - ]; + let messages = + vec!["Hello from the server!", "Here's another message!", "And one more message!"]; for msg in messages { - // Send a single response - let response = StreamResponse { - what_server_sends: msg.to_string(), - }; + let response = StreamResponse { what_server_sends: msg.to_string() }; if let Err(e) = tx.send(Ok(response)).await { eprintln!("Failed to send response via stream: {e}"); @@ -80,31 +78,20 @@ impl BrainDaemon for MyBrainDaemon { } }); - // Convert MPSC receiver into a Tonic-friendly stream let output_stream = ReceiverStream::new(rx); - // Box it up and return - Ok(Response::new( - Box::pin(output_stream) as Self::ServerMessagesStream - )) + Ok(Response::new(Box::pin(output_stream) as Self::ServerMessagesStream)) } } #[tokio::main] async fn main() -> Result<(), Box> { - // The address on which your server will listen let addr = "[::1]:50051".parse()?; + let service = MyTestService::default(); - // Create an instance of your service - let daemon = MyBrainDaemon::default(); + println!("Starting gRPC server on {}", addr); - println!("BrainDaemon server listening on {}", addr); - - // Run the server - Server::builder() - .add_service(BrainDaemonServer::new(daemon)) - .serve(addr) - .await?; + Server::builder().add_service(TestServiceServer::new(service)).serve(addr).await?; Ok(()) }