diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index bc3059cb69af..f3798c05610e 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -51,13 +51,6 @@ _RESUME_THRESHOLD = 0.8 """The load threshold below which to resume the incoming message stream.""" -_DEFAULT_STREAM_ACK_DEADLINE = 60 -"""The default message acknowledge deadline in seconds for incoming message stream. - -This default deadline is dynamically modified for the messages that are added -to the lease management. -""" - def _maybe_wrap_exception(exception): """Wraps a gRPC exception class, if needed.""" @@ -412,17 +405,7 @@ def open(self, callback, on_callback_error): ) # Create the RPC - - # We must use a fixed value for the ACK deadline, as we cannot read it - # from the subscription. The latter would require `pubsub.subscriptions.get` - # permission, which is not granted to the default subscriber role - # `roles/pubsub.subscriber`. - # See also https://github.com/googleapis/google-cloud-python/issues/9339 - # - # When dynamic lease management is enabled for the "on hold" messages, - # the default stream ACK deadline should again be set based on the - # historic ACK timing data, i.e. `self.ack_histogram.percentile(99)`. - stream_ack_deadline_seconds = _DEFAULT_STREAM_ACK_DEADLINE + stream_ack_deadline_seconds = self.ack_histogram.percentile(99) get_initial_request = functools.partial( self._get_initial_request, stream_ack_deadline_seconds diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index fd7473e1e53b..59e5e3fe83a4 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -382,10 +382,6 @@ class CallbackError(Exception): with pytest.raises(CallbackError): future.result(timeout=30) - @pytest.mark.xfail( - reason="The default stream ACK deadline is static and received messages " - "exceeding FlowControl.max_messages are currently not lease managed." - ) def test_streaming_pull_ack_deadline( self, publisher, subscriber, project, topic_path, subscription_path, cleanup ): @@ -400,7 +396,7 @@ def test_streaming_pull_ack_deadline( # Subscribe to the topic. This must happen before the messages # are published. subscriber.create_subscription( - subscription_path, topic_path, ack_deadline_seconds=240 + subscription_path, topic_path, ack_deadline_seconds=45 ) # publish some messages and wait for completion @@ -408,7 +404,7 @@ def test_streaming_pull_ack_deadline( # subscribe to the topic callback = StreamingPullCallback( - processing_time=70, # more than the default stream ACK deadline (60s) + processing_time=13, # more than the default stream ACK deadline (10s) resolve_at_msg_count=3, # one more than the published messages count ) flow_control = types.FlowControl(max_messages=1) @@ -416,13 +412,13 @@ def test_streaming_pull_ack_deadline( subscription_path, callback, flow_control=flow_control ) - # We expect to process the first two messages in 2 * 70 seconds, and + # We expect to process the first two messages in 2 * 13 seconds, and # any duplicate message that is re-sent by the backend in additional - # 70 seconds, totalling 210 seconds (+ overhead) --> if there have been - # no duplicates in 240 seconds, we can reasonably assume that there + # 13 seconds, totalling 39 seconds (+ overhead) --> if there have been + # no duplicates in 60 seconds, we can reasonably assume that there # won't be any. try: - callback.done_future.result(timeout=240) + callback.done_future.result(timeout=60) except exceptions.TimeoutError: # future timed out, because we received no excessive messages assert sorted(callback.seen_message_ids) == [1, 2] diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 761ac073083c..1732ec6cd4b3 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -429,8 +429,6 @@ def test_heartbeat_inactive(): "google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater", autospec=True ) def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc): - stream_ack_deadline = streaming_pull_manager._DEFAULT_STREAM_ACK_DEADLINE - manager = make_manager() manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error) @@ -460,7 +458,7 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi ) initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"] assert initial_request_arg.func == manager._get_initial_request - assert initial_request_arg.args[0] == stream_ack_deadline + assert initial_request_arg.args[0] == 10 # the default stream ACK timeout assert not manager._client.api.get_subscription.called resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with(