From 3b1a34b4406fe53932ab29439714848d35f4ccaa Mon Sep 17 00:00:00 2001 From: psydok <47638600+psydok@users.noreply.github.com> Date: Tue, 13 Feb 2024 06:00:12 +0500 Subject: [PATCH] Fix blocking of gRPC stream-to-stream requests when elasticapm is enabled (#1967) * Fix gprc support with streaming requests (#1966) * Update version.py --- .../contrib/grpc/async_server_interceptor.py | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/elasticapm/contrib/grpc/async_server_interceptor.py b/elasticapm/contrib/grpc/async_server_interceptor.py index 5af0c1372..e7c9b659f 100644 --- a/elasticapm/contrib/grpc/async_server_interceptor.py +++ b/elasticapm/contrib/grpc/async_server_interceptor.py @@ -33,20 +33,18 @@ import grpc import elasticapm -from elasticapm.contrib.grpc.server_interceptor import _ServicerContextWrapper, _wrap_rpc_behavior, get_trace_parent +from elasticapm.contrib.grpc.server_interceptor import _ServicerContextWrapper, get_trace_parent class _AsyncServerInterceptor(grpc.aio.ServerInterceptor): async def intercept_service(self, continuation, handler_call_details): - def transaction_wrapper(behavior, request_streaming, response_streaming): - async def _interceptor(request_or_iterator, context): - if request_streaming or response_streaming: # only unary-unary is supported - return behavior(request_or_iterator, context) + def wrap_unary_unary(behavior): + async def _interceptor(request, context): tp = get_trace_parent(handler_call_details) client = elasticapm.get_client() transaction = client.begin_transaction("request", trace_parent=tp) try: - result = behavior(request_or_iterator, _ServicerContextWrapper(context, transaction)) + result = behavior(request, _ServicerContextWrapper(context, transaction)) # This is so we can support both sync and async rpc functions if inspect.isawaitable(result): @@ -65,4 +63,12 @@ async def _interceptor(request_or_iterator, context): return _interceptor - return _wrap_rpc_behavior(await continuation(handler_call_details), transaction_wrapper) + handler = await continuation(handler_call_details) + if handler.request_streaming or handler.response_streaming: + return handler + + return grpc.unary_unary_rpc_method_handler( + wrap_unary_unary(handler.unary_unary), + request_deserializer=handler.request_deserializer, + response_serializer=handler.response_serializer, + )