Skip to content

Commit

Permalink
Promote hardcoded values to module constants
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut committed Jul 30, 2019
1 parent ecf5eb3 commit 45d535b
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 5 deletions.
13 changes: 12 additions & 1 deletion pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
_CALLBACK_WORKER_NAME = "Thread-CallbackRequestDispatcher"


_MAX_BATCH_SIZE = 100
"""The maximum number of requests to process and dispatch at a time."""

_MAX_BATCH_LATENCY = 0.01
"""The maximum amount of time in seconds to wait for additional request items
before processing the next batch of requests."""


class Dispatcher(object):
def __init__(self, manager, queue):
self._manager = manager
Expand All @@ -43,7 +51,10 @@ def start(self):
raise ValueError("Dispatcher is already running.")

worker = helper_threads.QueueCallbackWorker(
self._queue, self.dispatch_callback, max_items=100, max_latency=0.01
self._queue,
self.dispatch_callback,
max_items=_MAX_BATCH_SIZE,
max_latency=_MAX_BATCH_LATENCY,
)
# Create and start the helper thread.
thread = threading.Thread(name=_CALLBACK_WORKER_NAME, target=worker)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
exceptions.GatewayTimeout,
exceptions.Aborted,
)
_MAX_LOAD = 1.0
"""The load threshold above which to pause the incoming message stream."""

_RESUME_THRESHOLD = 0.8
"""The load threshold below which to resume the incoming message stream."""


def _maybe_wrap_exception(exception):
Expand Down Expand Up @@ -223,7 +228,7 @@ def add_close_callback(self, callback):
def maybe_pause_consumer(self):
"""Check the current load and pause the consumer if needed."""
with self._pause_resume_lock:
if self.load >= 1.0:
if self.load >= _MAX_LOAD:
if self._consumer is not None and not self._consumer.is_paused:
_LOGGER.debug(
"Message backlog over load at %.2f, pausing.", self.load
Expand Down Expand Up @@ -252,7 +257,7 @@ def maybe_resume_consumer(self):
# currently on hold, if the current load allows for it.
self._maybe_release_messages()

if self.load < 0.8:
if self.load < _RESUME_THRESHOLD:
_LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
self._consumer.resume()
else:
Expand All @@ -271,7 +276,7 @@ def _maybe_release_messages(self):
The method assumes the caller has acquired the ``_pause_resume_lock``.
"""
while True:
if self.load >= 1.0:
if self.load >= _MAX_LOAD:
break # already overloaded

try:
Expand Down Expand Up @@ -520,7 +525,7 @@ def _on_response(self, response):
message = google.cloud.pubsub_v1.subscriber.message.Message(
received_message.message, received_message.ack_id, self._scheduler.queue
)
if self.load < 1.0:
if self.load < _MAX_LOAD:
req = requests.LeaseRequest(
ack_id=message.ack_id, byte_size=message.size
)
Expand Down

0 comments on commit 45d535b

Please sign in to comment.