From 550b0c60e9a04a2dee860f44472826378d136b87 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Mon, 2 Dec 2019 18:55:25 +0100 Subject: [PATCH 1/5] Clarify the description of BatchSettings.max_bytes --- pubsub/google/cloud/pubsub_v1/types.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pubsub/google/cloud/pubsub_v1/types.py b/pubsub/google/cloud/pubsub_v1/types.py index 7f833660f6e2..2d238b42f797 100644 --- a/pubsub/google/cloud/pubsub_v1/types.py +++ b/pubsub/google/cloud/pubsub_v1/types.py @@ -48,7 +48,9 @@ BatchSettings.__doc__ = "The settings for batch publishing the messages." BatchSettings.max_bytes.__doc__ = ( "The maximum total size of the messages to collect before automatically " - "publishing the batch." + "publishing the batch, including any byte size overhead of the publish " + "request itself. The maximum value is bound by the server-side limit of " + "10_000_000 bytes." ) BatchSettings.max_latency.__doc__ = ( "The maximum number of seconds to wait for additional messages before " From 20001935c78671cc6eabaca7c355343c2e99d6d7 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Tue, 3 Dec 2019 15:48:04 +0100 Subject: [PATCH 2/5] Include overhead in batch overflow calculation The maximum allowed size for a PublishRequest on the backend is lower than a mere sum of the byte sizes of individual messages. This commit adjusts the batch size overflow calculation to account for this overhead. It also caps the effective maximum BatchSettings.max_bytes value to 10_000_000 bytes (the limit on the backend). 
(credit also to GitHub @relud for outlining the main idea first in the issue description) --- .../cloud/pubsub_v1/publisher/_batch/base.py | 5 +- .../pubsub_v1/publisher/_batch/thread.py | 31 ++++++-- pubsub/tests/system.py | 31 ++++++++ .../pubsub_v1/publisher/batch/test_thread.py | 73 ++++++++++++++++--- 4 files changed, 122 insertions(+), 18 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/base.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/base.py index 4dc6ceec6a80..75f430b09421 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/base.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/base.py @@ -75,9 +75,12 @@ def messages(self): def size(self): """Return the total size of all of the messages currently in the batch. + The size includes any overhead of the actual ``PublishRequest`` that is + sent to the backend. + Returns: int: The total size of all of the messages currently - in the batch, in bytes. + in the batch (including the request overhead), in bytes. """ raise NotImplementedError diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py index 726e93166cda..db754a8767c5 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -29,6 +29,7 @@ _LOGGER = logging.getLogger(__name__) _CAN_COMMIT = (base.BatchStatus.ACCEPTING_MESSAGES, base.BatchStatus.STARTING) +_SERVER_PUBLISH_MAX_BYTES = 10 * 1000 * 1000 # max accepted size of PublishRequest class Batch(base.Batch): @@ -79,9 +80,13 @@ def __init__(self, client, topic, settings, autocommit=True): # in order to avoid race conditions self._futures = [] self._messages = [] - self._size = 0 self._status = base.BatchStatus.ACCEPTING_MESSAGES + # The initial size is not zero, we need to account for the size overhead + # of the PublishRequest message itself. 
+ self._base_request_size = types.PublishRequest(topic=topic).ByteSize() + self._size = self._base_request_size + # If max latency is specified, start a thread to monitor the batch and # commit when the max latency is reached. self._thread = None @@ -124,9 +129,12 @@ def settings(self): def size(self): """Return the total size of all of the messages currently in the batch. + The size includes any overhead of the actual ``PublishRequest`` that is + sent to the backend. + Returns: int: The total size of all of the messages currently - in the batch, in bytes. + in the batch (including the request overhead), in bytes. """ return self._size @@ -292,12 +300,21 @@ def publish(self, message): if not self.will_accept(message): return future - new_size = self._size + message.ByteSize() + size_increase = types.PublishRequest(messages=[message]).ByteSize() + + if (self._base_request_size + size_increase) > _SERVER_PUBLISH_MAX_BYTES: + err_msg = ( + "The message being published would produce too large a publish " + "request that would exceed the maximum allowed size on the " + "backend ({} bytes).".format(_SERVER_PUBLISH_MAX_BYTES) + ) + raise ValueError(err_msg) + + new_size = self._size + size_increase new_count = len(self._messages) + 1 - overflow = ( - new_size > self.settings.max_bytes - or new_count >= self._settings.max_messages - ) + + size_limit = min(self.settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES) + overflow = new_size > size_limit or new_count >= self._settings.max_messages if not self._messages or not overflow: diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index 59e5e3fe83a4..59e97f52dc1a 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -92,6 +92,37 @@ def test_publish_messages(publisher, topic_path, cleanup): assert isinstance(result, six.string_types) +def test_publish_large_messages(publisher, topic_path, cleanup): + futures = [] + # Make sure the topic gets deleted. 
+ cleanup.append((publisher.delete_topic, topic_path)) + + # Each message should be smaller than 10**7 bytes (the server side limit for + # PublishRequest), but all messages combined in a PublishRequest should + # slightly exceed that threshold to make sure the publish code handles these + # cases well. + # Mind that the total PublishRequest size must still be smaller than + # 10 * 1024 * 1024 bytes in order to not exceed the max request body size limit. + msg_data = b"x" * (2 * 10 ** 6) + + publisher.batch_settings = types.BatchSettings( + max_bytes=11 * 1000 * 1000, # more than the server limit of 10 ** 7 + max_latency=2.0, # so that autocommit happens after publishing all messages + max_messages=100, + ) + publisher.create_topic(topic_path) + + for index in six.moves.range(5): + futures.append(publisher.publish(topic_path, msg_data, num=str(index))) + + # If the publishing logic correctly split all messages into more than a + # single batch despite a high BatchSettings.max_bytes limit, there should + # be no "InvalidArgument: request_size is too large" error. + for future in futures: + result = future.result(timeout=10) + assert isinstance(result, six.string_types) # the message ID + + def test_subscribe_to_messages( publisher, topic_path, subscriber, subscription_path, cleanup ): diff --git a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 60425e748043..e1e44f40be67 100644 --- a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -34,13 +34,15 @@ def create_client(): return publisher.Client(credentials=creds) -def create_batch(autocommit=False, **batch_settings): +def create_batch(autocommit=False, topic="topic_name", **batch_settings): """Return a batch object suitable for testing. Args: autocommit (bool): Whether the batch should commit after ``max_latency`` seconds. 
By default, this is ``False`` for unit testing. + autocommit (topic): The name of the topic the batch should publish + the messages to. batch_settings (dict): Arguments passed on to the :class:``~.pubsub_v1.types.BatchSettings`` constructor. @@ -49,7 +51,7 @@ def create_batch(autocommit=False, **batch_settings): """ client = create_client() settings = types.BatchSettings(**batch_settings) - return Batch(client, "topic_name", settings, autocommit=autocommit) + return Batch(client, topic, settings, autocommit=autocommit) def test_init(): @@ -299,8 +301,8 @@ def test_monitor_already_committed(): assert batch._status == status -def test_publish(): - batch = create_batch() +def test_publish_updating_batch_size(): + batch = create_batch(topic="topic_foo") messages = ( types.PubsubMessage(data=b"foobarbaz"), types.PubsubMessage(data=b"spameggs"), @@ -314,22 +316,28 @@ def test_publish(): assert len(batch.messages) == 3 assert batch._futures == futures - # The size should have been incremented by the sum of the size of the - # messages. - expected_size = sum([message_pb.ByteSize() for message_pb in messages]) - assert batch.size == expected_size + # The size should have been incremented by the sum of the size + # contributions of each message to the PublishRequest. + base_request_size = types.PublishRequest(topic="topic_foo").ByteSize() + msg_size_overheads = [ + types.PublishRequest(messages=[msg]).ByteSize() for msg in messages + ] + + expected_request_size = base_request_size + sum(msg_size_overheads) + assert batch.size == expected_request_size assert batch.size > 0 # I do not always trust protobuf. def test_publish_not_will_accept(): - batch = create_batch(max_messages=0) + batch = create_batch(topic="topic_foo", max_messages=0) + base_request_size = types.PublishRequest(topic="topic_foo").ByteSize() # Publish the message. 
message = types.PubsubMessage(data=b"foobarbaz") future = batch.publish(message) assert future is None - assert batch.size == 0 + assert batch.size == base_request_size assert batch.messages == [] assert batch._futures == [] @@ -361,6 +369,51 @@ def test_publish_exceed_max_messages(): assert batch._futures == futures +@mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000) +def test_publish_single_message_size_exceeds_server_size_limit(): + batch = create_batch( + topic="topic_foo", + max_messages=1000, + max_bytes=1000 * 1000, # way larger than (mocked) server side limit + ) + + big_message = types.PubsubMessage(data=b"x" * 984) + + request_size = types.PublishRequest( + topic="topic_foo", messages=[big_message] + ).ByteSize() + assert request_size == 1001 # sanity check, just above the (mocked) server limit + + with pytest.raises(ValueError) as error: + batch.publish(big_message) + + error_msg = str(error) + assert "too large" in error_msg + assert "1000 bytes" in error_msg + + +@mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000) +def test_publish_total_messages_size_exceeds_server_size_limit(): + batch = create_batch(topic="topic_foo", max_messages=10, max_bytes=1500) + + messages = ( + types.PubsubMessage(data=b"x" * 500), + types.PubsubMessage(data=b"x" * 600), + ) + + # Sanity check - request size is still below BatchSettings.max_bytes, + # but it exceeds the server-side size limit. + request_size = types.PublishRequest(topic="topic_foo", messages=messages).ByteSize() + assert 1000 < request_size < 1500 + + with mock.patch.object(batch, "commit") as fake_commit: + batch.publish(messages[0]) + batch.publish(messages[1]) + + # The server side limit should kick in and cause a commit. 
+ fake_commit.assert_called_once() + + def test_publish_dict(): batch = create_batch() future = batch.publish({"data": b"foobarbaz", "attributes": {"spam": "eggs"}}) From 843a10abcfe7a7c39900deedc1900faafe1078e5 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Tue, 3 Dec 2019 23:48:49 +0100 Subject: [PATCH 3/5] Access settings inside Batch in a consistent way. --- pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py index db754a8767c5..5ba551b9f197 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -313,7 +313,7 @@ def publish(self, message): new_size = self._size + size_increase new_count = len(self._messages) + 1 - size_limit = min(self.settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES) + size_limit = min(self._settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES) overflow = new_size > size_limit or new_count >= self._settings.max_messages if not self._messages or not overflow: From 289935ebb011904038c0a109b954ec8fbb3b5795 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 4 Dec 2019 09:43:51 +0100 Subject: [PATCH 4/5] Cleanup and refactor a few code snippets --- .../pubsub_v1/publisher/_batch/thread.py | 10 +++++----- pubsub/tests/system.py | 19 ++++++++----------- .../pubsub_v1/publisher/batch/test_thread.py | 7 +++---- 3 files changed, 16 insertions(+), 20 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py index 5ba551b9f197..08c7d0b2a7e2 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -90,7 +90,7 @@ def __init__(self, client, topic, settings, autocommit=True): # If max latency is specified, start a thread to monitor the batch 
and # commit when the max latency is reached. self._thread = None - if autocommit and self._settings.max_latency < float("inf"): + if autocommit and self.settings.max_latency < float("inf"): self._thread = threading.Thread( name="Thread-MonitorBatchPublisher", target=self.monitor ) @@ -259,14 +259,14 @@ def _commit(self): def monitor(self): """Commit this batch after sufficient time has elapsed. - This simply sleeps for ``self._settings.max_latency`` seconds, + This simply sleeps for ``self.settings.max_latency`` seconds, and then calls commit unless the batch has already been committed. """ # NOTE: This blocks; it is up to the calling code to call it # in a separate thread. # Sleep for however long we should be waiting. - time.sleep(self._settings.max_latency) + time.sleep(self.settings.max_latency) _LOGGER.debug("Monitor is waking up") return self._commit() @@ -313,8 +313,8 @@ def publish(self, message): new_size = self._size + size_increase new_count = len(self._messages) + 1 - size_limit = min(self._settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES) - overflow = new_size > size_limit or new_count >= self._settings.max_messages + size_limit = min(self.settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES) + overflow = new_size > size_limit or new_count >= self.settings.max_messages if not self._messages or not overflow: diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index 59e97f52dc1a..65baaf016407 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -74,26 +74,24 @@ def cleanup(): def test_publish_messages(publisher, topic_path, cleanup): - futures = [] # Make sure the topic gets deleted. 
cleanup.append((publisher.delete_topic, topic_path)) publisher.create_topic(topic_path) - for index in six.moves.range(500): - futures.append( - publisher.publish( - topic_path, - b"The hail in Wales falls mainly on the snails.", - num=str(index), - ) + + futures = [ + publisher.publish( + topic_path, b"The hail in Wales falls mainly on the snails.", num=str(i) ) + for i in six.moves.range(500) + ] + for future in futures: result = future.result() assert isinstance(result, six.string_types) def test_publish_large_messages(publisher, topic_path, cleanup): - futures = [] # Make sure the topic gets deleted. cleanup.append((publisher.delete_topic, topic_path)) @@ -112,8 +110,7 @@ def test_publish_large_messages(publisher, topic_path, cleanup): ) publisher.create_topic(topic_path) - for index in six.moves.range(5): - futures.append(publisher.publish(topic_path, msg_data, num=str(index))) + futures = [publisher.publish(topic_path, msg_data, num=str(i)) for i in range(5)] # If the publishing logic correctly split all messages into more than a # single batch despite a high BatchSettings.max_bytes limit, there should diff --git a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py index e1e44f40be67..0df0bc439d4b 100644 --- a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -41,7 +41,7 @@ def create_batch(autocommit=False, topic="topic_name", **batch_settings): autocommit (bool): Whether the batch should commit after ``max_latency`` seconds. By default, this is ``False`` for unit testing. - autocommit (topic): The name of the topic the batch should publish + topic (str): The name of the topic the batch should publish the messages to. batch_settings (dict): Arguments passed on to the :class:``~.pubsub_v1.types.BatchSettings`` constructor. 
@@ -319,11 +319,10 @@ def test_publish_updating_batch_size(): # The size should have been incremented by the sum of the size # contributions of each message to the PublishRequest. base_request_size = types.PublishRequest(topic="topic_foo").ByteSize() - msg_size_overheads = [ + expected_request_size = base_request_size + sum( types.PublishRequest(messages=[msg]).ByteSize() for msg in messages - ] + ) - expected_request_size = base_request_size + sum(msg_size_overheads) assert batch.size == expected_request_size assert batch.size > 0 # I do not always trust protobuf. From d4509ec8e96d7c233d27b4e96751758350811ed4 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 5 Dec 2019 00:42:17 +0100 Subject: [PATCH 5/5] Raise more specific error if message too large --- pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py | 6 +++++- pubsub/google/cloud/pubsub_v1/publisher/exceptions.py | 6 +++++- pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py | 6 +----- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py index 08c7d0b2a7e2..4101bc518b0a 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -289,6 +289,10 @@ def publish(self, message): the :class:`~concurrent.futures.Future` interface or :data:`None`. If :data:`None` is returned, that signals that the batch cannot accept a message. + + Raises: + pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing + the ``message`` would exceed the max size limit on the backend. """ # Coerce the type, just in case. 
if not isinstance(message, types.PubsubMessage): @@ -308,7 +312,7 @@ def publish(self, message): "request that would exceed the maximum allowed size on the " "backend ({} bytes).".format(_SERVER_PUBLISH_MAX_BYTES) ) - raise ValueError(err_msg) + raise exceptions.MessageTooLargeError(err_msg) new_size = self._size + size_increase new_count = len(self._messages) + 1 diff --git a/pubsub/google/cloud/pubsub_v1/publisher/exceptions.py b/pubsub/google/cloud/pubsub_v1/publisher/exceptions.py index adbfaaaa1ee1..be176bac2dba 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/exceptions.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/exceptions.py @@ -22,4 +22,8 @@ class PublishError(GoogleAPICallError): pass -__all__ = ("PublishError", "TimeoutError") +class MessageTooLargeError(ValueError): + """Attempt to publish a message that would exceed the server max size limit.""" + + +__all__ = ("MessageTooLargeError", "PublishError", "TimeoutError") diff --git a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 0df0bc439d4b..f51b314af6df 100644 --- a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -383,13 +383,9 @@ def test_publish_single_message_size_exceeds_server_size_limit(): ).ByteSize() assert request_size == 1001 # sanity check, just above the (mocked) server limit - with pytest.raises(ValueError) as error: + with pytest.raises(exceptions.MessageTooLargeError): batch.publish(big_message) - error_msg = str(error) - assert "too large" in error_msg - assert "1000 bytes" in error_msg - @mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000) def test_publish_total_messages_size_exceeds_server_size_limit():