Intermediate Echo Service - Streaming Patterns¶
This guide shows how to extend the Basic Echo Service with streaming RPC patterns.
Overview¶
Building on the basic unary Echo service, this guide demonstrates:
- Server Streaming - Server sends multiple responses
- Client Streaming - Client sends multiple requests
- Request/Response Patterns - Handling streams effectively
Adding Server Streaming¶
Extend the proto definition:
service EchoService {
  rpc Echo(EchoRequest) returns (EchoResponse);

  // Server streaming - returns multiple echo responses
  rpc ServerStreamEcho(EchoRequest) returns (stream EchoResponse);
}
Implement in the handler:
async def ServerStreamEcho(
    self,
    request: echo_pb2.EchoRequest,
    context: grpc.aio.ServicerContext
) -> AsyncIterator[echo_pb2.EchoResponse]:
    """Server streaming - sends multiple responses."""
    logger.info(f"Server stream echo started: {request.message}")

    # Send 5 echo responses with a delay between each
    for i in range(5):
        if context.cancelled():
            logger.info("Client cancelled server stream")
            break

        response = echo_pb2.EchoResponse(
            reply=f"Echo #{i+1}: {request.message}"
        )
        yield response
        await asyncio.sleep(1)
Client usage (a minimal sketch, assuming the generated async stub from the basic guide is available as stub):
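# stub: EchoServiceStub created from a grpc.aio channel (see the basic guide)
request = echo_pb2.EchoRequest(message="Hello")
async for response in stub.ServerStreamEcho(request):
    print(f"Received: {response.reply}")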
Adding Client Streaming¶
Extend the proto definition:
service EchoService {
  // Client streaming - collects messages and returns summary
  rpc ClientStreamEcho(stream EchoRequest) returns (EchoResponse);
}
Implement in the handler:
async def ClientStreamEcho(
    self,
    request_iterator: AsyncIterator[echo_pb2.EchoRequest],
    context: grpc.aio.ServicerContext
) -> echo_pb2.EchoResponse:
    """Client streaming - collects messages and responds."""
    messages = []

    async for request in request_iterator:
        messages.append(request.message)
        logger.debug(f"Received message: {request.message}")

    summary = f"Received {len(messages)} messages: {', '.join(messages)}"
    return echo_pb2.EchoResponse(reply=summary)
Client usage:
async def request_generator():
    for msg in ["Hello", "World", "RPC"]:
        yield echo_pb2.EchoRequest(message=msg)
        await asyncio.sleep(0.5)

response = await stub.ClientStreamEcho(request_generator())
print(f"Summary: {response.reply}")
Best Practices¶
1. Handle Cancellation¶
Always check if the client cancelled the stream:
async def ServerStreamEcho(self, request, context):
    for i in range(100):
        if context.cancelled():
            logger.info("Stream cancelled by client")
            return
        yield create_response(i)
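For reference, a hedged sketch of the client side that triggers this cancellation; it reuses the async stub from the earlier examples, and should_stop is a hypothetical predicate:
call = stub.ServerStreamEcho(echo_pb2.EchoRequest(message="Hello"))
async for response in call:
    if should_stop(response):  # hypothetical early-exit condition
        call.cancel()          # the server then sees context.cancelled() == True
        break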
2. Add Timeouts¶
Protect against slow or stalled streams:
async def call_server_stream(self, message: str):
    request = echo_pb2.EchoRequest(message=message)
    try:
        # Bound the total time spent consuming the stream (Python 3.11+)
        async with asyncio.timeout(30.0):
            async for response in self._stub.ServerStreamEcho(request):
                process(response)
    except TimeoutError:
        logger.error("Stream timed out")
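Alternatively, the deadline can be left to gRPC itself; a sketch passing the timeout argument to the async stub call, in which case the failure surfaces as a DEADLINE_EXCEEDED RPC error rather than a TimeoutError:
try:
    # gRPC enforces a 30-second deadline for the whole call
    async for response in self._stub.ServerStreamEcho(request, timeout=30.0):
        process(response)
except grpc.aio.AioRpcError as err:
    if err.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
        logger.error("Stream deadline exceeded")
    else:
        raise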
3. Use Backpressure¶
Control message flow to prevent overwhelming the receiver:
async def controlled_stream(self, request_iterator):
    semaphore = asyncio.Semaphore(10)  # Max 10 requests in flight at once
    async def worker(request):
        try:
            await process_request(request)
        finally:
            semaphore.release()  # Free the slot even if processing fails
    async with asyncio.TaskGroup() as tg:  # Python 3.11+; keeps task references
        async for request in request_iterator:
            await semaphore.acquire()  # Pause reading while 10 are in flight
            tg.create_task(worker(request))
Implementation Steps¶
To add streaming to your echo service:
- Update Protocol Buffer - Add streaming RPC definitions
- Regenerate Code - Run protoc (e.g. via the grpc_tools.protoc module) to update the generated _pb2 files
- Implement Handlers - Add streaming methods to the servicer
- Update Client - Add streaming call methods
- Test Thoroughly - Test cancellation, timeouts, and errors (see the test sketch after this list)
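As a starting point for the testing step, a minimal sketch assuming pytest with the pytest-asyncio plugin and a hypothetical stub fixture that yields a connected EchoServiceStub (echo_pb2 imported as in the examples above):
import pytest

@pytest.mark.asyncio
async def test_server_stream_echo(stub):  # 'stub' is a hypothetical fixture
    request = echo_pb2.EchoRequest(message="hi")
    replies = [response.reply async for response in stub.ServerStreamEcho(request)]
    assert len(replies) == 5
    assert replies[0] == "Echo #1: hi"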
Next Steps¶
- Echo Advanced - Bidirectional streaming and advanced patterns
- Advanced Topics Guide - Deep dive into async patterns and performance
- Error Handling Guide - Robust error management
Reference¶
- Basic Example: Echo Basic
- Source Code: examples/echo_server.py, examples/echo_client.py
- gRPC Streaming: gRPC Python Documentation