Skip to content

Commit

Permalink
EXPERIMENTAL: speed up streaming pull (circumvent proto wrapper classes)
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut committed Aug 22, 2020
1 parent d5a6247 commit 30dce66
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -602,9 +602,13 @@ def _on_response(self, response):
)
return

# Circumvent the wrapper class and operate on the raw protobuf message
# to gain attribute access performance.
received_messages = response._pb.received_messages

_LOGGER.debug(
"Processing %s received message(s), currently on hold %s (bytes %s).",
len(response.received_messages),
len(received_messages),
self._messages_on_hold.size,
self._on_hold_bytes,
)
Expand All @@ -614,12 +618,12 @@ def _on_response(self, response):
# received them.
items = [
requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99))
for message in response.received_messages
for message in received_messages
]
self._dispatcher.modify_ack_deadline(items)

with self._pause_resume_lock:
for received_message in response.received_messages:
for received_message in received_messages:
message = google.cloud.pubsub_v1.subscriber.message.Message(
received_message.message,
received_message.ack_id,
Expand Down
23 changes: 17 additions & 6 deletions google/cloud/pubsub_v1/subscriber/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

from __future__ import absolute_import

import datetime as dt
import json
import math
import pytz
import time

from google.cloud.pubsub_v1.subscriber._protocol import requests
Expand Down Expand Up @@ -79,7 +81,7 @@ def __init__(self, message, ack_id, delivery_attempt, request_queue):
Args:
message (~.pubsub_v1.types.PubsubMessage): The message received
from Pub/Sub.
from Pub/Sub. TODO: document that this is the raw protobuf message (passed for performance).
ack_id (str): The ack_id received from Pub/Sub.
delivery_attempt (int): The delivery attempt counter received
from Pub/Sub if a DeadLetterPolicy is set on the subscription,
Expand All @@ -99,6 +101,15 @@ def __init__(self, message, ack_id, delivery_attempt, request_queue):
# the default lease deadline.
self._received_timestamp = time.time()

self._attributes = message.attributes
self._data = message.data
self._publish_time = dt.datetime.fromtimestamp(
message.publish_time.seconds + message.publish_time.nanos / 1e9,
tz=pytz.UTC,
)
self._ordering_key = message.ordering_key
self._size = message.ByteSize()

def __repr__(self):
# Get an abbreviated version of the data.
abbv_data = self._message.data
Expand Down Expand Up @@ -130,7 +141,7 @@ def attributes(self):
.ScalarMapContainer: The message's attributes. This is a
``dict``-like object provided by ``google.protobuf``.
"""
return self._message.attributes
return self._attributes

@property
def data(self):
Expand All @@ -140,7 +151,7 @@ def data(self):
bytes: The message data. This is always a bytestring; if you
want a text string, call :meth:`bytes.decode`.
"""
return self._message.data
return self._data

@property
def publish_time(self):
Expand All @@ -149,17 +160,17 @@ def publish_time(self):
Returns:
datetime: The date and time that the message was published.
"""
return self._message.publish_time
return self._publish_time

@property
def ordering_key(self):
"""str: the ordering key used to publish the message."""
return self._message.ordering_key
return self._ordering_key

@property
def size(self):
"""Return the size of the underlying message, in bytes."""
return self._message._pb.ByteSize()
return self._size

@property
def ack_id(self):
Expand Down
21 changes: 12 additions & 9 deletions tests/unit/pubsub_v1/subscriber/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,19 @@
def create_message(data, ack_id="ACKID", delivery_attempt=0, ordering_key="", **attrs):
with mock.patch.object(time, "time") as time_:
time_.return_value = RECEIVED_SECONDS
msg = message.Message(
message=gapic_types.PubsubMessage(
attributes=attrs,
data=data,
message_id="message_id",
publish_time=timestamp_pb2.Timestamp(
seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000
),
ordering_key=ordering_key,
gapic_pubsub_message = gapic_types.PubsubMessage(
attributes=attrs,
data=data,
message_id="message_id",
publish_time=timestamp_pb2.Timestamp(
seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000
),
ordering_key=ordering_key,
)
msg = message.Message(
# The code under test uses a raw protobuf PubsubMessage, i.e. w/o additional
# Python class wrappers, hence the "_pb"
message=gapic_pubsub_message._pb,
ack_id=ack_id,
delivery_attempt=delivery_attempt,
request_queue=queue.Queue(),
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

def make_message(ack_id, ordering_key):
proto_msg = gapic_types.PubsubMessage(data=b"Q", ordering_key=ordering_key)
return message.Message(proto_msg, ack_id, 0, queue.Queue())
return message.Message(proto_msg._pb, ack_id, 0, queue.Queue())


def test_init():
Expand Down

0 comments on commit 30dce66

Please sign in to comment.