From b66fd19e5cf7e43e75c076bce7e25dce98878735 Mon Sep 17 00:00:00 2001 From: mukund-ananthu Date: Mon, 23 Sep 2024 02:04:14 +0000 Subject: [PATCH] Add ack span --- .../open_telemetry/subscribe_opentelemetry.py | 44 ++++++++++---- .../subscriber/_protocol/dispatcher.py | 34 +++++++++++ .../pubsub_v1/subscriber/test_dispatcher.py | 59 +++++++++++++++---- 3 files changed, 113 insertions(+), 24 deletions(-) diff --git a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py index 0189c6fbc..aae1ea9a4 100644 --- a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py +++ b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py @@ -24,11 +24,11 @@ ) from google.pubsub_v1.types import PubsubMessage +_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1" +_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub" -class SubscribeOpenTelemetry: - _OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1" - _OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub" +class SubscribeOpenTelemetry: def __init__(self, message: PubsubMessage): self._message: PubsubMessage = message @@ -79,7 +79,7 @@ def start_subscribe_span( ack_id: str, delivery_attempt: int, ) -> None: - tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) + tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME) parent_span_context = TraceContextTextMapPropagator().extract( carrier=self._message, getter=OpenTelemetryContextGetter(), @@ -94,7 +94,7 @@ def start_subscribe_span( context=parent_span_context if parent_span_context else None, kind=trace.SpanKind.CONSUMER, attributes={ - "messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM, + "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM, "messaging.destination.name": subscription_short_name, "gcp.project_id": subscription.split("/")[1], "messaging.message.id": self._message.message_id, @@ -131,7 +131,7 @@ def set_subscribe_span_result(self, result: str) -> None: def start_subscribe_concurrency_control_span(self) -> None: assert self._subscribe_span is not None - tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) + tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME) with tracer.start_as_current_span( name="subscriber concurrency control", kind=trace.SpanKind.INTERNAL, @@ -146,7 +146,7 @@ def end_subscribe_concurrency_control_span(self) -> None: def start_subscribe_scheduler_span(self) -> None: assert self._subscribe_span is not None - tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) + tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME) with tracer.start_as_current_span( name="subscriber scheduler", kind=trace.SpanKind.INTERNAL, @@ -161,7 +161,7 @@ def end_subscribe_scheduler_span(self) -> None: def start_process_span(self) -> None: assert self._subscribe_span is not None - tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) + tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME) publish_create_span_link: Optional[trace.Link] = None if self._publisher_create_span_context: publish_create_span: trace.Span = trace.get_current_span( @@ -177,7 +177,7 @@ def start_process_span(self) -> None: with tracer.start_as_current_span( name=f"{self._subscription_id} process", attributes={ - "messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM, + "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM, }, kind=trace.SpanKind.INTERNAL, context=set_span_in_context(self._subscribe_span), @@ -209,8 +209,6 @@ def start_modack_span( code_function: str, receipt_modack: bool, ) -> trace.Span: - _OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1" - _OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub" assert subscription_id is not None assert project_id is not None tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME) @@ -231,3 +229,27 @@ def start_modack_span( end_on_exit=False, ) as modack_span: return modack_span + + +def start_ack_span( + subscription_id: str, + message_count: int, + project_id: str, + links: List[trace.Link], +) -> trace.Span: + tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME) + with tracer.start_as_current_span( + name=f"{subscription_id} ack", + attributes={ + "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM, + "messaging.batch.message_count": message_count, + "messaging.operation": "ack", + "gcp.project_id": project_id, + "messaging.destination.name": subscription_id, + "code.function": "ack", + }, + kind=trace.SpanKind.CLIENT, + links=links, + end_on_exit=False, + ) as ack_span: + return ack_span diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 0fc02ddeb..6accf37d0 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -26,11 +26,14 @@ import warnings from google.api_core.retry import exponential_sleep_generator +from opentelemetry import trace + from google.cloud.pubsub_v1.subscriber._protocol import helper_threads from google.cloud.pubsub_v1.subscriber._protocol import requests from google.cloud.pubsub_v1.subscriber.exceptions import ( AcknowledgeStatus, ) +from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import start_ack_span if typing.TYPE_CHECKING: # pragma: NO COVER import queue @@ -232,18 +235,49 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None: items_gen = iter(items) ack_ids_gen = (item.ack_id for item in items) total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE)) + subscription_id: Optional[str] = None + project_id: Optional[str] = None for item in items: if item.opentelemetry_data: item.opentelemetry_data.add_subscribe_span_event("ack start") + if subscription_id is None: + subscription_id = item.opentelemetry_data.subscription_id + if project_id is None: + project_id = item.opentelemetry_data.project_id + for _ in range(total_chunks): ack_reqs_dict = { req.ack_id: req for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE) } + + subscribe_links: List[trace.Link] = [] + for ack_req in ack_reqs_dict.values(): + if ack_req.opentelemetry_data: + subscribe_span: Optional[ + trace.Span + ] = ack_req.opentelemetry_data.subscribe_span + if ( + subscribe_span + and subscribe_span.get_span_context().trace_flags.sampled + ): + subscribe_links.append( + trace.Link(subscribe_span.get_span_context()) + ) + ack_span: Optional[trace.Span] = None + if subscription_id and project_id: + ack_span = start_ack_span( + subscription_id, + len(ack_reqs_dict), + project_id, + subscribe_links, + ) requests_completed, requests_to_retry = self._manager.send_unary_ack( ack_ids=list(itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)), ack_reqs_dict=ack_reqs_dict, ) + if ack_span: + ack_span.end() for completed_ack in requests_completed: if completed_ack.opentelemetry_data: diff --git a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index 869df65bf..5ec72b2df 100644 --- a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -17,6 +17,8 @@ import sys import threading +from opentelemetry import trace + from google.cloud.pubsub_v1.subscriber._protocol import dispatcher from google.cloud.pubsub_v1.subscriber._protocol import helper_threads from google.cloud.pubsub_v1.subscriber._protocol import requests @@ -410,8 +412,15 @@ def test_opentelemetry_ack(span_exporter): ) dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) - opentelemetry_data = SubscribeOpenTelemetry(message=PubsubMessage(data=b"foo")) - opentelemetry_data.start_subscribe_span( + data1 = SubscribeOpenTelemetry(message=PubsubMessage(data=b"foo")) + data1.start_subscribe_span( + subscription="projects/projectID/subscriptions/subscriptionID", + exactly_once_enabled=True, + ack_id="ack_id", + delivery_attempt=5, + ) + data2 = SubscribeOpenTelemetry(message=PubsubMessage(data=b"foo")) + data2.start_subscribe_span( subscription="projects/projectID/subscriptions/subscriptionID", exactly_once_enabled=True, ack_id="ack_id", @@ -424,22 +433,46 @@ def test_opentelemetry_ack(span_exporter): time_to_ack=20, ordering_key="", future=None, - opentelemetry_data=opentelemetry_data, - ) + opentelemetry_data=data1, + ), + requests.AckRequest( + ack_id="ack_id_string2", + byte_size=0, + time_to_ack=20, + ordering_key="", + future=None, + opentelemetry_data=data2, + ), ] manager.send_unary_ack.return_value = (items, []) - dispatcher_.ack(items) + mock_span_context = mock.Mock(spec=trace.SpanContext) + mock_span_context.trace_flags.sampled = False + with mock.patch.object( + data2._subscribe_span, "get_span_context", return_value=mock_span_context + ): + dispatcher_.ack(items) spans = span_exporter.get_finished_spans() - assert len(spans) == 1 - subscribe_span = spans[0] - - assert "messaging.gcp_pubsub.result" in subscribe_span.attributes - assert subscribe_span.attributes["messaging.gcp_pubsub.result"] == "acked" - assert len(subscribe_span.events) == 2 - assert subscribe_span.events[0].name == "ack start" - assert subscribe_span.events[1].name == "ack end" + assert len(spans) == 3 + ack_span = spans[0] + + for subscribe_span in spans[1:]: + assert subscribe_span.attributes["messaging.gcp_pubsub.result"] == "acked" + assert len(subscribe_span.events) == 2 + assert subscribe_span.events[0].name == "ack start" + assert subscribe_span.events[1].name == "ack end" + + assert ack_span.name == "subscriptionID ack" + assert ack_span.kind == trace.SpanKind.CLIENT + assert ack_span.parent is None + assert len(ack_span.links) == 1 + assert ack_span.attributes["messaging.system"] == "gcp_pubsub" + assert ack_span.attributes["messaging.batch.message_count"] == 2 + assert ack_span.attributes["messaging.operation"] == "ack" + assert ack_span.attributes["gcp.project_id"] == "projectID" + assert ack_span.attributes["messaging.destination.name"] == "subscriptionID" + assert ack_span.attributes["code.function"] == "ack" def test_ack():