diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index e41341afab3d..2b2574829306 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -27,6 +27,14 @@ _CALLBACK_WORKER_NAME = "Thread-CallbackRequestDispatcher" +_MAX_BATCH_SIZE = 100 +"""The maximum number of requests to process and dispatch at a time.""" + +_MAX_BATCH_LATENCY = 0.01 +"""The maximum amount of time in seconds to wait for additional request items +before processing the next batch of requests.""" + + class Dispatcher(object): def __init__(self, manager, queue): self._manager = manager @@ -42,12 +50,11 @@ def start(self): if self._thread is not None: raise ValueError("Dispatcher is already running.") - flow_control = self._manager.flow_control worker = helper_threads.QueueCallbackWorker( self._queue, self.dispatch_callback, - max_items=flow_control.max_request_batch_size, - max_latency=flow_control.max_request_batch_latency, + max_items=_MAX_BATCH_SIZE, + max_latency=_MAX_BATCH_LATENCY, ) # Create and start the helper thread. thread = threading.Thread(name=_CALLBACK_WORKER_NAME, target=worker) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 68ab452fc564..af6883fd067e 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -44,6 +44,11 @@ exceptions.GatewayTimeout, exceptions.Aborted, ) +_MAX_LOAD = 1.0 +"""The load threshold above which to pause the incoming message stream.""" + +_RESUME_THRESHOLD = 0.8 +"""The load threshold below which to resume the incoming message stream.""" def _maybe_wrap_exception(exception): @@ -223,7 +228,7 @@ def add_close_callback(self, callback): def maybe_pause_consumer(self): """Check the current load and pause the consumer if needed.""" with self._pause_resume_lock: - if self.load >= 1.0: + if self.load >= _MAX_LOAD: if self._consumer is not None and not self._consumer.is_paused: _LOGGER.debug( "Message backlog over load at %.2f, pausing.", self.load @@ -252,7 +257,7 @@ def maybe_resume_consumer(self): # currently on hold, if the current load allows for it. self._maybe_release_messages() - if self.load < self.flow_control.resume_threshold: + if self.load < _RESUME_THRESHOLD: _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load) self._consumer.resume() else: @@ -271,7 +276,7 @@ def _maybe_release_messages(self): The method assumes the caller has acquired the ``_pause_resume_lock``. """ while True: - if self.load >= 1.0: + if self.load >= _MAX_LOAD: break # already overloaded try: @@ -518,12 +523,9 @@ def _on_response(self, response): for received_message in response.received_messages: message = google.cloud.pubsub_v1.subscriber.message.Message( - received_message.message, - received_message.ack_id, - self._scheduler.queue, - autolease=False, + received_message.message, received_message.ack_id, self._scheduler.queue ) - if self.load < 1.0: + if self.load < _MAX_LOAD: req = requests.LeaseRequest( ack_id=message.ack_id, byte_size=message.size ) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/message.py b/pubsub/google/cloud/pubsub_v1/subscriber/message.py index db8e650db06c..41bc42755ad7 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/message.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/message.py @@ -18,7 +18,6 @@ import json import math import time -import warnings from google.api_core import datetime_helpers from google.cloud.pubsub_v1.subscriber._protocol import requests @@ -71,7 +70,7 @@ class Message(object): published. """ - def __init__(self, message, ack_id, request_queue, autolease=True): + def __init__(self, message, ack_id, request_queue): """Construct the Message. .. note:: @@ -86,13 +85,6 @@ def __init__(self, message, ack_id, request_queue, autolease=True): request_queue (queue.Queue): A queue provided by the policy that can accept requests; the policy is responsible for handling those requests. - autolease (bool): An optional flag determining whether a new Message - instance should automatically lease itself upon creation. - Defaults to :data:`True`. - - .. note:: - .. deprecated:: 0.44.0 - Parameter will be removed in future versions. """ self._message = message self._ack_id = ack_id @@ -104,11 +96,6 @@ def __init__(self, message, ack_id, request_queue, autolease=True): # the default lease deadline. self._received_timestamp = time.time() - # The policy should lease this message, telling PubSub that it has - # it until it is acked or otherwise dropped. - if autolease: - self.lease() - def __repr__(self): # Get an abbreviated version of the data. abbv_data = self._message.data @@ -213,26 +200,6 @@ def drop(self): requests.DropRequest(ack_id=self._ack_id, byte_size=self.size) ) - def lease(self): - """Inform the policy to lease this message continually. - - .. note:: - By default this method is called by the constructor, and you should - never need to call it manually, unless the - :class:`~.pubsub_v1.subscriber.message.Message` instance was - created with ``autolease=False``. - - .. deprecated:: 0.44.0 - Will be removed in future versions. - """ - warnings.warn( - "lease() is deprecated since 0.44.0, and will be removed in future versions.", - category=DeprecationWarning, - ) - self._request_queue.put( - requests.LeaseRequest(ack_id=self._ack_id, byte_size=self.size) - ) - def modify_ack_deadline(self, seconds): """Resets the deadline for acknowledgement. diff --git a/pubsub/google/cloud/pubsub_v1/types.py b/pubsub/google/cloud/pubsub_v1/types.py index 9b0d3fef3f64..733d3bf97ac0 100644 --- a/pubsub/google/cloud/pubsub_v1/types.py +++ b/pubsub/google/cloud/pubsub_v1/types.py @@ -15,7 +15,6 @@ from __future__ import absolute_import import collections import sys -import textwrap from google.api import http_pb2 from google.iam.v1 import iam_policy_pb2 @@ -67,24 +66,11 @@ # these settings can be altered to tweak Pub/Sub behavior. # The defaults should be fine for most use cases. FlowControl = collections.namedtuple( - "FlowControl", - [ - "max_bytes", - "max_messages", - "resume_threshold", - "max_requests", - "max_request_batch_size", - "max_request_batch_latency", - "max_lease_duration", - ], + "FlowControl", ["max_bytes", "max_messages", "max_lease_duration"] ) FlowControl.__new__.__defaults__ = ( 100 * 1024 * 1024, # max_bytes: 100mb 100, # max_messages: 100 - 0.8, # resume_threshold: 80% - 100, # max_requests: 100 - 100, # max_request_batch_size: 100 - 0.01, # max_request_batch_latency: 0.01s 2 * 60 * 60, # max_lease_duration: 2 hours. ) @@ -101,42 +87,6 @@ "The maximum number of received - but not yet processed - messages before " "pausing the message stream." ) - FlowControl.resume_threshold.__doc__ = textwrap.dedent( - """ - The relative threshold of the ``max_bytes`` and ``max_messages`` limits - below which to resume the message stream. Must be a positive number not - greater than ``1.0``. - - .. note:: - .. deprecated:: 0.44.0 - Will be removed in future versions.""" - ) - FlowControl.max_requests.__doc__ = textwrap.dedent( - """ - Currently not in use. - - .. note:: - .. deprecated:: 0.44.0 - Will be removed in future versions.""" - ) - FlowControl.max_request_batch_size.__doc__ = textwrap.dedent( - """ - The maximum number of requests scheduled by callbacks to process and - dispatch at a time. - - .. note:: - .. deprecated:: 0.44.0 - Will be removed in future versions.""" - ) - FlowControl.max_request_batch_latency.__doc__ = textwrap.dedent( - """ - The maximum amount of time in seconds to wait for additional request - items before processing the next batch of requests. - - .. note:: - .. deprecated:: 0.44.0 - Will be removed in future versions.""" - ) FlowControl.max_lease_duration.__doc__ = ( "The maximum amount of time in seconds to hold a lease on a message " "before dropping it from the lease management." diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py index 0a7d7fb8c391..4bb3329a29f0 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py @@ -16,7 +16,6 @@ import time import mock -import pytest import pytz from six.moves import queue from google.protobuf import timestamp_pb2 @@ -34,28 +33,22 @@ PUBLISHED_SECONDS = datetime_helpers.to_milliseconds(PUBLISHED) // 1000 -def create_message(data, ack_id="ACKID", autolease=True, **attrs): - with mock.patch.object(message.Message, "lease") as lease: - with mock.patch.object(time, "time") as time_: - time_.return_value = RECEIVED_SECONDS - msg = message.Message( - types.PubsubMessage( - attributes=attrs, - data=data, - message_id="message_id", - publish_time=timestamp_pb2.Timestamp( - seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000 - ), +def create_message(data, ack_id="ACKID", **attrs): + with mock.patch.object(time, "time") as time_: + time_.return_value = RECEIVED_SECONDS + msg = message.Message( + types.PubsubMessage( + attributes=attrs, + data=data, + message_id="message_id", + publish_time=timestamp_pb2.Timestamp( + seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000 ), - ack_id, - queue.Queue(), - autolease=autolease, - ) - if autolease: - lease.assert_called_once_with() - else: - lease.assert_not_called() - return msg + ), + ack_id, + queue.Queue(), + ) + return msg def test_attributes(): @@ -84,11 +77,6 @@ def test_publish_time(): assert msg.publish_time == PUBLISHED -def test_disable_autolease_on_creation(): - # the create_message() helper does the actual assertion - create_message(b"foo", autolease=False) - - def check_call_types(mock, *args, **kwargs): """Checks a mock's call types. @@ -134,18 +122,6 @@ def test_drop(): check_call_types(put, requests.DropRequest) -def test_lease(): - msg = create_message(b"foo", ack_id="bogus_ack_id") - - pytest_warns = pytest.warns(DeprecationWarning) - with pytest_warns, mock.patch.object(msg._request_queue, "put") as put: - msg.lease() - put.assert_called_once_with( - requests.LeaseRequest(ack_id="bogus_ack_id", byte_size=30) - ) - check_call_types(put, requests.LeaseRequest) - - def test_modify_ack_deadline(): msg = create_message(b"foo", ack_id="bogus_ack_id") with mock.patch.object(msg._request_queue, "put") as put: