Skip to content

Commit

Permalink
Merge pull request #1795 from tseaver/pubsub-wrap_gax_subscriber_api
Browse files Browse the repository at this point in the history
Add GAX-based _SubscriberAPI.
  • Loading branch information
tseaver committed May 16, 2016
2 parents 14b1143 + 5ecff7b commit bb26745
Show file tree
Hide file tree
Showing 3 changed files with 779 additions and 19 deletions.
279 changes: 276 additions & 3 deletions gcloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from google.gax.errors import GaxError
from google.gax.grpc import exc_to_code
from google.pubsub.v1.pubsub_pb2 import PubsubMessage
from google.pubsub.v1.pubsub_pb2 import PushConfig
from grpc.beta.interfaces import StatusCode
# pylint: enable=import-error

Expand Down Expand Up @@ -47,9 +48,9 @@ def list_topics(self, project):
:rtype: tuple, (list, str)
:returns: list of ``Topic`` resource dicts, plus a
"next page token" string: if not None, indicates that
more topics can be retrieved with another call (pass that
value as ``page_token``).
"next page token" string: if not None, indicates that
more topics can be retrieved with another call (pass that
value as ``page_token``).
"""
options = CallOptions(is_page_streaming=False)
path = 'projects/%s' % (project,)
Expand Down Expand Up @@ -180,7 +181,279 @@ def topic_list_subscriptions(self, topic_path):
return subs, response.next_page_token


class _SubscriberAPI(object):
"""Helper mapping subscriber-related APIs.
:type gax_api: :class:`google.pubsub.v1.publisher_api.SubscriberApi`
:param gax_api: API object used to make GAX requests.
"""
def __init__(self, gax_api):
self._gax_api = gax_api

def list_subscriptions(self, project):
"""List subscriptions for the project associated with this API.
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/list
:type project: string
:param project: project ID
:rtype: tuple, (list, str)
:returns: list of ``Subscription`` resource dicts, plus a
"next page token" string: if not None, indicates that
more topics can be retrieved with another call (pass that
value as ``page_token``).
"""
options = CallOptions(is_page_streaming=False)
path = 'projects/%s' % (project,)
response = self._gax_api.list_subscriptions(path, options)
subscriptions = [_subscription_pb_to_mapping(sub_pb)
for sub_pb in response.subscriptions]
return subscriptions, response.next_page_token

def subscription_create(self, subscription_path, topic_path,
ack_deadline=None, push_endpoint=None):
"""API call: create a subscription
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/create
:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type topic_path: string
:param topic_path: the fully-qualified path of the topic being
subscribed, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
:type ack_deadline: int, or ``NoneType``
:param ack_deadline: the deadline (in seconds) by which messages pulled
from the back-end must be acknowledged.
:type push_endpoint: string, or ``NoneType``
:param push_endpoint: URL to which messages will be pushed by the
back-end. If not set, the application must pull
messages.
:rtype: dict
:returns: ``Subscription`` resource returned from the API.
"""
if push_endpoint is not None:
push_config = PushConfig(push_endpoint=push_endpoint)
else:
push_config = None

if ack_deadline is None:
ack_deadline = 0

try:
sub_pb = self._gax_api.create_subscription(
subscription_path, topic_path, push_config, ack_deadline)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
raise Conflict(topic_path)
raise
return _subscription_pb_to_mapping(sub_pb)

def subscription_get(self, subscription_path):
"""API call: retrieve a subscription
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/get
:type subscription_path: string
:param subscription_path: the fully-qualified path of the subscription,
in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:rtype: dict
:returns: ``Subscription`` resource returned from the API.
"""
try:
sub_pb = self._gax_api.get_subscription(subscription_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise
return _subscription_pb_to_mapping(sub_pb)

def subscription_delete(self, subscription_path):
"""API call: delete a subscription
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/delete
:type subscription_path: string
:param subscription_path: the fully-qualified path of the subscription,
in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
"""
try:
self._gax_api.delete_subscription(subscription_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise

def subscription_modify_push_config(self, subscription_path,
push_endpoint):
"""API call: update push config of a subscription
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig
:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type push_endpoint: string, or ``NoneType``
:param push_endpoint: URL to which messages will be pushed by the
back-end. If not set, the application must pull
messages.
"""
push_config = PushConfig(push_endpoint=push_endpoint)
try:
self._gax_api.modify_push_config(subscription_path, push_config)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise

def subscription_pull(self, subscription_path, return_immediately=False,
max_messages=1):
"""API call: retrieve messages for a subscription
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig
:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type return_immediately: boolean
:param return_immediately: if True, the back-end returns even if no
messages are available; if False, the API
call blocks until one or more messages are
available.
:type max_messages: int
:param max_messages: the maximum number of messages to return.
:rtype: list of dict
:returns: the ``receivedMessages`` element of the response.
"""
try:
response_pb = self._gax_api.pull(
subscription_path, max_messages, return_immediately)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise
return [_received_message_pb_to_mapping(rmpb)
for rmpb in response_pb.received_messages]

def subscription_acknowledge(self, subscription_path, ack_ids):
"""API call: acknowledge retrieved messages
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig
:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type ack_ids: list of string
:param ack_ids: ack IDs of messages being acknowledged
"""
try:
self._gax_api.acknowledge(subscription_path, ack_ids)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise

def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
ack_deadline):
"""API call: update ack deadline for retrieved messages
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyAckDeadline
:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type ack_ids: list of string
:param ack_ids: ack IDs of messages being acknowledged
:type ack_deadline: int
:param ack_deadline: the deadline (in seconds) by which messages pulled
from the back-end must be acknowledged.
"""
try:
self._gax_api.modify_ack_deadline(
subscription_path, ack_ids, ack_deadline)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise


def _message_pb_from_dict(message):
"""Helper for :meth:`_PublisherAPI.topic_publish`."""
return PubsubMessage(data=_to_bytes(message['data']),
attributes=message['attributes'])


def _subscription_pb_to_mapping(sub_pb):
"""Helper for :meth:`list_subscriptions`, et aliae
Ideally, would use a function from :mod:`protobuf.json_format`, but
the right one isn't public. See:
https://github.com/google/protobuf/issues/1351
"""
mapping = {
'name': sub_pb.name,
'topic': sub_pb.topic,
'ack_deadline': sub_pb.ack_deadline,
}
if sub_pb.push_config.push_endpoint != '':
mapping['push_config'] = {
'push_endpoint': sub_pb.push_config.push_endpoint,
}
return mapping


def _message_pb_to_mapping(message_pb):
"""Helper for :meth:`pull`, et aliae
Ideally, would use a function from :mod:`protobuf.json_format`, but
the right one isn't public. See:
https://github.com/google/protobuf/issues/1351
"""
return {
'messageId': message_pb.message_id,
'data': message_pb.data,
'attributes': message_pb.attributes,
}


def _received_message_pb_to_mapping(received_message_pb):
"""Helper for :meth:`pull`, et aliae
Ideally, would use a function from :mod:`protobuf.json_format`, but
the right one isn't public. See:
https://github.com/google/protobuf/issues/1351
"""
return {
'ackId': received_message_pb.ack_id,
'message': _message_pb_to_mapping(
received_message_pb.message),
}
10 changes: 5 additions & 5 deletions gcloud/pubsub/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def subscription_create(self, subscription_path, topic_path,
:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<TOPIC_NAME>``.
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type topic_path: string
:param topic_path: the fully-qualified path of the topic being
Expand Down Expand Up @@ -373,7 +373,7 @@ def subscription_modify_push_config(self, subscription_path,
:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<TOPIC_NAME>``.
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type push_endpoint: string, or ``NoneType``
:param push_endpoint: URL to which messages will be pushed by the
Expand All @@ -395,7 +395,7 @@ def subscription_pull(self, subscription_path, return_immediately=False,
:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<TOPIC_NAME>``.
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type return_immediately: boolean
:param return_immediately: if True, the back-end returns even if no
Expand Down Expand Up @@ -427,7 +427,7 @@ def subscription_acknowledge(self, subscription_path, ack_ids):
:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<TOPIC_NAME>``.
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type ack_ids: list of string
:param ack_ids: ack IDs of messages being acknowledged
Expand All @@ -449,7 +449,7 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<TOPIC_NAME>``.
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type ack_ids: list of string
:param ack_ids: ack IDs of messages being acknowledged
Expand Down
Loading

0 comments on commit bb26745

Please sign in to comment.