Skip to content

Commit

Permalink
Move adding dropped event and ending subscriber span from
Browse files Browse the repository at this point in the history
dispatcher.drop to leaser.maintain_leases

* dispatcher.drop() is also called by ack, _retry_ack, nack and
  maintain_leases(when it wants to drop expired messages.
* We only want to record the dropped event when the message is expired
  and hence dropped by the leaser
  • Loading branch information
mukund-ananthu committed Sep 22, 2024
1 parent 3ae2418 commit d1c870d
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 12 deletions.
4 changes: 0 additions & 4 deletions google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,6 @@ def drop(
Args:
items: The items to drop.
"""
for item in items:
if item.opentelemetry_data:
item.opentelemetry_data.set_subscribe_span_result("dropped")
item.opentelemetry_data.end_subscribe_span()
assert self._manager.leaser is not None
self._manager.leaser.remove(items)
ordering_keys = (k.ordering_key for k in items if k.ordering_key)
Expand Down
20 changes: 13 additions & 7 deletions google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,7 @@ def maintain_leases(self) -> None:
# and allow the Pub/Sub server to resend them.
cutoff = time.time() - self._manager.flow_control.max_lease_duration
to_drop = [
requests.DropRequest(
ack_id, item.size, item.ordering_key, item.opentelemetry_data
)
requests.DropRequest(ack_id, item.size, item.ordering_key)
for ack_id, item in leased_messages.items()
if item.sent_time < cutoff
]
Expand All @@ -183,9 +181,16 @@ def maintain_leases(self) -> None:
)
assert self._manager.dispatcher is not None
for drop_msg in to_drop:
if drop_msg.opentelemetry_data:
drop_msg.opentelemetry_data.add_process_span_event("dropped")
drop_msg.opentelemetry_data.end_process_span()
leased_message = leased_messages.get(drop_msg.ack_id)
if leased_message and leased_message.opentelemetry_data:
leased_message.opentelemetry_data.add_process_span_event(
"dropped"
)
leased_message.opentelemetry_data.end_process_span()
leased_message.opentelemetry_data.set_subscribe_span_result(
"dropped"
)
leased_message.opentelemetry_data.end_subscribe_span()
self._manager.dispatcher.drop(to_drop)

# Remove dropped items from our copy of the leased messages (they
Expand Down Expand Up @@ -229,13 +234,14 @@ def maintain_leases(self) -> None:
if msg and msg.opentelemetry_data:
msg.opentelemetry_data.add_process_span_event("dropped")
msg.opentelemetry_data.end_process_span()
msg.opentelemetry_data.set_subscribe_span_result("dropped")
msg.opentelemetry_data.end_subscribe_span()
self._manager.dispatcher.drop(
[
requests.DropRequest(
ack_id,
leased_messages.get(ack_id).size, # type: ignore
leased_messages.get(ack_id).ordering_key, # type: ignore
leased_messages.get(ack_id).opentelemetry_data, # type: ignore
)
for ack_id in expired_ack_ids
if ack_id in leased_messages
Expand Down
1 change: 0 additions & 1 deletion google/cloud/pubsub_v1/subscriber/_protocol/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class DropRequest(NamedTuple):
ack_id: str
byte_size: int
ordering_key: Optional[str]
opentelemetry_data: Optional[SubscribeOpenTelemetry] = None


class LeaseRequest(NamedTuple):
Expand Down

0 comments on commit d1c870d

Please sign in to comment.