Skip to content

Commit

Permalink
Optimize creating and accessing pubsub messages
Browse files Browse the repository at this point in the history
Profiling shows that the speed of creating a new pubsub message and
the speed of accessing the message's attributes significantly affect
the throughput of the publisher and subscriber.

This commit makes everything faster by circumventing the wrapper class
around the raw protobuf pubsub messages where possible.
  • Loading branch information
plamut committed Aug 22, 2020
1 parent cca5683 commit c29d7f8
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 22 deletions.
8 changes: 7 additions & 1 deletion google/cloud/pubsub_v1/publisher/_batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
_CAN_COMMIT = (base.BatchStatus.ACCEPTING_MESSAGES, base.BatchStatus.STARTING)
_SERVER_PUBLISH_MAX_BYTES = 10 * 1000 * 1000 # max accepted size of PublishRequest

_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb()


class Batch(base.Batch):
"""A batch of messages.
Expand Down Expand Up @@ -337,7 +339,11 @@ def publish(self, message):

# Coerce the type, just in case.
if not isinstance(message, gapic_types.PubsubMessage):
message = gapic_types.PubsubMessage(**message)
# For performance reasons, the message should be constructed by directly
# using the raw protobuf class, and only then wrapping it into the
# higher-level PubsubMessage class.
vanilla_pb = _raw_proto_pubbsub_message(**message)
message = gapic_types.PubsubMessage.wrap(vanilla_pb)

future = None

Expand Down
9 changes: 7 additions & 2 deletions google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
"from_service_account_json",
)

_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb()


def _set_nested_value(container, value, keys):
current = container
Expand Down Expand Up @@ -346,10 +348,13 @@ def publish(
"be sent as text strings."
)

# Create the Pub/Sub message object.
message = gapic_types.PubsubMessage(
# Create the Pub/Sub message object. For performance reasons, the message
# should be constructed by directly using the raw protobuf class, and only
# then wrapping it into the higher-level PubsubMessage class.
vanilla_pb = _raw_proto_pubbsub_message(
data=data, ordering_key=ordering_key, attributes=attrs
)
message = gapic_types.PubsubMessage.wrap(vanilla_pb)

# Messages should go through flow control to prevent excessive
# queuing on the client side (depending on the settings).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,9 +602,13 @@ def _on_response(self, response):
)
return

# IMPORTANT: Circumvent the wrapper class and operate on the raw underlying
# protobuf message to significantly gain on attribute access performance.
received_messages = response._pb.received_messages

_LOGGER.debug(
"Processing %s received message(s), currently on hold %s (bytes %s).",
len(response.received_messages),
len(received_messages),
self._messages_on_hold.size,
self._on_hold_bytes,
)
Expand All @@ -614,12 +618,12 @@ def _on_response(self, response):
# received them.
items = [
requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99))
for message in response.received_messages
for message in received_messages
]
self._dispatcher.modify_ack_deadline(items)

with self._pause_resume_lock:
for received_message in response.received_messages:
for received_message in received_messages:
message = google.cloud.pubsub_v1.subscriber.message.Message(
received_message.message,
received_message.ack_id,
Expand Down
28 changes: 22 additions & 6 deletions google/cloud/pubsub_v1/subscriber/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

from __future__ import absolute_import

import datetime as dt
import json
import math
import pytz
import time

from google.cloud.pubsub_v1.subscriber._protocol import requests
Expand Down Expand Up @@ -79,7 +81,9 @@ def __init__(self, message, ack_id, delivery_attempt, request_queue):
Args:
message (~.pubsub_v1.types.PubsubMessage): The message received
from Pub/Sub.
from Pub/Sub. For performance reasons it should be the raw
protobuf message wrapped by the ``PubsubMessage`` class obtained
through the message's ``.pb()`` method.
ack_id (str): The ack_id received from Pub/Sub.
delivery_attempt (int): The delivery attempt counter received
from Pub/Sub if a DeadLetterPolicy is set on the subscription,
Expand All @@ -99,6 +103,18 @@ def __init__(self, message, ack_id, delivery_attempt, request_queue):
# the default lease deadline.
self._received_timestamp = time.time()

# Store the message attributes directly to speed up attribute access, i.e.
# to avoid two lookups if self._message.<attribute> pattern was used in
# properties.
self._attributes = message.attributes
self._data = message.data
self._publish_time = dt.datetime.fromtimestamp(
message.publish_time.seconds + message.publish_time.nanos / 1e9,
tz=pytz.UTC,
)
self._ordering_key = message.ordering_key
self._size = message.ByteSize()

def __repr__(self):
# Get an abbreviated version of the data.
abbv_data = self._message.data
Expand Down Expand Up @@ -130,7 +146,7 @@ def attributes(self):
.ScalarMapContainer: The message's attributes. This is a
``dict``-like object provided by ``google.protobuf``.
"""
return self._message.attributes
return self._attributes

@property
def data(self):
Expand All @@ -140,7 +156,7 @@ def data(self):
bytes: The message data. This is always a bytestring; if you
want a text string, call :meth:`bytes.decode`.
"""
return self._message.data
return self._data

@property
def publish_time(self):
Expand All @@ -149,17 +165,17 @@ def publish_time(self):
Returns:
datetime: The date and time that the message was published.
"""
return self._message.publish_time
return self._publish_time

@property
def ordering_key(self):
"""str: the ordering key used to publish the message."""
return self._message.ordering_key
return self._ordering_key

@property
def size(self):
"""Return the size of the underlying message, in bytes."""
return self._message._pb.ByteSize()
return self._size

@property
def ack_id(self):
Expand Down
21 changes: 12 additions & 9 deletions tests/unit/pubsub_v1/subscriber/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,19 @@
def create_message(data, ack_id="ACKID", delivery_attempt=0, ordering_key="", **attrs):
with mock.patch.object(time, "time") as time_:
time_.return_value = RECEIVED_SECONDS
msg = message.Message(
message=gapic_types.PubsubMessage(
attributes=attrs,
data=data,
message_id="message_id",
publish_time=timestamp_pb2.Timestamp(
seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000
),
ordering_key=ordering_key,
gapic_pubsub_message = gapic_types.PubsubMessage(
attributes=attrs,
data=data,
message_id="message_id",
publish_time=timestamp_pb2.Timestamp(
seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000
),
ordering_key=ordering_key,
)
msg = message.Message(
# The code under test uses a raw protobuf PubsubMessage, i.e. w/o additional
# Python class wrappers, hence the "_pb"
message=gapic_pubsub_message._pb,
ack_id=ack_id,
delivery_attempt=delivery_attempt,
request_queue=queue.Queue(),
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

def make_message(ack_id, ordering_key):
proto_msg = gapic_types.PubsubMessage(data=b"Q", ordering_key=ordering_key)
return message.Message(proto_msg, ack_id, 0, queue.Queue())
return message.Message(proto_msg._pb, ack_id, 0, queue.Queue())


def test_init():
Expand Down

0 comments on commit c29d7f8

Please sign in to comment.