From c08287b57999424d7ea1758ec1cc9efbf1faa7ba Mon Sep 17 00:00:00 2001 From: mukund-ananthu Date: Sun, 22 Sep 2024 21:45:07 +0000 Subject: [PATCH] Add modack span --- .../open_telemetry/subscribe_opentelemetry.py | 48 ++++++++++++++++++- .../pubsub_v1/subscriber/_protocol/leaser.py | 38 ++++++++++++++- .../unit/pubsub_v1/subscriber/test_leaser.py | 24 ++++++++-- 3 files changed, 104 insertions(+), 6 deletions(-) diff --git a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py index 64d915468..bf97e71ea 100644 --- a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py +++ b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Optional +from typing import Optional, List from datetime import datetime from opentelemetry import trace, context @@ -56,10 +56,22 @@ def __init__(self, message: PubsubMessage): # proces span to add links to the publisher create span. self._publisher_create_span_context: Optional[context.Context] = None + # This will be set by `start_subscribe_span` method and will be used + # for other spans, such as modack span. + self._project_id: Optional[str] = None + @property - def subscription_id(self): + def subscription_id(self) -> Optional[str]: return self._subscription_id + @property + def project_id(self) -> Optional[str]: + return self._project_id + + @property + def subscribe_span(self) -> Optional[trace.Span]: + return self._subscribe_span + def start_subscribe_span( self, subscription: str, @@ -75,6 +87,7 @@ def start_subscribe_span( self._publisher_create_span_context = parent_span_context assert len(subscription.split("/")) == 4 subscription_short_name = subscription.split("/")[3] + self._project_id = subscription.split("/")[1] self._subscription_id = subscription_short_name with tracer.start_as_current_span( name=f"{subscription_short_name} subscribe", @@ -185,3 +198,34 @@ def add_process_span_event(self, event: str) -> None: "timestamp": str(datetime.now()), }, ) + + +def start_modack_span( + subscribe_span_links: List[trace.Link], + subscription_id: Optional[str], + message_count: int, + deadline: float, + project_id: Optional[str], + code_function: str, +) -> trace.Span: + _OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1" + _OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub" + assert subscription_id is not None + assert project_id is not None + tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME) + with tracer.start_as_current_span( + name=f"{subscription_id} modack", + attributes={ + "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM, + "messaging.batch.message_count": message_count, + "messaging.gcp_pubsub.message.ack_deadline": deadline, + "messaging.destination.name": subscription_id, + "gcp.project_id": project_id, + "messaging.operation.name": "modack", + "code.function": code_function, + }, + links=subscribe_span_links, + kind=trace.SpanKind.CLIENT, + end_on_exit=False, + ) as modack_span: + return modack_span diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index 5abdb7081..e54b14bea 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -20,12 +20,17 @@ import threading import time import typing -from typing import Dict, Iterable, Optional, Union +from typing import Dict, Iterable, Optional, Union, List + +from opentelemetry import trace from google.cloud.pubsub_v1.subscriber._protocol.dispatcher import _MAX_BATCH_LATENCY from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import ( SubscribeOpenTelemetry, ) +from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import ( + start_modack_span, +) try: from collections.abc import KeysView @@ -219,12 +224,43 @@ def maintain_leases(self) -> None: for message in list(leased_messages.values()) if message.opentelemetry_data ] + + subscribe_span_links: List[trace.Link] = [] + subscription_id: Optional[str] = None + project_id: Optional[str] = None + if len(opentelemetry_data) > 0: + subscription_id = opentelemetry_data[0].subscription_id + project_id = opentelemetry_data[0].project_id + for data in opentelemetry_data: + subscribe_span: Optional[trace.Span] = data.subscribe_span + if ( + subscribe_span + and subscribe_span.get_span_context().trace_flags.sampled + ): + subscribe_span_links.append( + trace.Link(subscribe_span.get_span_context()) + ) + + modack_span: Optional[trace.Span] = None + if len(opentelemetry_data) > 0: + modack_span = start_modack_span( + subscribe_span_links, + subscription_id, + len(opentelemetry_data), + deadline, + project_id, + "maintain_leases", + ) + expired_ack_ids = self._manager._send_lease_modacks( ack_id_gen, deadline, opentelemetry_data, ) + if modack_span: + modack_span.end() + start_time = time.time() # If exactly once delivery is enabled, we should drop all expired ack_ids from lease management. if self._manager._exactly_once_delivery_enabled() and len(expired_ack_ids): diff --git a/tests/unit/pubsub_v1/subscriber/test_leaser.py b/tests/unit/pubsub_v1/subscriber/test_leaser.py index b5b5cac20..21dfab12b 100644 --- a/tests/unit/pubsub_v1/subscriber/test_leaser.py +++ b/tests/unit/pubsub_v1/subscriber/test_leaser.py @@ -15,6 +15,9 @@ import logging import sys import threading +import math + +from opentelemetry import trace from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.subscriber._protocol import dispatcher @@ -201,7 +204,7 @@ def test_opentelemetry_expired_message_exactly_once_process_span(span_exporter): msg.message_id = 3 opentelemetry_data = SubscribeOpenTelemetry(msg) opentelemetry_data.start_subscribe_span( - subscription="projects/projectId/subscriptions/subscriptionID", + subscription="projects/projectID/subscriptions/subscriptionID", exactly_once_enabled=True, ack_id="ack_id", delivery_attempt=4, @@ -223,8 +226,8 @@ def test_opentelemetry_expired_message_exactly_once_process_span(span_exporter): opentelemetry_data.end_subscribe_span() spans = span_exporter.get_finished_spans() - assert len(spans) == 2 - process_span, subscribe_span = spans + assert len(spans) == 3 + modack_span, process_span, subscribe_span = spans assert process_span.name == "subscriptionID process" assert subscribe_span.name == "subscriptionID subscribe" @@ -234,6 +237,21 @@ def test_opentelemetry_expired_message_exactly_once_process_span(span_exporter): assert process_span.parent == subscribe_span.context + assert modack_span.name == "subscriptionID modack" + assert modack_span.parent is None + assert modack_span.kind == trace.SpanKind.CLIENT + assert len(modack_span.links) == 1 + modack_span_attributes = modack_span.attributes + assert modack_span_attributes["messaging.system"] == "gcp_pubsub" + assert modack_span_attributes["messaging.batch.message_count"] == 1 + assert math.isclose( + modack_span_attributes["messaging.gcp_pubsub.message.ack_deadline"], 10 + ) + assert modack_span_attributes["messaging.destination.name"] == "subscriptionID" + assert modack_span_attributes["gcp.project_id"] == "projectID" + assert modack_span_attributes["messaging.operation.name"] == "modack" + assert modack_span_attributes["code.function"] == "maintain_leases" + def test_maintain_leases_ack_ids(): manager = create_manager()