Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsub): include request overhead when computing publish batch size overflow #9911

Merged
merged 5 commits into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pubsub/google/cloud/pubsub_v1/publisher/_batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,12 @@ def messages(self):
def size(self):
"""Return the total size of all of the messages currently in the batch.

The size includes any overhead of the actual ``PublishRequest`` that is
sent to the backend.

Returns:
int: The total size of all of the messages currently
in the batch, in bytes.
in the batch (including the request overhead), in bytes.
"""
raise NotImplementedError

Expand Down
41 changes: 31 additions & 10 deletions pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

_LOGGER = logging.getLogger(__name__)
_CAN_COMMIT = (base.BatchStatus.ACCEPTING_MESSAGES, base.BatchStatus.STARTING)
_SERVER_PUBLISH_MAX_BYTES = 10 * 1000 * 1000 # max accepted size of PublishRequest


class Batch(base.Batch):
Expand Down Expand Up @@ -79,13 +80,17 @@ def __init__(self, client, topic, settings, autocommit=True):
# in order to avoid race conditions
self._futures = []
self._messages = []
self._size = 0
self._status = base.BatchStatus.ACCEPTING_MESSAGES

# The initial size is not zero, we need to account for the size overhead
# of the PublishRequest message itself.
self._base_request_size = types.PublishRequest(topic=topic).ByteSize()
self._size = self._base_request_size

# If max latency is specified, start a thread to monitor the batch and
# commit when the max latency is reached.
self._thread = None
if autocommit and self._settings.max_latency < float("inf"):
if autocommit and self.settings.max_latency < float("inf"):
self._thread = threading.Thread(
name="Thread-MonitorBatchPublisher", target=self.monitor
)
Expand Down Expand Up @@ -124,9 +129,12 @@ def settings(self):
def size(self):
"""Return the total size of all of the messages currently in the batch.

The size includes any overhead of the actual ``PublishRequest`` that is
sent to the backend.

Returns:
int: The total size of all of the messages currently
in the batch, in bytes.
in the batch (including the request overhead), in bytes.
"""
return self._size

Expand Down Expand Up @@ -251,14 +259,14 @@ def _commit(self):
def monitor(self):
"""Commit this batch after sufficient time has elapsed.

This simply sleeps for ``self._settings.max_latency`` seconds,
This simply sleeps for ``self.settings.max_latency`` seconds,
and then calls commit unless the batch has already been committed.
"""
# NOTE: This blocks; it is up to the calling code to call it
# in a separate thread.

# Sleep for however long we should be waiting.
time.sleep(self._settings.max_latency)
time.sleep(self.settings.max_latency)

_LOGGER.debug("Monitor is waking up")
return self._commit()
Expand All @@ -281,6 +289,10 @@ def publish(self, message):
the :class:`~concurrent.futures.Future` interface or :data:`None`.
If :data:`None` is returned, that signals that the batch cannot
accept a message.

Raises:
pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
the ``message`` would exceed the max size limit on the backend.
"""
# Coerce the type, just in case.
if not isinstance(message, types.PubsubMessage):
Expand All @@ -292,12 +304,21 @@ def publish(self, message):
if not self.will_accept(message):
return future

new_size = self._size + message.ByteSize()
size_increase = types.PublishRequest(messages=[message]).ByteSize()

if (self._base_request_size + size_increase) > _SERVER_PUBLISH_MAX_BYTES:
err_msg = (
"The message being published would produce too large a publish "
"request that would exceed the maximum allowed size on the "
"backend ({} bytes).".format(_SERVER_PUBLISH_MAX_BYTES)
)
raise exceptions.MessageTooLargeError(err_msg)

new_size = self._size + size_increase
new_count = len(self._messages) + 1
overflow = (
new_size > self.settings.max_bytes
or new_count >= self._settings.max_messages
)

size_limit = min(self.settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES)
overflow = new_size > size_limit or new_count >= self.settings.max_messages

if not self._messages or not overflow:

Expand Down
6 changes: 5 additions & 1 deletion pubsub/google/cloud/pubsub_v1/publisher/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ class PublishError(GoogleAPICallError):
pass


__all__ = ("PublishError", "TimeoutError")
class MessageTooLargeError(ValueError):
"""Attempt to publish a message that would exceed the server max size limit."""


__all__ = ("MessageTooLargeError", "PublishError", "TimeoutError")
4 changes: 3 additions & 1 deletion pubsub/google/cloud/pubsub_v1/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
BatchSettings.__doc__ = "The settings for batch publishing the messages."
BatchSettings.max_bytes.__doc__ = (
"The maximum total size of the messages to collect before automatically "
"publishing the batch."
"publishing the batch, including any byte size overhead of the publish "
"request itself. The maximum value is bound by the server-side limit of "
"10_000_000 bytes."
)
BatchSettings.max_latency.__doc__ = (
"The maximum number of seconds to wait for additional messages before "
Expand Down
44 changes: 36 additions & 8 deletions pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,24 +74,52 @@ def cleanup():


def test_publish_messages(publisher, topic_path, cleanup):
futures = []
# Make sure the topic gets deleted.
cleanup.append((publisher.delete_topic, topic_path))

publisher.create_topic(topic_path)
for index in six.moves.range(500):
futures.append(
publisher.publish(
topic_path,
b"The hail in Wales falls mainly on the snails.",
num=str(index),
)

futures = [
publisher.publish(
topic_path, b"The hail in Wales falls mainly on the snails.", num=str(i)
)
for i in six.moves.range(500)
]

for future in futures:
result = future.result()
assert isinstance(result, six.string_types)


def test_publish_large_messages(publisher, topic_path, cleanup):
# Make sure the topic gets deleted.
cleanup.append((publisher.delete_topic, topic_path))

# Each message should be smaller than 10**7 bytes (the server side limit for
# PublishRequest), but all messages combined in a PublishRequest should
# slightly exceed that threshold to make sure the publish code handles these
# cases well.
# Mind that the total PublishRequest size must still be smaller than
# 10 * 1024 * 1024 bytes in order to not exceed the max request body size limit.
msg_data = b"x" * (2 * 10 ** 6)

publisher.batch_settings = types.BatchSettings(
max_bytes=11 * 1000 * 1000, # more than the server limit of 10 ** 7
max_latency=2.0, # so that autocommit happens after publishing all messages
max_messages=100,
)
publisher.create_topic(topic_path)

futures = [publisher.publish(topic_path, msg_data, num=str(i)) for i in range(5)]

# If the publishing logic correctly split all messages into more than a
# single batch despite a high BatchSettings.max_bytes limit, there should
# be no "InvalidArgument: request_size is too large" error.
for future in futures:
result = future.result(timeout=10)
assert isinstance(result, six.string_types) # the message ID


def test_subscribe_to_messages(
publisher, topic_path, subscriber, subscription_path, cleanup
):
Expand Down
68 changes: 58 additions & 10 deletions pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ def create_client():
return publisher.Client(credentials=creds)


def create_batch(autocommit=False, **batch_settings):
def create_batch(autocommit=False, topic="topic_name", **batch_settings):
"""Return a batch object suitable for testing.

