Skip to content

Commit

Permalink
Add drop event to subscribe span
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Sep 19, 2024
1 parent 1b2020d commit a8e9d4f
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 1 deletion.
4 changes: 4 additions & 0 deletions google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ def drop(
Args:
items: The items to drop.
"""
for item in items:
if item.opentelemetry_data:
item.opentelemetry_data.set_subscribe_span_result("dropped")
item.opentelemetry_data.end_subscribe_span()
assert self._manager.leaser is not None
self._manager.leaser.remove(items)
ordering_keys = (k.ordering_key for k in items if k.ordering_key)
Expand Down
5 changes: 4 additions & 1 deletion google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ def maintain_leases(self) -> None:
# and allow the Pub/Sub server to resend them.
cutoff = time.time() - self._manager.flow_control.max_lease_duration
to_drop = [
requests.DropRequest(ack_id, item.size, item.ordering_key)
requests.DropRequest(
ack_id, item.size, item.ordering_key, item.opentelemetry_data
)
for ack_id, item in leased_messages.items()
if item.sent_time < cutoff
]
Expand Down Expand Up @@ -223,6 +225,7 @@ def maintain_leases(self) -> None:
ack_id,
leased_messages.get(ack_id).size, # type: ignore
leased_messages.get(ack_id).ordering_key, # type: ignore
leased_messages.get(ack_id).opentelemetry_data, # type: ignore
)
for ack_id in expired_ack_ids
if ack_id in leased_messages
Expand Down
1 change: 1 addition & 0 deletions google/cloud/pubsub_v1/subscriber/_protocol/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class DropRequest(NamedTuple):
ack_id: str
byte_size: int
ordering_key: Optional[str]
opentelemetry_data: Optional[SubscribeOpenTelemetry] = None


class LeaseRequest(NamedTuple):
Expand Down

0 comments on commit a8e9d4f

Please sign in to comment.