diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index d4194a56897f0..bfadeb0b2e709 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -27,6 +27,14 @@ _CALLBACK_WORKER_NAME = "Thread-CallbackRequestDispatcher" +_MAX_BATCH_SIZE = 100 +"""The maximum number of requests to process and dispatch at a time.""" + +_MAX_BATCH_LATENCY = 0.01 +"""The maximum amount of time in seconds to wait for additional request items +before processing the next batch of requests.""" + + class Dispatcher(object): def __init__(self, manager, queue): self._manager = manager @@ -43,7 +51,10 @@ def start(self): raise ValueError("Dispatcher is already running.") worker = helper_threads.QueueCallbackWorker( - self._queue, self.dispatch_callback, max_items=100, max_latency=0.01 + self._queue, + self.dispatch_callback, + max_items=_MAX_BATCH_SIZE, + max_latency=_MAX_BATCH_LATENCY, ) # Create and start the helper thread. thread = threading.Thread(name=_CALLBACK_WORKER_NAME, target=worker) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 3958a4a43db61..af6883fd067ea 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -44,6 +44,11 @@ exceptions.GatewayTimeout, exceptions.Aborted, ) +_MAX_LOAD = 1.0 +"""The load threshold above which to pause the incoming message stream.""" + +_RESUME_THRESHOLD = 0.8 +"""The load threshold below which to resume the incoming message stream.""" def _maybe_wrap_exception(exception): @@ -223,7 +228,7 @@ def add_close_callback(self, callback): def maybe_pause_consumer(self): """Check the current load and pause the consumer if needed.""" with self._pause_resume_lock: - if self.load >= 1.0: + if self.load >= _MAX_LOAD: if self._consumer is not None and not self._consumer.is_paused: _LOGGER.debug( "Message backlog over load at %.2f, pausing.", self.load @@ -252,7 +257,7 @@ def maybe_resume_consumer(self): # currently on hold, if the current load allows for it. self._maybe_release_messages() - if self.load < 0.8: + if self.load < _RESUME_THRESHOLD: _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load) self._consumer.resume() else: @@ -271,7 +276,7 @@ def _maybe_release_messages(self): The method assumes the caller has acquired the ``_pause_resume_lock``. """ while True: - if self.load >= 1.0: + if self.load >= _MAX_LOAD: break # already overloaded try: @@ -520,7 +525,7 @@ def _on_response(self, response): message = google.cloud.pubsub_v1.subscriber.message.Message( received_message.message, received_message.ack_id, self._scheduler.queue ) - if self.load < 1.0: + if self.load < _MAX_LOAD: req = requests.LeaseRequest( ack_id=message.ack_id, byte_size=message.size )