Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsub): set default stream ACK deadline to subscriptions' #9268

Merged
merged 2 commits into from
Sep 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def load(self):
float: The load value.
"""
if self._leaser is None:
return 0
return 0.0

return max(
[
Expand Down Expand Up @@ -384,14 +384,26 @@ def open(self, callback, on_callback_error):
)

# Create the RPC
subscription = self._client.api.get_subscription(self._subscription)
stream_ack_deadline_seconds = subscription.ack_deadline_seconds

get_initial_request = functools.partial(
self._get_initial_request, stream_ack_deadline_seconds
)
self._rpc = bidi.ResumableBidiRpc(
start_rpc=self._client.api.streaming_pull,
initial_request=self._get_initial_request,
initial_request=get_initial_request,
should_recover=self._should_recover,
throttle_reopen=True,
)
self._rpc.add_done_callback(self._on_rpc_done)

_LOGGER.debug(
"Creating a stream, default ACK deadline set to {} seconds.".format(
stream_ack_deadline_seconds
)
)

# Create references to threads
self._dispatcher = dispatcher.Dispatcher(self, self._scheduler.queue)
self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
Expand Down Expand Up @@ -462,12 +474,16 @@ def close(self, reason=None):
for callback in self._close_callbacks:
callback(self, reason)

def _get_initial_request(self):
def _get_initial_request(self, stream_ack_deadline_seconds):
"""Return the initial request for the RPC.

This defines the initial request that must always be sent to Pub/Sub
immediately upon opening the subscription.

Args:
stream_ack_deadline_seconds (int):
The default message acknowledge deadline for the stream.

Returns:
google.cloud.pubsub_v1.types.StreamingPullRequest: A request
suitable for being the first request on the stream (and not
Expand All @@ -486,7 +502,7 @@ def _get_initial_request(self):
request = types.StreamingPullRequest(
modify_deadline_ack_ids=list(lease_ids),
modify_deadline_seconds=[self.ack_deadline] * len(lease_ids),
stream_ack_deadline_seconds=self.ack_histogram.percentile(99),
stream_ack_deadline_seconds=stream_ack_deadline_seconds,
subscription=self._subscription,
)

Expand All @@ -511,14 +527,6 @@ def _on_response(self, response):
self._messages_on_hold.qsize(),
)

# Immediately modack the messages we received, as this tells the server
# that we've received them.
items = [
requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99))
for message in response.received_messages
]
self._dispatcher.modify_ack_deadline(items)

invoke_callbacks_for = []

for received_message in response.received_messages:
Expand All @@ -535,6 +543,15 @@ def _on_response(self, response):
else:
self._messages_on_hold.put(message)

# Immediately (i.e. without waiting for the auto lease management)
# modack the messages we received and not put on hold, as this tells
# the server that we've received them.
items = [
requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99))
for message in invoke_callbacks_for
]
self._dispatcher.modify_ack_deadline(items)

_LOGGER.debug(
"Scheduling callbacks for %s new messages, new total on hold %s.",
len(invoke_callbacks_for),
Expand Down
49 changes: 49 additions & 0 deletions pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,55 @@ class CallbackError(Exception):
with pytest.raises(CallbackError):
future.result(timeout=30)

def test_streaming_pull_ack_deadline(
self, publisher, subscriber, project, topic_path, subscription_path, cleanup
):
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, topic_path))
cleanup.append((subscriber.delete_subscription, subscription_path))

# Create a topic and a subscription, then subscribe to the topic. This
# must happen before the messages are published.
publisher.create_topic(topic_path)

# Subscribe to the topic. This must happen before the messages
# are published.
subscriber.create_subscription(
subscription_path, topic_path, ack_deadline_seconds=60
)

# publish some messages and wait for completion
self._publish_messages(publisher, topic_path, batch_sizes=[2])

# subscribe to the topic
callback = StreamingPullCallback(
processing_time=15, # more than the default ACK deadline of 10 seconds
resolve_at_msg_count=3, # one more than the published messages count
)
flow_control = types.FlowControl(max_messages=1)
subscription_future = subscriber.subscribe(
subscription_path, callback, flow_control=flow_control
)

# We expect to process the first two messages in 2 * 15 seconds, and
# any duplicate message that is re-sent by the backend in additional
# 15 seconds, totalling 45 seconds (+ overhead) --> if there have been
# no duplicates in 60 seconds, we can reasonably assume that there
# won't be any.
try:
callback.done_future.result(timeout=60)
except exceptions.TimeoutError:
# future timed out, because we received no excessive messages
assert sorted(callback.seen_message_ids) == [1, 2]
else:
pytest.fail(
"Expected to receive 2 messages, but got at least {}.".format(
len(callback.seen_message_ids)
)
)
finally:
subscription_future.cancel()

def test_streaming_pull_max_messages(
self, publisher, topic_path, subscriber, subscription_path, cleanup
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,11 @@ def test_heartbeat_inactive():
)
def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc):
manager = make_manager()
manager._client.api.get_subscription.return_value = types.Subscription(
name="projects/foo/subscriptions/bar",
topic="projects/foo/topics/baz",
ack_deadline_seconds=123,
)

manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error)

Expand All @@ -426,10 +431,14 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi

resumable_bidi_rpc.assert_called_once_with(
start_rpc=manager._client.api.streaming_pull,
initial_request=manager._get_initial_request,
initial_request=mock.ANY,
should_recover=manager._should_recover,
throttle_reopen=True,
)
initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]
assert initial_request_arg.func == manager._get_initial_request
assert initial_request_arg.args[0] == 123

resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with(
manager._on_rpc_done
)
Expand Down Expand Up @@ -574,11 +583,11 @@ def test__get_initial_request():
manager._leaser = mock.create_autospec(leaser.Leaser, instance=True)
manager._leaser.ack_ids = ["1", "2"]

initial_request = manager._get_initial_request()
initial_request = manager._get_initial_request(123)

assert isinstance(initial_request, types.StreamingPullRequest)
assert initial_request.subscription == "subscription-name"
assert initial_request.stream_ack_deadline_seconds == 10
assert initial_request.stream_ack_deadline_seconds == 123
assert initial_request.modify_deadline_ack_ids == ["1", "2"]
assert initial_request.modify_deadline_seconds == [10, 10]

Expand All @@ -587,11 +596,11 @@ def test__get_initial_request_wo_leaser():
manager = make_manager()
manager._leaser = None

initial_request = manager._get_initial_request()
initial_request = manager._get_initial_request(123)

assert isinstance(initial_request, types.StreamingPullRequest)
assert initial_request.subscription == "subscription-name"
assert initial_request.stream_ack_deadline_seconds == 10
assert initial_request.stream_ack_deadline_seconds == 123
assert initial_request.modify_deadline_ack_ids == []
assert initial_request.modify_deadline_seconds == []

Expand Down Expand Up @@ -660,12 +669,10 @@ def test__on_response_with_leaser_overload():
# are called in the expected way.
manager._on_response(response)

# only the messages that are added to the lease management and dispatched to
# callbacks should have their ACK deadline extended
dispatcher.modify_ack_deadline.assert_called_once_with(
[
requests.ModAckRequest("fack", 10),
requests.ModAckRequest("back", 10),
requests.ModAckRequest("zack", 10),
]
[requests.ModAckRequest("fack", 10)]
)

# one message should be scheduled, the leaser capacity allows for it
Expand Down