Skip to content

Commit

Permalink
Merge pull request #1855 from tseaver/pubsub-add_gax_paging
Browse files Browse the repository at this point in the history
Add support for controlled paging via Gax.
  • Loading branch information
tseaver authored Jun 13, 2016
2 parents c252631 + eec245d commit 6fa89d8
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 17 deletions.
1 change: 1 addition & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[report]
omit =
*/_generated/*.py
show_missing = True
exclude_lines =
# Re-enable the standard pragma
pragma: NO COVER
Expand Down
36 changes: 30 additions & 6 deletions gcloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

# pylint: disable=import-error
from google.gax import CallOptions
from google.gax import INITIAL_PAGE
from google.gax.errors import GaxError
from google.gax.grpc import exc_to_code
from google.pubsub.v1.pubsub_pb2 import PubsubMessage
Expand All @@ -28,6 +29,14 @@
from gcloud._helpers import _to_bytes


def _build_paging_options(page_token=None):
"""Helper for :meth:'_PublisherAPI.list_topics' et aliae."""
if page_token is None:
page_token = INITIAL_PAGE
options = {'page_token': page_token}
return CallOptions(**options)


class _PublisherAPI(object):
"""Helper mapping publisher-related APIs.
Expand All @@ -37,7 +46,7 @@ class _PublisherAPI(object):
def __init__(self, gax_api):
self._gax_api = gax_api

def list_topics(self, project):
def list_topics(self, project, page_token=None):
"""List topics for the project associated with this API.
See:
Expand All @@ -46,13 +55,18 @@ def list_topics(self, project):
:type project: string
:param project: project ID
:type page_token: string
:param page_token: opaque marker for the next "page" of topics. If not
passed, the API will return the first page of
topics.
:rtype: tuple, (list, str)
:returns: list of ``Topic`` resource dicts, plus a
"next page token" string: if not None, indicates that
more topics can be retrieved with another call (pass that
value as ``page_token``).
"""
options = CallOptions(is_page_streaming=False)
options = _build_paging_options(page_token)
path = 'projects/%s' % (project,)
response = self._gax_api.list_topics(path, options)
topics = [{'name': topic_pb.name} for topic_pb in response.topics]
Expand Down Expand Up @@ -152,7 +166,7 @@ def topic_publish(self, topic_path, messages):
raise
return response.message_ids

def topic_list_subscriptions(self, topic_path):
def topic_list_subscriptions(self, topic_path, page_token=None):
"""API call: list subscriptions bound to a topic
See:
Expand All @@ -162,13 +176,18 @@ def topic_list_subscriptions(self, topic_path):
:param topic_path: fully-qualified path of the topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
:type page_token: string
:param page_token: opaque marker for the next "page" of subscriptions.
If not passed, the API will return the first page
of subscriptions.
:rtype: list of strings
:returns: fully-qualified names of subscriptions for the supplied
topic.
:raises: :exc:`gcloud.exceptions.NotFound` if the topic does not
exist
"""
options = CallOptions(is_page_streaming=False)
options = _build_paging_options(page_token)
try:
response = self._gax_api.list_topic_subscriptions(
topic_path, options)
Expand All @@ -190,7 +209,7 @@ class _SubscriberAPI(object):
def __init__(self, gax_api):
self._gax_api = gax_api

def list_subscriptions(self, project):
def list_subscriptions(self, project, page_token=None):
"""List subscriptions for the project associated with this API.
See:
Expand All @@ -199,13 +218,18 @@ def list_subscriptions(self, project):
:type project: string
:param project: project ID
:type page_token: string
:param page_token: opaque marker for the next "page" of subscriptions.
If not passed, the API will return the first page
of subscriptions.
:rtype: tuple, (list, str)
:returns: list of ``Subscription`` resource dicts, plus a
"next page token" string: if not None, indicates that
more topics can be retrieved with another call (pass that
value as ``page_token``).
"""
options = CallOptions(is_page_streaming=False)
options = _build_paging_options(page_token)
path = 'projects/%s' % (project,)
response = self._gax_api.list_subscriptions(path, options)
subscriptions = [_subscription_pb_to_mapping(sub_pb)
Expand Down
87 changes: 80 additions & 7 deletions gcloud/pubsub/test__gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ def test_ctor(self):
self.assertTrue(api._gax_api is gax_api)

def test_list_topics_no_paging(self):
response = _ListTopicsResponsePB([_TopicPB(self.TOPIC_PATH)])
from google.gax import INITIAL_PAGE
TOKEN = 'TOKEN'
response = _ListTopicsResponsePB([_TopicPB(self.TOPIC_PATH)], TOKEN)
gax_api = _GAXPublisherAPI(_list_topics_response=response)
api = self._makeOne(gax_api)

Expand All @@ -62,11 +64,31 @@ def test_list_topics_no_paging(self):
topic = topics[0]
self.assertIsInstance(topic, dict)
self.assertEqual(topic['name'], self.TOPIC_PATH)
self.assertEqual(next_token, None)
self.assertEqual(next_token, TOKEN)

name, options = gax_api._list_topics_called_with
self.assertEqual(name, self.PROJECT_PATH)
self.assertFalse(options.is_page_streaming)
self.assertTrue(options.page_token is INITIAL_PAGE)

def test_list_topics_with_paging(self):
TOKEN = 'TOKEN'
NEW_TOKEN = 'NEW_TOKEN'
response = _ListTopicsResponsePB(
[_TopicPB(self.TOPIC_PATH)], NEW_TOKEN)
gax_api = _GAXPublisherAPI(_list_topics_response=response)
api = self._makeOne(gax_api)

topics, next_token = api.list_topics(self.PROJECT, page_token=TOKEN)

self.assertEqual(len(topics), 1)
topic = topics[0]
self.assertIsInstance(topic, dict)
self.assertEqual(topic['name'], self.TOPIC_PATH)
self.assertEqual(next_token, NEW_TOKEN)

name, options = gax_api._list_topics_called_with
self.assertEqual(name, self.PROJECT_PATH)
self.assertEqual(options.page_token, TOKEN)

def test_topic_create(self):
topic_pb = _TopicPB(self.TOPIC_PATH)
Expand Down Expand Up @@ -233,6 +255,7 @@ def test_topic_publish_error(self):
self.assertEqual(options, None)

def test_topic_list_subscriptions_no_paging(self):
from google.gax import INITIAL_PAGE
response = _ListTopicSubscriptionsResponsePB([self.SUB_PATH])
gax_api = _GAXPublisherAPI(_list_topic_subscriptions_response=response)
api = self._makeOne(gax_api)
Expand All @@ -249,9 +272,32 @@ def test_topic_list_subscriptions_no_paging(self):

topic_path, options = gax_api._list_topic_subscriptions_called_with
self.assertEqual(topic_path, self.TOPIC_PATH)
self.assertFalse(options.is_page_streaming)
self.assertTrue(options.page_token is INITIAL_PAGE)

def test_topic_list_subscriptions_with_paging(self):
TOKEN = 'TOKEN'
NEW_TOKEN = 'NEW_TOKEN'
response = _ListTopicSubscriptionsResponsePB(
[self.SUB_PATH], NEW_TOKEN)
gax_api = _GAXPublisherAPI(_list_topic_subscriptions_response=response)
api = self._makeOne(gax_api)

subscriptions, next_token = api.topic_list_subscriptions(
self.TOPIC_PATH, page_token=TOKEN)

self.assertEqual(len(subscriptions), 1)
subscription = subscriptions[0]
self.assertIsInstance(subscription, dict)
self.assertEqual(subscription['name'], self.SUB_PATH)
self.assertEqual(subscription['topic'], self.TOPIC_PATH)
self.assertEqual(next_token, NEW_TOKEN)

name, options = gax_api._list_topic_subscriptions_called_with
self.assertEqual(name, self.TOPIC_PATH)
self.assertEqual(options.page_token, TOKEN)

def test_topic_list_subscriptions_miss(self):
from google.gax import INITIAL_PAGE
from gcloud.exceptions import NotFound
gax_api = _GAXPublisherAPI()
api = self._makeOne(gax_api)
Expand All @@ -261,9 +307,10 @@ def test_topic_list_subscriptions_miss(self):

topic_path, options = gax_api._list_topic_subscriptions_called_with
self.assertEqual(topic_path, self.TOPIC_PATH)
self.assertFalse(options.is_page_streaming)
self.assertTrue(options.page_token is INITIAL_PAGE)

def test_topic_list_subscriptions_error(self):
from google.gax import INITIAL_PAGE
from google.gax.errors import GaxError
gax_api = _GAXPublisherAPI(_random_gax_error=True)
api = self._makeOne(gax_api)
Expand All @@ -273,7 +320,7 @@ def test_topic_list_subscriptions_error(self):

topic_path, options = gax_api._list_topic_subscriptions_called_with
self.assertEqual(topic_path, self.TOPIC_PATH)
self.assertFalse(options.is_page_streaming)
self.assertTrue(options.page_token is INITIAL_PAGE)


@unittest2.skipUnless(_HAVE_GAX, 'No gax-python')
Expand All @@ -291,6 +338,7 @@ def test_ctor(self):
self.assertTrue(api._gax_api is gax_api)

def test_list_subscriptions_no_paging(self):
from google.gax import INITIAL_PAGE
response = _ListSubscriptionsResponsePB([_SubscriptionPB(
self.SUB_PATH, self.TOPIC_PATH, self.PUSH_ENDPOINT, 0)])
gax_api = _GAXSubscriberAPI(_list_subscriptions_response=response)
Expand All @@ -310,7 +358,32 @@ def test_list_subscriptions_no_paging(self):

name, options = gax_api._list_subscriptions_called_with
self.assertEqual(name, self.PROJECT_PATH)
self.assertFalse(options.is_page_streaming)
self.assertTrue(options.page_token is INITIAL_PAGE)

def test_list_subscriptions_with_paging(self):
TOKEN = 'TOKEN'
NEW_TOKEN = 'NEW_TOKEN'
response = _ListSubscriptionsResponsePB([_SubscriptionPB(
self.SUB_PATH, self.TOPIC_PATH, self.PUSH_ENDPOINT, 0)], NEW_TOKEN)
gax_api = _GAXSubscriberAPI(_list_subscriptions_response=response)
api = self._makeOne(gax_api)

subscriptions, next_token = api.list_subscriptions(
self.PROJECT, page_token=TOKEN)

self.assertEqual(len(subscriptions), 1)
subscription = subscriptions[0]
self.assertIsInstance(subscription, dict)
self.assertEqual(subscription['name'], self.SUB_PATH)
self.assertEqual(subscription['topic'], self.TOPIC_PATH)
self.assertEqual(subscription['pushConfig'],
{'pushEndpoint': self.PUSH_ENDPOINT})
self.assertEqual(subscription['ackDeadlineSeconds'], 0)
self.assertEqual(next_token, NEW_TOKEN)

name, options = gax_api._list_subscriptions_called_with
self.assertEqual(name, self.PROJECT_PATH)
self.assertEqual(options.page_token, TOKEN)

def test_subscription_create(self):
sub_pb = _SubscriptionPB(self.SUB_PATH, self.TOPIC_PATH, '', 0)
Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
]

GRPC_EXTRAS = [
'grpcio == 0.13.1',
'grpcio >= 0.14.0',
'google-gax >= 0.11.0',
'gax-google-pubsub-v1',
]

Expand Down
16 changes: 13 additions & 3 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@ covercmd =
--cover-branches \
--nocapture

[grpc]
deps =
grpcio >= 0.14.0
google-gax >= 0.11.0
gax-google-pubsub-v1

[testenv:py27]
basepython =
python2.7
commands =
pip --quiet install gcloud[grpc]
nosetests
deps =
{[testenv]deps}
{[grpc]deps}
setenv =
PYTHONPATH =

Expand All @@ -45,10 +53,10 @@ deps =
basepython =
python2.7
commands =
pip --quiet install gcloud[grpc]
{[testenv]covercmd} --cover-min-percentage=100
deps =
{[testenv]deps}
{[grpc]deps}
coverage
setenv =
PYTHONPATH =
Expand Down Expand Up @@ -112,10 +120,12 @@ passenv = {[testenv:system-tests]passenv}
basepython =
python2.7
commands =
pip --quiet install gcloud[grpc]
python {toxinidir}/system_tests/attempt_system_tests.py
setenv =
PYTHONPATH =
deps =
{[testenv]deps}
{[grpc]deps}
passenv = GOOGLE_* GCLOUD_* TRAVIS* encrypted_*

[testenv:system-tests3]
Expand Down

0 comments on commit 6fa89d8

Please sign in to comment.