Simplified Python gRPC Interceptors¶
Contents
The primary aim of this project is to make Python gRPC interceptors simple.
The Python grpc
package provides service interceptors, but they’re a bit hard to
use because of their flexibility. The grpc
interceptors don’t have direct access
to the request and response objects, or the service context. Access to these are often
desired, to be able to log data in the request or response, or set status codes on the
context.
The secondary aim of this project is to keep the code small and simple. Code you can
read through and understand quickly gives you confidence and helps debug issues. When
you install this package, you also don’t want a bunch of other packages that might
cause conflicts within your project. Too many dependencies slow down installation
as well as runtime (fresh imports take time). Hence, a goal of this project is to keep
dependencies to a minimum. The only core dependency is the grpc
package, and the
testing
extra includes protobuf
as well.
The grpc_interceptor
package provides the following:
A
ServerInterceptor
base class, to make it easy to define your own server-side interceptors. Do not confuse this with thegrpc.ServerInterceptor
class.An
AsyncServerInterceptor
base class, which is the analogy for async server-side interceptors.An
ExceptionToStatusInterceptor
interceptor, so your service can raise exceptions that set the gRPC status code correctly (rather than the default of every exception resulting in anUNKNOWN
status code). This is something for which pretty much any service will have a use.An
AsyncExceptionToStatusInterceptor
interceptor, which is the analogy for asyncExceptionToStatusInterceptor
.A
ClientInterceptor
base class, to make it easy to define your own client-side interceptors. Do not confuse this with thegrpc.ClientInterceptor
class. (Note, there is currently no async analogy toClientInterceptor
, though contributions are welcome.)An optional testing framework. If you’re writing your own interceptors, this is useful. If you’re just using
ExceptionToStatusInterceptor
then you don’t need this.
Installation¶
To install just the interceptors:
$ pip install grpc-interceptor
To also install the testing framework:
$ pip install grpc-interceptor[testing]
Usage¶
Server Interceptors¶
To define your own server interceptor (we can use a simplified version of
ExceptionToStatusInterceptor
as an example):
from grpc_interceptor import ServerInterceptor
from grpc_interceptor.exceptions import GrpcException
class ExceptionToStatusInterceptor(ServerInterceptor):
def intercept(
self,
method: Callable,
request_or_iterator: Any,
context: grpc.ServicerContext,
method_name: str,
) -> Any:
"""Override this method to implement a custom interceptor.
You should call method(request_or_iterator, context) to invoke the
next handler (either the RPC method implementation, or the
next interceptor in the list).
Args:
method: The next interceptor, or method implementation.
request_or_iterator: The RPC request, as a protobuf message.
context: The ServicerContext pass by gRPC to the service.
method_name: A string of the form
"/protobuf.package.Service/Method"
Returns:
This should generally return the result of
method(request_or_iterator, context), which is typically the RPC
method response, as a protobuf message. The interceptor
is free to modify this in some way, however.
"""
try:
return method(request_or_iterator, context)
except GrpcException as e:
context.set_code(e.status_code)
context.set_details(e.details)
raise
Then inject your interceptor when you create the grpc
server:
interceptors = [ExceptionToStatusInterceptor()]
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
interceptors=interceptors
)
To use ExceptionToStatusInterceptor
:
from grpc_interceptor.exceptions import NotFound
class MyService(my_pb2_grpc.MyServiceServicer):
def MyRpcMethod(
self, request: MyRequest, context: grpc.ServicerContext
) -> MyResponse:
thing = lookup_thing()
if not thing:
raise NotFound("Sorry, your thing is missing")
...
This results in the gRPC status status code being set to NOT_FOUND
,
and the details "Sorry, your thing is missing"
. This saves you the hassle of
catching exceptions in your service handler, or passing the context down into
helper functions so they can call context.abort
or context.set_code
. It allows
the more Pythonic approach of just raising an exception from anywhere in the code,
and having it be handled automatically.
Server Streaming Interceptors¶
The above example shows how to write an interceptor for a unary-unary RPC. Server
streaming RPCs need to be handled a little differently because method(request, context)
will return a generator. Hence, the code won’t actually run until you iterate over it.
Hence, if we were to continue the example of catching exceptions from RPCs, we would
need to do something like this:
class ExceptionToStatusInterceptor(ServerInterceptor):
def intercept(
self,
method: Callable,
request: Any,
context: grpc.ServicerContext,
method_name: str,
) -> Any:
try:
for response in method(request, context):
yield response
except GrpcException as e:
context.set_code(e.status_code)
context.set_details(e.details)
raise
However, this will only work for server streaming RPCs. In order to work with both unary and streaming RPCs, you’ll need to handle the unary case and streaming case separately, like this:
class ExceptionToStatusInterceptor(ServerInterceptor):
def intercept(self, method, request, context, method_name):
# Call the RPC. It could be either unary or streaming
try:
response_or_iterator = method(request, context)
except GrpcException as e:
# If it was unary, then any exception raised would be caught
# immediately, so handle it here.
context.set_code(e.status_code)
context.set_details(e.details)
raise
# Check if it's streaming
if hasattr(response_or_iterator, "__iter__"):
# Now we know it's a server streaming RPC, so the actual RPC method
# hasn't run yet. Delegate to a helper to iterate over it so it runs.
# The helper needs to re-yield the responses, and we need to return
# the generator that produces.
return self._intercept_streaming(response_or_iterator)
else:
# For unary cases, we are done, so just return the response.
return response_or_iterator
def _intercept_streaming(self, iterator):
try:
for resp in iterator:
yield resp
except GrpcException as e:
context.set_code(e.status_code)
context.set_details(e.details)
raise
Async Server Interceptors¶
Async interceptors are similar to sync ones, but there are two things of which you need to be aware.
First, async server streaming RPCs that are implemented with async def + yield
cannot be awaited. When you call such a method, you get back an async_generator
.
This is not await
-able (though you can async for
loop over it). This is
contrary to a unary RPC is implemented with async def + return
. That results in a
coroutine when called, which you can await
.
All this is to say that you mustn’t await method(request, context)
in an async
interceptor immediately. First, check if it’s an async_generator
. You can do this
by checking for the presence of the __aiter__
attribute.
Here’s an async version of our running ExceptionToStatusInterceptor
example:
from grpc_interceptor.exceptions import GrpcException
from grpc_interceptor.server import AsyncServerInterceptor
class AsyncExceptionToStatusInterceptor(AsyncServerInterceptor):
async def intercept(
self,
method: Callable,
request_or_iterator: Any,
context: grpc.ServicerContext,
method_name: str,
) -> Any:
try:
response_or_iterator = method(request_or_iterator, context)
if not hasattr(response_or_iterator, "__aiter__"):
# Unary, just await and return the response
return await response_or_iterator
except GrpcException as e:
await context.set_code(e.status_code)
await context.set_details(e.details)
raise
# Server streaming responses, delegate to an async generator helper.
# Note that we do NOT await this.
return self._intercept_streaming(response_or_iterator, context)
async def _intercept_streaming(self, iterator, context):
try:
async for r in iterator:
yield r
except GrpcException as e:
await context.set_code(e.status_code)
await context.set_details(e.details)
raise
The second thing you must be aware of with async RPCs, is that an
alternate streaming API
was added. With this API, instead of writing a server streaming RPC with
async def + yield
, you write it as async def + return
, but it returns None
.
The way it streams responses is by calling await context.write(...)
for each response
it streams. Similarly, client streaming can be achieved by calling
await context.read()
instead of iterating over the request object.
If you must support RPC services written using this new API, then you must be aware that
a server streaming RPC could return None
. In that case it will not be an
async_generator
even though it’s streaming. You will also need your own solution to
get access to the streaming response objects. For example, you could wrap the
context
object that you pass to method(request, context)
, so that you can
capture read
and write
calls.
Client Interceptors¶
We will use an invocation metadata injecting interceptor as an example of defining a client interceptor:
from grpc_interceptor import ClientCallDetails, ClientInterceptor
class MetadataClientInterceptor(ClientInterceptor):
def intercept(
self,
method: Callable,
request_or_iterator: Any,
call_details: grpc.ClientCallDetails,
):
"""Override this method to implement a custom interceptor.
This method is called for all unary and streaming RPCs. The interceptor
implementation should call `method` using a `grpc.ClientCallDetails` and the
`request_or_iterator` object as parameters. The `request_or_iterator`
parameter may be type checked to determine if this is a singluar request
for unary RPCs or an iterator for client-streaming or client-server streaming
RPCs.
Args:
method: A function that proceeds with the invocation by executing the next
interceptor in the chain or invoking the actual RPC on the underlying
channel.
request_or_iterator: RPC request message or iterator of request messages
for streaming requests.
call_details: Describes an RPC to be invoked.
Returns:
The type of the return should match the type of the return value received
by calling `method`. This is an object that is both a
`Call <https://grpc.github.io/grpc/python/grpc.html#grpc.Call>`_ for the
RPC and a `Future <https://grpc.github.io/grpc/python/grpc.html#grpc.Future>`_.
The actual result from the RPC can be got by calling `.result()` on the
value returned from `method`.
"""
new_details = ClientCallDetails(
call_details.method,
call_details.timeout,
[("authorization", "Bearer mysecrettoken")],
call_details.credentials,
call_details.wait_for_ready,
call_details.compression,
)
return method(request_or_iterator, new_details)
Now inject your interceptor when you create the grpc
channel:
interceptors = [MetadataClientInterceptor()]
with grpc.insecure_channel("grpc-server:50051") as channel:
channel = grpc.intercept_channel(channel, *interceptors)
...
Client interceptors can also be used to retry RPCs that fail due to specific errors, or a host of other use cases. There are some basic approaches in the tests to get you started.
Note: The method
in a client interceptor is a continuation
as described in the
client interceptor section of the gRPC docs.
When you invoke the continuation, you get a future back, which resolves to either the
result, or exception. This is different than invoking a client stub, which returns the
result directly. If the interceptor needs the value returned by the call, or to catch
exceptions, then you’ll need to do future = method(request_or_iterator, call_details)
,
followed by future.result()
. Check out the tests for
examples.
Testing¶
The testing framework provides an actual gRPC service and client, which you can inject interceptors into. This allows end-to-end testing, rather than mocking things out (such as the context). This can catch interactions between your interceptors and the gRPC framework, and also allows chaining interceptors.
The crux of the testing framework is the dummy_client
context manager. It provides
a client to a gRPC service, which by defaults echos the input
field of the request
to the output
field of the response.
You can also provide a special_cases
dict which tells the service to call arbitrary
functions when the input matches a key in the dict. This allows you to test things like
exceptions being thrown.
Here’s an example (again using ExceptionToStatusInterceptor
):
from grpc_interceptor import ExceptionToStatusInterceptor
from grpc_interceptor.exceptions import NotFound
from grpc_interceptor.testing import dummy_client, DummyRequest, raises
def test_exception():
special_cases = {"error": raises(NotFound())}
interceptors = [ExceptionToStatusInterceptor()]
with dummy_client(special_cases=special_cases, interceptors=interceptors) as client:
# Test a happy path first
assert client.Execute(DummyRequest(input="foo")).output == "foo"
# And now a special case
with pytest.raises(grpc.RpcError) as e:
client.Execute(DummyRequest(input="error"))
assert e.value.code() == grpc.StatusCode.NOT_FOUND
Limitations¶
Known limitations:
Async client interceptors are not implemented.
The
read
/write
API for async streaming technically works, but you’ll need to roll your own solution to get access to streaming request and response objects.
Contributions or requests are welcome for any limitations you may find.