Skip to content

Commit

Permalink
Merge pull request #1764 from tseaver/pubsub-wrap_gax_publisher_api
Browse files Browse the repository at this point in the history
 Implement a GAX version of '_PublisherAPI'.
  • Loading branch information
tseaver committed May 16, 2016
2 parents a411069 + 46720fa commit 14b1143
Show file tree
Hide file tree
Showing 6 changed files with 678 additions and 35 deletions.
186 changes: 186 additions & 0 deletions gcloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# Copyright 2016 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""GAX wrapper for Pubsub API requests."""

# pylint: disable=import-error
from google.gax import CallOptions
from google.gax.errors import GaxError
from google.gax.grpc import exc_to_code
from google.pubsub.v1.pubsub_pb2 import PubsubMessage
from grpc.beta.interfaces import StatusCode
# pylint: enable=import-error

from gcloud.exceptions import Conflict
from gcloud.exceptions import NotFound
from gcloud._helpers import _to_bytes


class _PublisherAPI(object):
"""Helper mapping publisher-related APIs.
:type gax_api: :class:`google.pubsub.v1.publisher_api.PublisherApi`
:param gax_api: API object used to make GAX requests.
"""
def __init__(self, gax_api):
self._gax_api = gax_api

def list_topics(self, project):
"""List topics for the project associated with this API.
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/list
:type project: string
:param project: project ID
:rtype: tuple, (list, str)
:returns: list of ``Topic`` resource dicts, plus a
"next page token" string: if not None, indicates that
more topics can be retrieved with another call (pass that
value as ``page_token``).
"""
options = CallOptions(is_page_streaming=False)
path = 'projects/%s' % (project,)
response = self._gax_api.list_topics(path, options)
topics = [{'name': topic_pb.name} for topic_pb in response.topics]
return topics, response.next_page_token

def topic_create(self, topic_path):
"""API call: create a topic
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create
:type topic_path: string
:param topic_path: fully-qualified path of the new topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
:rtype: dict
:returns: ``Topic`` resource returned from the API.
:raises: :exc:`gcloud.exceptions.Conflict` if the topic already
exists
"""
try:
topic_pb = self._gax_api.create_topic(topic_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
raise Conflict(topic_path)
raise
return {'name': topic_pb.name}

def topic_get(self, topic_path):
"""API call: retrieve a topic
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/get
:type topic_path: string
:param topic_path: fully-qualified path of the topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
:rtype: dict
:returns: ``Topic`` resource returned from the API.
:raises: :exc:`gcloud.exceptions.NotFound` if the topic does not
exist
"""
try:
topic_pb = self._gax_api.get_topic(topic_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise
return {'name': topic_pb.name}

def topic_delete(self, topic_path):
"""API call: delete a topic
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create
:type topic_path: string
:param topic_path: fully-qualified path of the new topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
:rtype: dict
:returns: ``Topic`` resource returned from the API.
"""
try:
self._gax_api.delete_topic(topic_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise

def topic_publish(self, topic_path, messages):
"""API call: publish one or more messages to a topic
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/publish
:type topic_path: string
:param topic_path: fully-qualified path of the topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
:type messages: list of dict
:param messages: messages to be published.
:rtype: list of string
:returns: list of opaque IDs for published messages.
:raises: :exc:`gcloud.exceptions.NotFound` if the topic does not
exist
"""
message_pbs = [_message_pb_from_dict(message)
for message in messages]
try:
response = self._gax_api.publish(topic_path, message_pbs)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise
return response.message_ids

def topic_list_subscriptions(self, topic_path):
"""API call: list subscriptions bound to a topic
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics.subscriptions/list
:type topic_path: string
:param topic_path: fully-qualified path of the topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
:rtype: list of strings
:returns: fully-qualified names of subscriptions for the supplied
topic.
:raises: :exc:`gcloud.exceptions.NotFound` if the topic does not
exist
"""
options = CallOptions(is_page_streaming=False)
try:
response = self._gax_api.list_topic_subscriptions(
topic_path, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise
subs = [{'topic': topic_path, 'name': subscription}
for subscription in response.subscriptions]
return subs, response.next_page_token


def _message_pb_from_dict(message):
"""Helper for :meth:`_PublisherAPI.topic_publish`."""
return PubsubMessage(data=_to_bytes(message['data']),
attributes=message['attributes'])
Loading

0 comments on commit 14b1143

Please sign in to comment.