Skip to content

Commit

Permalink
Add links to ack_span in the subscribe_spans in dispatcher.retry_ack()
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Sep 24, 2024
1 parent 5874a96 commit a791570
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
8 changes: 8 additions & 0 deletions google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ def _retry_acks(self, requests_to_retry: List[requests.AckRequest]):
subscription_id: Optional[str] = None
project_id: Optional[str] = None
subscribe_links: List[trace.Link] = []
subscribe_spans: List[trace.Span] = []
for req in requests_to_retry:
if req.opentelemetry_data:
req.opentelemetry_data.add_subscribe_span_event("ack start")
Expand All @@ -356,6 +357,7 @@ def _retry_acks(self, requests_to_retry: List[requests.AckRequest]):
subscribe_links.append(
trace.Link(subscribe_span.get_span_context())
)
subscribe_spans.append(subscribe_span)
ack_span: Optional[trace.Span] = None
if subscription_id and project_id:
ack_span = start_ack_span(
Expand All @@ -364,6 +366,12 @@ def _retry_acks(self, requests_to_retry: List[requests.AckRequest]):
project_id,
subscribe_links,
)
if (
ack_span and ack_span.get_span_context().trace_flags.sampled
): # pragma: NO COVER
ack_span_context: trace.SpanContext = ack_span.get_span_context()
for subscribe_span in subscribe_spans:
subscribe_span.add_link(ack_span_context)

requests_completed, requests_to_retry = self._manager.send_unary_ack(
ack_ids=[req.ack_id for req in requests_to_retry],
Expand Down
12 changes: 12 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,10 @@ def test_retry_acks_in_new_thread():
assert ctor_call.kwargs["daemon"]


@pytest.mark.skipif(
sys.version_info < (3, 8),
reason="Open Telemetry not supported below Python version 3.8",
)
def test_opentelemetry_retry_acks(span_exporter):
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True
Expand Down Expand Up @@ -664,6 +668,14 @@ def test_opentelemetry_retry_acks(span_exporter):
assert subscribe_span.events[0].name == "ack start"
assert subscribe_span.events[1].name == "ack end"

# This subscribe span is sampled, so we expect it to be linked to the ack
# span.
assert len(spans[1].links) == 1
assert spans[1].links[0].context == ack_span.context
# This subscribe span is not sampled, so we expect it to not be linked to
# the ack span
assert len(spans[2].links) == 0

assert ack_span.name == "subscriptionID ack"
assert ack_span.kind == trace.SpanKind.CLIENT
assert ack_span.parent is None
Expand Down

0 comments on commit a791570

Please sign in to comment.