From a8e9d4f99769e8826bf000d07fec01564eefa497 Mon Sep 17 00:00:00 2001 From: mukund-ananthu Date: Thu, 19 Sep 2024 22:37:34 +0000 Subject: [PATCH] Add drop event to subscribe span --- google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py | 4 ++++ google/cloud/pubsub_v1/subscriber/_protocol/leaser.py | 5 ++++- google/cloud/pubsub_v1/subscriber/_protocol/requests.py | 1 + 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 04bfda5cf..ddd0df09c 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -316,6 +316,10 @@ def drop( Args: items: The items to drop. """ + for item in items: + if item.opentelemetry_data: + item.opentelemetry_data.set_subscribe_span_result("dropped") + item.opentelemetry_data.end_subscribe_span() assert self._manager.leaser is not None self._manager.leaser.remove(items) ordering_keys = (k.ordering_key for k in items if k.ordering_key) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index 32f37fea0..d9b8319cd 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -170,7 +170,9 @@ def maintain_leases(self) -> None: # and allow the Pub/Sub server to resend them. cutoff = time.time() - self._manager.flow_control.max_lease_duration to_drop = [ - requests.DropRequest(ack_id, item.size, item.ordering_key) + requests.DropRequest( + ack_id, item.size, item.ordering_key, item.opentelemetry_data + ) for ack_id, item in leased_messages.items() if item.sent_time < cutoff ] @@ -223,6 +225,7 @@ def maintain_leases(self) -> None: ack_id, leased_messages.get(ack_id).size, # type: ignore leased_messages.get(ack_id).ordering_key, # type: ignore + leased_messages.get(ack_id).opentelemetry_data, # type: ignore ) for ack_id in expired_ack_ids if ack_id in leased_messages diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/requests.py b/google/cloud/pubsub_v1/subscriber/_protocol/requests.py index 6fd35896b..7736e4456 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/requests.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/requests.py @@ -38,6 +38,7 @@ class DropRequest(NamedTuple): ack_id: str byte_size: int ordering_key: Optional[str] + opentelemetry_data: Optional[SubscribeOpenTelemetry] = None class LeaseRequest(NamedTuple):