Skip to content

Commit

Permalink
[Python Otel] Manage call tracer life cycle use call arena. (v1.65.x …
Browse files Browse the repository at this point in the history
…backport) (#37478)

Backport of #37460 to v1.65.x.
---
We're seeing a segfault in Python CSM tests:
```
2024-08-03T09:49:45.720555997Z *** SIGSEGV received at time=1722678585 on cpu 0 ***
2024-08-03T09:49:45.721761998Z PC: @     0x7847ffd5c1c9  (unknown)  (unknown)
2024-08-03T09:49:45.722070502Z     @     0x7847fa309d8c         64  absl::lts_20240116::WriteFailureInfo()
2024-08-03T09:49:45.722175904Z     @     0x7847fa309a15        272  absl::lts_20240116::AbslFailureSignalHandler()
2024-08-03T09:49:45.722187675Z     @     0x7847ffc3d050       1592  (unknown)
2024-08-03T09:49:45.723432238Z     @     0x7847e97f9390  (unknown)  (unknown)
2024-08-03T09:49:45.723487349Z     @ ... and at least 1 more frames
2024-08-03T09:49:45.829702781Z [INFO  tini (1)] Spawned child process '/xds_interop_client' with pid '7'
2024-08-03T09:49:45.829766869Z [DEBUG tini (1)] Received SIGCHLD
2024-08-03T09:49:45.829778749Z [DEBUG tini (1)] Reaped child with pid: '7'
2024-08-03T09:49:45.829787070Z [INFO  tini (1)] Main child exited with signal (with signal 'Segmentation fault')
```

### The issue

After investigation, we found that the call tracer was deleted before
`RecordEnd` was called.

### Why this fix

* To fix this, we decided to use the call arena to manage the life cycle of
the CallTracer.
* Since the CallTracer is created in another shared object library
(`grpcio_observability`) which doesn't have a dependency on grpc core, we
can't use `grpc_core::Arena` directly when creating the call tracer.
* As a workaround, we created a wrapper class `ClientCallTracerWrapper`
to wrap the CallTracer and added another core API,
`grpc_call_tracer_set_and_manage`, so that we can manage the life cycle
of the CallTracer using the wrapper class.


<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
  • Loading branch information
XuanWang-Amos authored Aug 14, 2024
1 parent 1e8f711 commit dcbbf06
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 71 deletions.
7 changes: 7 additions & 0 deletions src/core/lib/surface/call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,13 @@ void grpc_call_tracer_set(grpc_call* call,
return arena->SetContext<grpc_core::CallTracerAnnotationInterface>(tracer);
}

// Installs `tracer` as the call's CallTracerAnnotationInterface context and
// additionally wraps it in an arena-managed ClientCallTracerWrapper, which
// holds the tracer in a std::unique_ptr. Callers must therefore not delete
// `tracer` themselves after calling this.
void grpc_call_tracer_set_and_manage(grpc_call* call,
                                     grpc_core::ClientCallTracer* tracer) {
  grpc_core::Arena* const call_arena = grpc_call_get_arena(call);
  // Create the owning wrapper first so the tracer's lifetime is tied to an
  // object allocated through the call's arena.
  call_arena->ManagedNew<ClientCallTracerWrapper>(tracer);
  // Then publish the tracer on the call, exactly as grpc_call_tracer_set does.
  call_arena->SetContext<grpc_core::CallTracerAnnotationInterface>(tracer);
}

void* grpc_call_tracer_get(grpc_call* call) {
grpc_core::Arena* arena = grpc_call_get_arena(call);
auto* call_tracer =
Expand Down
19 changes: 19 additions & 0 deletions src/core/lib/surface/call.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,16 @@ void grpc_call_log_batch(const char* file, int line, gpr_log_severity severity,

void grpc_call_tracer_set(grpc_call* call, grpc_core::ClientCallTracer* tracer);

// Sets call tracer on the call and manages its life by using the call's arena.
// When using this API, the tracer will be destroyed by grpc_call arena when
// grpc_call is about to be destroyed. The caller of this API SHOULD NOT
// manually destroy the tracer. This API is used by Python as a way of using
// Arena to manage the lifetime of the call tracer. Python needs this API
// because the tracer was created within a separate shared object library which
// doesn't have access to core functions like arena->ManagedNew<>.
void grpc_call_tracer_set_and_manage(grpc_call* call,
grpc_core::ClientCallTracer* tracer);

void* grpc_call_tracer_get(grpc_call* call);

#define GRPC_CALL_LOG_BATCH(sev, ops, nops) \
Expand All @@ -285,6 +295,15 @@ void* grpc_call_tracer_get(grpc_call* call);

uint8_t grpc_call_is_client(grpc_call* call);

class ClientCallTracerWrapper {
public:
explicit ClientCallTracerWrapper(grpc_core::ClientCallTracer* tracer)
: tracer_(tracer) {}

private:
std::unique_ptr<grpc_core::ClientCallTracer> tracer_;
};

// Return an appropriate compression algorithm for the requested compression \a
// level in the context of \a call.
grpc_compression_algorithm grpc_call_compression_for_level(
Expand Down
2 changes: 1 addition & 1 deletion src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ cdef class _CallState:
cdef object call_tracer_capsule
cdef void maybe_save_registered_method(self, bytes method_name) except *
cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except *
cdef void maybe_delete_call_tracer(self) except *
cdef void delete_call(self) except *


cdef class _ChannelState:
Expand Down
30 changes: 9 additions & 21 deletions src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ cdef class _CallState:
def __cinit__(self):
self.due = set()

cdef void maybe_delete_call_tracer(self) except *:
if not self.call_tracer_capsule:
return
_observability.delete_call_tracer(self.call_tracer_capsule)
# Drops this call state's reference on the underlying grpc_call by calling
# grpc_call_unref (issued under `with nogil`) and resets c_call to NULL.
# NOTE(review): no NULL check is performed on c_call before the unref --
# presumably callers only invoke this once per call; confirm against callers.
cdef void delete_call(self) except *:
with nogil:
grpc_call_unref(self.c_call)
self.c_call = NULL

cdef void maybe_save_registered_method(self, bytes method_name) except *:
with _observability.get_plugin() as plugin:
Expand Down Expand Up @@ -291,9 +291,7 @@ cdef void _call(
grpc_call_credentials_release(c_call_credentials)
if c_call_error != GRPC_CALL_OK:
#TODO(xuanwn): Expand the scope of nogil
with nogil:
grpc_call_unref(call_state.c_call)
call_state.c_call = NULL
call_state.delete_call()
_raise_call_error_no_metadata(c_call_error)
started_tags = set()
for operations, user_tag in operationses_and_user_tags:
Expand All @@ -303,9 +301,7 @@ cdef void _call(
else:
grpc_call_cancel(call_state.c_call, NULL)
#TODO(xuanwn): Expand the scope of nogil
with nogil:
grpc_call_unref(call_state.c_call)
call_state.c_call = NULL
call_state.delete_call()
_raise_call_error(c_call_error, metadata)
else:
call_state.due.update(started_tags)
Expand All @@ -319,10 +315,7 @@ cdef void _process_integrated_call_tag(
cdef _CallState call_state = state.integrated_call_states.pop(tag)
call_state.due.remove(tag)
if not call_state.due:
with nogil:
grpc_call_unref(call_state.c_call)
call_state.c_call = NULL
call_state.maybe_delete_call_tracer()
call_state.delete_call()

cdef class IntegratedCall:

Expand Down Expand Up @@ -362,10 +355,7 @@ cdef object _process_segregated_call_tag(
call_state.due.remove(tag)
if not call_state.due:
#TODO(xuanwn): Expand the scope of nogil
with nogil:
grpc_call_unref(call_state.c_call)
call_state.c_call = NULL
call_state.maybe_delete_call_tracer()
call_state.delete_call()
state.segregated_call_states.remove(call_state)
_destroy_c_completion_queue(c_completion_queue)
return True
Expand All @@ -392,9 +382,7 @@ cdef class SegregatedCall:
self._channel_state, self._call_state, self._c_completion_queue, tag)
def on_failure():
self._call_state.due.clear()
with nogil:
grpc_call_unref(self._call_state.c_call)
self._call_state.c_call = NULL
self._call_state.delete_call()
self._channel_state.segregated_call_states.remove(self._call_state)
_destroy_c_completion_queue(self._c_completion_queue)
return _next_call_event(
Expand Down
2 changes: 1 addition & 1 deletion src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ cdef extern from "src/core/telemetry/call_tracer.h" namespace "grpc_core":
void RegisterGlobal(ServerCallTracerFactory* factory) nogil

cdef extern from "src/core/lib/surface/call.h":
void grpc_call_tracer_set(grpc_call* call, void* value) nogil
void grpc_call_tracer_set_and_manage(grpc_call* call, void* value) nogil

void* grpc_call_tracer_get(grpc_call* call) nogil

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def maybe_save_server_trace_context(RequestCallEvent event) -> None:

cdef void _set_call_tracer(grpc_call* call, void* capsule_ptr):
cdef ClientCallTracer* call_tracer = <ClientCallTracer*>capsule_ptr
grpc_call_tracer_set(call, call_tracer)
grpc_call_tracer_set_and_manage(call, call_tracer)


cdef void* _get_call_tracer(grpc_call* call):
Expand Down
33 changes: 0 additions & 33 deletions src/python/grpcio/grpc/_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,6 @@ def create_client_call_tracer(
"""
raise NotImplementedError()

@abc.abstractmethod
def delete_client_call_tracer(
self, client_call_tracer: ClientCallTracerCapsule
) -> None:
"""Deletes the ClientCallTracer stored in ClientCallTracerCapsule.
After register the plugin, if tracing or stats is enabled, this method
will be called at the end of the call to destroy the ClientCallTracer.
The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer`
interface and wrapped in a PyCapsule using `client_call_tracer` as name.
Args:
client_call_tracer: A PyCapsule which stores a ClientCallTracer object.
"""
raise NotImplementedError()

@abc.abstractmethod
def save_trace_context(
self, trace_id: str, span_id: str, is_sampled: bool
Expand Down Expand Up @@ -276,22 +259,6 @@ def observability_deinit() -> None:
_cygrpc.clear_server_call_tracer_factory()


def delete_call_tracer(client_call_tracer_capsule: Any) -> None:
"""Deletes the ClientCallTracer stored in ClientCallTracerCapsule.
This method will be called at the end of the call to destroy the ClientCallTracer.
The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer`
interface and wrapped in a PyCapsule using `client_call_tracer` as the name.
Args:
client_call_tracer_capsule: A PyCapsule which stores a ClientCallTracer object.
"""
with get_plugin() as plugin:
if plugin and plugin.observability_enabled:
plugin.delete_client_call_tracer(client_call_tracer_capsule)


def maybe_record_rpc_latency(state: "_channel._RPCState") -> None:
"""Record the latency of the RPC, if the plugin is registered and stats is enabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,6 @@ def create_server_call_tracer_factory_capsule(dict exchange_labels, str identifi
return capsule


def delete_client_call_tracer(object client_call_tracer) -> None:
client_call_tracer: grpc._observability.ClientCallTracerCapsule

if cpython.PyCapsule_IsValid(client_call_tracer, CLIENT_CALL_TRACER):
capsule_ptr = cpython.PyCapsule_GetPointer(client_call_tracer, CLIENT_CALL_TRACER)
call_tracer_ptr = <ClientCallTracer*>capsule_ptr
del call_tracer_ptr


def _c_label_to_labels(vector[Label] c_labels) -> Dict[str, AnyStr]:
py_labels = {}
for label in c_labels:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,11 +438,6 @@ def create_server_call_tracer_factory(
)
return capsule

def delete_client_call_tracer(
self, client_call_tracer: ClientCallTracerCapsule
) -> None:
_cyobservability.delete_client_call_tracer(client_call_tracer)

def save_trace_context(
self, trace_id: str, span_id: str, is_sampled: bool
) -> None:
Expand Down

0 comments on commit dcbbf06

Please sign in to comment.