Skip to content

Commit

Permalink
Use histogram to set default stream ACK deadline
Browse files Browse the repository at this point in the history
Now that all messages are lease-managed (even those on hold), there is no
need for a fixed default value; the deadline is derived from the ACK histogram instead.
  • Loading branch information
plamut committed Nov 11, 2019
1 parent a0cf284 commit 79adfa4
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,6 @@
_RESUME_THRESHOLD = 0.8
"""The load threshold below which to resume the incoming message stream."""

_DEFAULT_STREAM_ACK_DEADLINE = 60
"""The default message acknowledge deadline in seconds for incoming message stream.
This default deadline is dynamically modified for the messages that are added
to the lease management.
"""


def _maybe_wrap_exception(exception):
"""Wraps a gRPC exception class, if needed."""
Expand Down Expand Up @@ -412,17 +405,7 @@ def open(self, callback, on_callback_error):
)

# Create the RPC

# We must use a fixed value for the ACK deadline, as we cannot read it
# from the subscription. The latter would require `pubsub.subscriptions.get`
# permission, which is not granted to the default subscriber role
# `roles/pubsub.subscriber`.
# See also https://github.com/googleapis/google-cloud-python/issues/9339
#
# When dynamic lease management is enabled for the "on hold" messages,
# the default stream ACK deadline should again be set based on the
# historic ACK timing data, i.e. `self.ack_histogram.percentile(99)`.
stream_ack_deadline_seconds = _DEFAULT_STREAM_ACK_DEADLINE
stream_ack_deadline_seconds = self.ack_histogram.percentile(99)

get_initial_request = functools.partial(
self._get_initial_request, stream_ack_deadline_seconds
Expand Down
16 changes: 6 additions & 10 deletions pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,6 @@ class CallbackError(Exception):
with pytest.raises(CallbackError):
future.result(timeout=30)

@pytest.mark.xfail(
reason="The default stream ACK deadline is static and received messages "
"exceeding FlowControl.max_messages are currently not lease managed."
)
def test_streaming_pull_ack_deadline(
self, publisher, subscriber, project, topic_path, subscription_path, cleanup
):
Expand All @@ -400,29 +396,29 @@ def test_streaming_pull_ack_deadline(
# Subscribe to the topic. This must happen before the messages
# are published.
subscriber.create_subscription(
subscription_path, topic_path, ack_deadline_seconds=240
subscription_path, topic_path, ack_deadline_seconds=45
)

# publish some messages and wait for completion
self._publish_messages(publisher, topic_path, batch_sizes=[2])

# subscribe to the topic
callback = StreamingPullCallback(
processing_time=70, # more than the default stream ACK deadline (60s)
processing_time=13, # more than the default stream ACK deadline (10s)
resolve_at_msg_count=3, # one more than the published messages count
)
flow_control = types.FlowControl(max_messages=1)
subscription_future = subscriber.subscribe(
subscription_path, callback, flow_control=flow_control
)

# We expect to process the first two messages in 2 * 70 seconds, and
# We expect to process the first two messages in 2 * 13 seconds, and
# any duplicate message that is re-sent by the backend in additional
# 70 seconds, totalling 210 seconds (+ overhead) --> if there have been
# no duplicates in 240 seconds, we can reasonably assume that there
# 13 seconds, totalling 39 seconds (+ overhead) --> if there have been
# no duplicates in 60 seconds, we can reasonably assume that there
# won't be any.
try:
callback.done_future.result(timeout=240)
callback.done_future.result(timeout=60)
except exceptions.TimeoutError:
# future timed out, because we received no excessive messages
assert sorted(callback.seen_message_ids) == [1, 2]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,6 @@ def test_heartbeat_inactive():
"google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater", autospec=True
)
def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc):
stream_ack_deadline = streaming_pull_manager._DEFAULT_STREAM_ACK_DEADLINE

manager = make_manager()

manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error)
Expand Down Expand Up @@ -460,7 +458,7 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
)
initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]
assert initial_request_arg.func == manager._get_initial_request
assert initial_request_arg.args[0] == stream_ack_deadline
assert initial_request_arg.args[0] == 10 # the default stream ACK timeout
assert not manager._client.api.get_subscription.called

resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with(
Expand Down

0 comments on commit 79adfa4

Please sign in to comment.