Skip to content

Commit

Permalink
End process span and add event when messages are dropped
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Sep 20, 2024
1 parent 4189b22 commit d59a288
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 0 deletions.
9 changes: 9 additions & 0 deletions google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ def maintain_leases(self) -> None:
"Dropping %s items because they were leased too long.", len(to_drop)
)
assert self._manager.dispatcher is not None
for drop_msg in to_drop:
if drop_msg.opentelemetry_data:
drop_msg.opentelemetry_data.add_process_span_event("dropped")
drop_msg.opentelemetry_data.end_process_span()
self._manager.dispatcher.drop(to_drop)

# Remove dropped items from our copy of the leased messages (they
Expand Down Expand Up @@ -223,6 +227,11 @@ def maintain_leases(self) -> None:
# If exactly once delivery is enabled, we should drop all expired ack_ids from lease management.
if self._manager._exactly_once_delivery_enabled() and len(expired_ack_ids):
assert self._manager.dispatcher is not None
for ack_id in expired_ack_ids:
msg = leased_messages.get(ack_id)
if msg.opentelemetry_data:
msg.opentelemetry_data.add_process_span_event("dropped")
msg.opentelemetry_data.end_process_span()
self._manager.dispatcher.drop(
[
requests.DropRequest(
Expand Down
55 changes: 55 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_leaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
from google.cloud.pubsub_v1.subscriber._protocol import leaser
from google.cloud.pubsub_v1.subscriber._protocol import requests
from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager
from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import (
SubscribeOpenTelemetry,
)
from google.cloud.pubsub_v1.subscriber import message

# special case python < 3.8
if sys.version_info.major == 3 and sys.version_info.minor < 8:
Expand Down Expand Up @@ -136,6 +140,57 @@ def trigger_done(timeout):
leaser._stop_event.wait = trigger_done


def test_opentelemetry_dropped_message_process_span(span_exporter):
manager = create_manager()
leaser_ = leaser.Leaser(manager)
make_sleep_mark_event_as_done(leaser_)
msg = mock.create_autospec(
message.Message, instance=True, ack_id="ack_foo", size=10
)
msg.message_id = 3
opentelemetry_data = SubscribeOpenTelemetry(msg)
opentelemetry_data.start_subscribe_span(
subscription="projects/projectId/subscriptions/subscriptionID",
exactly_once_enabled=False,
ack_id="ack_id",
delivery_attempt=4,
)
opentelemetry_data.start_process_span()
leaser_.add(
[
requests.LeaseRequest(
ack_id="my ack id",
byte_size=50,
ordering_key="",
opentelemetry_data=opentelemetry_data,
)
]
)
leased_messages_dict = leaser_._leased_messages

# Setting the `sent_time`` to be less than `cutoff` in order to make the leased message expire.
# This will exercise the code path where the message would be dropped from the leaser
leased_messages_dict["my ack id"] = leased_messages_dict["my ack id"]._replace(
sent_time=0
)

manager._send_lease_modacks.return_value = set()
leaser_.maintain_leases()

opentelemetry_data.end_subscribe_span()
spans = span_exporter.get_finished_spans()
assert len(spans) == 2
process_span, subscribe_span = spans

assert process_span.name == "subscriptionID process"
assert subscribe_span.name == "subscriptionID subscribe"

assert len(process_span.events) == 1
assert process_span.events[0].name == "dropped"

assert process_span.parent == subscribe_span.context


def test_maintain_leases_ack_ids():
manager = create_manager()
leaser_ = leaser.Leaser(manager)
Expand Down

0 comments on commit d59a288

Please sign in to comment.