renamed proto service and refactored

This commit is contained in:
ghe0 2025-02-02 02:38:22 +02:00
parent 7057a0584d
commit f89b0f8a81
Signed by: ghe0
GPG Key ID: 451028EE56A0FBB4
4 changed files with 44 additions and 104 deletions

52
Cargo.lock generated

@ -209,21 +209,6 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.31" version = "0.3.31"
@ -231,7 +216,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink",
] ]
[[package]] [[package]]
@ -240,34 +224,6 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.31" version = "0.3.31"
@ -286,16 +242,10 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [ dependencies = [
"futures-channel",
"futures-core", "futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task", "futures-task",
"memchr",
"pin-project-lite", "pin-project-lite",
"pin-utils", "pin-utils",
"slab",
] ]
[[package]] [[package]]
@ -775,7 +725,7 @@ checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
name = "rust_test_grpc" name = "rust_test_grpc"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"futures", "async-stream",
"prost", "prost",
"prost-types", "prost-types",
"serde", "serde",

@ -12,7 +12,7 @@ name = "client"
path = "src/client.rs" path = "src/client.rs"
[dependencies] [dependencies]
futures = "0.3" async-stream = "0.3.6"
prost = "0.13" prost = "0.13"
prost-types = "0.13" prost-types = "0.13"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }

3
rustfmt.toml Normal file

@ -0,0 +1,3 @@
reorder_impl_items = true
use_small_heuristics = "Max"
imports_granularity = "Crate"

@ -1,31 +1,31 @@
use futures::Stream;
use std::pin::Pin;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tonic::{transport::Server, Request, Response, Status};
mod grpc; mod grpc;
use grpc::dummy;
use std::pin::Pin;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::async_trait;
use tonic::{transport::Server, Request, Response, Status};
// Import the generated server trait and messages. use dummy::{
use grpc::dummy::brain_daemon_server::{BrainDaemon, BrainDaemonServer}; test_service_server::{TestService, TestServiceServer},
use grpc::dummy::{Empty, SomeRequest, SomeResponse, StreamRequest, StreamResponse}; Empty, SomeRequest, SomeResponse, StreamRequest, StreamResponse,
};
/// Our service implementation
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct MyBrainDaemon; pub struct MyTestService;
#[tonic::async_trait] #[async_trait]
impl BrainDaemon for MyBrainDaemon { impl TestService for MyTestService {
/// Unary RPC: (SomeRequest) -> SomeResponse
async fn get_something( async fn get_something(
&self, &self,
request: Request<SomeRequest>, request: Request<SomeRequest>,
) -> Result<Response<SomeResponse>, Status> { ) -> Result<Response<SomeResponse>, Status> {
// Extract the request message println!("Got a request from the client: {:?}", request.get_ref().what_client_sends);
let req = request.into_inner();
println!("Got a request: {:?}", req.what_client_sends);
// Build the response
let reply = SomeResponse { let reply = SomeResponse {
what_server_sends: format!("Server says: Hello, {}!", req.what_client_sends), what_server_sends: format!(
"Hello from server, you said: {}",
request.get_ref().what_client_sends
),
}; };
Ok(Response::new(reply)) Ok(Response::new(reply))
@ -37,41 +37,39 @@ impl BrainDaemon for MyBrainDaemon {
) -> Result<Response<Empty>, Status> { ) -> Result<Response<Empty>, Status> {
let mut stream = request.into_inner(); let mut stream = request.into_inner();
// Process the incoming stream println!("Client started streaming...");
while let Some(item) = stream.next().await {
let message = item?; while let Some(result) = stream.next().await {
println!("Received stream message: {}", message.what_client_sends); match result {
// Do something with the message ... Ok(msg) => {
println!("Client streamed message: {}", msg.what_client_sends);
}
Err(e) => {
println!("Stream error: {}", e);
return Err(Status::internal(format!("Error reading stream: {}", e)));
}
}
} }
// Return an Empty to the client println!("Client finished streaming.");
Ok(Response::new(Empty {})) Ok(Response::new(Empty {}))
} }
/// Server streaming RPC: (Empty) -> (stream StreamResponse) type ServerMessagesStream = Pin<Box<dyn Stream<Item = Result<StreamResponse, Status>> + Send>>;
type ServerMessagesStream =
Pin<Box<dyn Stream<Item = Result<StreamResponse, Status>> + Send + 'static>>;
async fn server_messages( async fn server_messages(
&self, &self,
_request: Request<Empty>, _request: Request<Empty>,
) -> Result<Response<Self::ServerMessagesStream>, Status> { ) -> Result<Response<Self::ServerMessagesStream>, Status> {
// Create a channel (or use any stream you like)
let (tx, rx) = tokio::sync::mpsc::channel(4); let (tx, rx) = tokio::sync::mpsc::channel(4);
// Spawn a task that sends data to the client
tokio::spawn(async move { tokio::spawn(async move {
let messages = vec![ let messages =
"Hello from the server!", vec!["Hello from the server!", "Here's another message!", "And one more message!"];
"Here's another message!",
"And one more message!",
];
for msg in messages { for msg in messages {
// Send a single response let response = StreamResponse { what_server_sends: msg.to_string() };
let response = StreamResponse {
what_server_sends: msg.to_string(),
};
if let Err(e) = tx.send(Ok(response)).await { if let Err(e) = tx.send(Ok(response)).await {
eprintln!("Failed to send response via stream: {e}"); eprintln!("Failed to send response via stream: {e}");
@ -80,31 +78,20 @@ impl BrainDaemon for MyBrainDaemon {
} }
}); });
// Convert MPSC receiver into a Tonic-friendly stream
let output_stream = ReceiverStream::new(rx); let output_stream = ReceiverStream::new(rx);
// Box it up and return Ok(Response::new(Box::pin(output_stream) as Self::ServerMessagesStream))
Ok(Response::new(
Box::pin(output_stream) as Self::ServerMessagesStream
))
} }
} }
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
// The address on which your server will listen
let addr = "[::1]:50051".parse()?; let addr = "[::1]:50051".parse()?;
let service = MyTestService::default();
// Create an instance of your service println!("Starting gRPC server on {}", addr);
let daemon = MyBrainDaemon::default();
println!("BrainDaemon server listening on {}", addr); Server::builder().add_service(TestServiceServer::new(service)).serve(addr).await?;
// Run the server
Server::builder()
.add_service(BrainDaemonServer::new(daemon))
.serve(addr)
.await?;
Ok(()) Ok(())
} }