Skip to content

Commit

Permalink
feat(pubsub): add delivery attempt property to message object receive…
Browse files Browse the repository at this point in the history
…d by user code (#10205)

- Return None when a DeadLetterPolicy hasn't been set on the subscription.
  • Loading branch information
pradn authored Jan 30, 2020
1 parent ad93c6c commit c3b9327
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,10 @@ def _on_response(self, response):

for received_message in response.received_messages:
message = google.cloud.pubsub_v1.subscriber.message.Message(
received_message.message, received_message.ack_id, self._scheduler.queue
received_message.message,
received_message.ack_id,
received_message.delivery_attempt,
self._scheduler.queue,
)
# Making a decision based on the load, and modifying the data that
# affects the load -> needs a lock, as that state can be modified
Expand Down
30 changes: 29 additions & 1 deletion pubsub/google/cloud/pubsub_v1/subscriber/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Message(object):
published.
"""

def __init__(self, message, ack_id, request_queue):
def __init__(self, message, ack_id, delivery_attempt, request_queue):
"""Construct the Message.
.. note::
Expand All @@ -82,12 +82,16 @@ def __init__(self, message, ack_id, request_queue):
message (~.pubsub_v1.types.PubsubMessage): The message received
from Pub/Sub.
ack_id (str): The ack_id received from Pub/Sub.
delivery_attempt (int): The delivery attempt counter received
from Pub/Sub if a DeadLetterPolicy is set on the subscription,
and zero otherwise.
request_queue (queue.Queue): A queue provided by the policy that
can accept requests; the policy is responsible for handling
those requests.
"""
self._message = message
self._ack_id = ack_id
self._delivery_attempt = delivery_attempt if delivery_attempt > 0 else None
self._request_queue = request_queue
self.message_id = message.message_id

Expand Down Expand Up @@ -162,6 +166,30 @@ def ack_id(self):
"""str: the ID used to ack the message."""
return self._ack_id

@property
def delivery_attempt(self):
"""The delivery attempt counter is 1 + (the sum of number of NACKs
and number of ack_deadline exceeds) for this message. It is set to None
if a DeadLetterPolicy is not set on the subscription.
A NACK is any call to ModifyAckDeadline with a 0 deadline. An ack_deadline
exceeds event is whenever a message is not acknowledged within
ack_deadline. Note that ack_deadline is initially
Subscription.ackDeadlineSeconds, but may get extended automatically by
the client library.
The first delivery of a given message will have this value as 1. The value
is calculated at best effort and is approximate.
EXPERIMENTAL: This feature is part of a closed alpha release. This
API might be changed in backward-incompatible ways and is not recommended
for production use. It is not subject to any SLA or deprecation policy.
Returns:
Optional[int]: The delivery attempt counter or None.
"""
return self._delivery_attempt

def ack(self):
"""Acknowledge the given message.
Expand Down
20 changes: 16 additions & 4 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,21 @@
PUBLISHED_SECONDS = datetime_helpers.to_milliseconds(PUBLISHED) // 1000


def create_message(data, ack_id="ACKID", **attrs):
def create_message(data, ack_id="ACKID", delivery_attempt=0, **attrs):
with mock.patch.object(time, "time") as time_:
time_.return_value = RECEIVED_SECONDS
msg = message.Message(
types.PubsubMessage(
message=types.PubsubMessage(
attributes=attrs,
data=data,
message_id="message_id",
publish_time=timestamp_pb2.Timestamp(
seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000
),
),
ack_id,
queue.Queue(),
ack_id=ack_id,
delivery_attempt=delivery_attempt,
request_queue=queue.Queue(),
)
return msg

Expand All @@ -72,6 +73,17 @@ def test_ack_id():
assert msg.ack_id == ack_id


def test_delivery_attempt():
delivery_attempt = 10
msg = create_message(b"foo", delivery_attempt=delivery_attempt)
assert msg.delivery_attempt == delivery_attempt


def test_delivery_attempt_is_none():
msg = create_message(b"foo", delivery_attempt=0)
assert msg.delivery_attempt is None


def test_publish_time():
msg = create_message(b"foo")
assert msg.publish_time == PUBLISHED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,37 @@ def test__get_initial_request_wo_leaser():
assert initial_request.modify_deadline_seconds == []


def test__on_response_delivery_attempt():
manager, _, dispatcher, leaser, _, scheduler = make_running_manager()
manager._callback = mock.sentinel.callback

# Set up the messages.
response = types.StreamingPullResponse(
received_messages=[
types.ReceivedMessage(
ack_id="fack", message=types.PubsubMessage(data=b"foo", message_id="1")
),
types.ReceivedMessage(
ack_id="back",
message=types.PubsubMessage(data=b"bar", message_id="2"),
delivery_attempt=6,
),
]
)

# adjust message bookkeeping in leaser
fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=42)

manager._on_response(response)

schedule_calls = scheduler.schedule.mock_calls
assert len(schedule_calls) == 2
msg1 = schedule_calls[0][1][1]
assert msg1.delivery_attempt is None
msg2 = schedule_calls[1][1][1]
assert msg2.delivery_attempt == 6


def test__on_response_no_leaser_overload():
manager, _, dispatcher, leaser, _, scheduler = make_running_manager()
manager._callback = mock.sentinel.callback
Expand Down

0 comments on commit c3b9327

Please sign in to comment.