Args:
autocommit (bool): Whether the batch should commit after
``max_latency`` seconds. By default, this is ``False``
for unit testing.
topic (str): The name of the topic the batch should publish
the messages to.
batch_settings (dict): Arguments passed on to the
:class:``~.pubsub_v1.types.BatchSettings`` constructor.

Expand All @@ -49,7 +51,7 @@ def create_batch(autocommit=False, **batch_settings):
"""
client = create_client()
settings = types.BatchSettings(**batch_settings)
return Batch(client, "topic_name", settings, autocommit=autocommit)
return Batch(client, topic, settings, autocommit=autocommit)


def test_init():
Expand Down Expand Up @@ -299,8 +301,8 @@ def test_monitor_already_committed():
assert batch._status == status


def test_publish():
batch = create_batch()
def test_publish_updating_batch_size():
batch = create_batch(topic="topic_foo")
messages = (
types.PubsubMessage(data=b"foobarbaz"),
types.PubsubMessage(data=b"spameggs"),
Expand All @@ -314,22 +316,27 @@ def test_publish():
assert len(batch.messages) == 3
assert batch._futures == futures

# The size should have been incremented by the sum of the size of the
# messages.
expected_size = sum([message_pb.ByteSize() for message_pb in messages])
assert batch.size == expected_size
# The size should have been incremented by the sum of the size
# contributions of each message to the PublishRequest.
base_request_size = types.PublishRequest(topic="topic_foo").ByteSize()
expected_request_size = base_request_size + sum(
types.PublishRequest(messages=[msg]).ByteSize() for msg in messages
)

assert batch.size == expected_request_size
assert batch.size > 0 # I do not always trust protobuf.


def test_publish_not_will_accept():
batch = create_batch(max_messages=0)
batch = create_batch(topic="topic_foo", max_messages=0)
base_request_size = types.PublishRequest(topic="topic_foo").ByteSize()

# Publish the message.
message = types.PubsubMessage(data=b"foobarbaz")
future = batch.publish(message)

assert future is None
assert batch.size == 0
assert batch.size == base_request_size
assert batch.messages == []
assert batch._futures == []

Expand Down Expand Up @@ -361,6 +368,47 @@ def test_publish_exceed_max_messages():
assert batch._futures == futures


@mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000)
def test_publish_single_message_size_exceeds_server_size_limit():
batch = create_batch(
topic="topic_foo",
max_messages=1000,
max_bytes=1000 * 1000, # way larger than (mocked) server side limit
)

big_message = types.PubsubMessage(data=b"x" * 984)

request_size = types.PublishRequest(
topic="topic_foo", messages=[big_message]
).ByteSize()
assert request_size == 1001 # sanity check, just above the (mocked) server limit

with pytest.raises(exceptions.MessageTooLargeError):
batch.publish(big_message)


@mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000)
def test_publish_total_messages_size_exceeds_server_size_limit():
batch = create_batch(topic="topic_foo", max_messages=10, max_bytes=1500)

messages = (
types.PubsubMessage(data=b"x" * 500),
types.PubsubMessage(data=b"x" * 600),
)

# Sanity check - request size is still below BatchSettings.max_bytes,
# but it exceeds the server-side size limit.
request_size = types.PublishRequest(topic="topic_foo", messages=messages).ByteSize()
assert 1000 < request_size < 1500
plamut marked this conversation as resolved.
Show resolved Hide resolved

with mock.patch.object(batch, "commit") as fake_commit:
batch.publish(messages[0])
batch.publish(messages[1])

# The server side limit should kick in and cause a commit.
fake_commit.assert_called_once()


def test_publish_dict():
batch = create_batch()
future = batch.publish({"data": b"foobarbaz", "attributes": {"spam": "eggs"}})
Expand Down