diff --git a/gcloud/pubsub/_gax.py b/gcloud/pubsub/_gax.py new file mode 100644 index 000000000000..4b4832dbce37 --- /dev/null +++ b/gcloud/pubsub/_gax.py @@ -0,0 +1,186 @@ +# Copyright 2016 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""GAX wrapper for Pubsub API requests.""" + +# pylint: disable=import-error +from google.gax import CallOptions +from google.gax.errors import GaxError +from google.gax.grpc import exc_to_code +from google.pubsub.v1.pubsub_pb2 import PubsubMessage +from grpc.beta.interfaces import StatusCode +# pylint: enable=import-error + +from gcloud.exceptions import Conflict +from gcloud.exceptions import NotFound +from gcloud._helpers import _to_bytes + + +class _PublisherAPI(object): + """Helper mapping publisher-related APIs. + + :type gax_api: :class:`google.pubsub.v1.publisher_api.PublisherApi` + :param gax_api: API object used to make GAX requests. + """ + def __init__(self, gax_api): + self._gax_api = gax_api + + def list_topics(self, project): + """List topics for the project associated with this API. + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/list + + :type project: string + :param project: project ID + + :rtype: tuple, (list, str) + :returns: list of ``Topic`` resource dicts, plus a + "next page token" string: if not None, indicates that + more topics can be retrieved with another call (pass that + value as ``page_token``). + """ + options = CallOptions(is_page_streaming=False) + path = 'projects/%s' % (project,) + response = self._gax_api.list_topics(path, options) + topics = [{'name': topic_pb.name} for topic_pb in response.topics] + return topics, response.next_page_token + + def topic_create(self, topic_path): + """API call: create a topic + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create + + :type topic_path: string + :param topic_path: fully-qualified path of the new topic, in format + ``projects//topics/``. + + :rtype: dict + :returns: ``Topic`` resource returned from the API. + :raises: :exc:`gcloud.exceptions.Conflict` if the topic already + exists + """ + try: + topic_pb = self._gax_api.create_topic(topic_path) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION: + raise Conflict(topic_path) + raise + return {'name': topic_pb.name} + + def topic_get(self, topic_path): + """API call: retrieve a topic + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/get + + :type topic_path: string + :param topic_path: fully-qualified path of the topic, in format + ``projects//topics/``. + + :rtype: dict + :returns: ``Topic`` resource returned from the API. + :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not + exist + """ + try: + topic_pb = self._gax_api.get_topic(topic_path) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(topic_path) + raise + return {'name': topic_pb.name} + + def topic_delete(self, topic_path): + """API call: delete a topic + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create + + :type topic_path: string + :param topic_path: fully-qualified path of the new topic, in format + ``projects//topics/``. + + :rtype: dict + :returns: ``Topic`` resource returned from the API. + """ + try: + self._gax_api.delete_topic(topic_path) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(topic_path) + raise + + def topic_publish(self, topic_path, messages): + """API call: publish one or more messages to a topic + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/publish + + :type topic_path: string + :param topic_path: fully-qualified path of the topic, in format + ``projects//topics/``. + + :type messages: list of dict + :param messages: messages to be published. + + :rtype: list of string + :returns: list of opaque IDs for published messages. + :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not + exist + """ + message_pbs = [_message_pb_from_dict(message) + for message in messages] + try: + response = self._gax_api.publish(topic_path, message_pbs) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(topic_path) + raise + return response.message_ids + + def topic_list_subscriptions(self, topic_path): + """API call: list subscriptions bound to a topic + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics.subscriptions/list + + :type topic_path: string + :param topic_path: fully-qualified path of the topic, in format + ``projects//topics/``. + + :rtype: list of strings + :returns: fully-qualified names of subscriptions for the supplied + topic. + :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not + exist + """ + options = CallOptions(is_page_streaming=False) + try: + response = self._gax_api.list_topic_subscriptions( + topic_path, options) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(topic_path) + raise + subs = [{'topic': topic_path, 'name': subscription} + for subscription in response.subscriptions] + return subs, response.next_page_token + + +def _message_pb_from_dict(message): + """Helper for :meth:`_PublisherAPI.topic_publish`.""" + return PubsubMessage(data=_to_bytes(message['data']), + attributes=message['attributes']) diff --git a/gcloud/pubsub/connection.py b/gcloud/pubsub/connection.py index 196d58cea7a6..76c232e45a10 100644 --- a/gcloud/pubsub/connection.py +++ b/gcloud/pubsub/connection.py @@ -101,7 +101,7 @@ def __init__(self, connection): self._connection = connection def list_topics(self, project, page_size=None, page_token=None): - """List topics for the project associated with this API. + """API call: list topics for a given project See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/list @@ -138,13 +138,13 @@ def list_topics(self, project, page_size=None, page_token=None): return resp.get('topics', ()), resp.get('nextPageToken') def topic_create(self, topic_path): - """API call: create a topic via a PUT request + """API call: create a topic See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create :type topic_path: string - :param topic_path: the fully-qualfied path of the new topic, in format + :param topic_path: the fully-qualified path of the new topic, in format ``projects//topics/``. :rtype: dict @@ -154,13 +154,13 @@ def topic_create(self, topic_path): return conn.api_request(method='PUT', path='/%s' % (topic_path,)) def topic_get(self, topic_path): - """API call: retrieve a topic via a GET request + """API call: retrieve a topic See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/get :type topic_path: string - :param topic_path: the fully-qualfied path of the topic, in format + :param topic_path: the fully-qualified path of the topic, in format ``projects//topics/``. :rtype: dict @@ -170,26 +170,26 @@ def topic_get(self, topic_path): return conn.api_request(method='GET', path='/%s' % (topic_path,)) def topic_delete(self, topic_path): - """API call: delete a topic via a DELETE request + """API call: delete a topic See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/delete :type topic_path: string - :param topic_path: the fully-qualfied path of the topic, in format + :param topic_path: the fully-qualified path of the topic, in format ``projects//topics/``. """ conn = self._connection conn.api_request(method='DELETE', path='/%s' % (topic_path,)) def topic_publish(self, topic_path, messages): - """API call: publish a message to a topic via a POST request + """API call: publish one or more messages to a topic See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/publish :type topic_path: string - :param topic_path: the fully-qualfied path of the topic, in format + :param topic_path: the fully-qualified path of the topic, in format ``projects//topics/``. :type messages: list of dict @@ -206,13 +206,13 @@ def topic_publish(self, topic_path, messages): def topic_list_subscriptions(self, topic_path, page_size=None, page_token=None): - """API call: list subscriptions bound to a topic via a GET request + """API call: list subscriptions bound to a topic See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics.subscriptions/list :type topic_path: string - :param topic_path: the fully-qualfied path of the topic, in format + :param topic_path: the fully-qualified path of the topic, in format ``projects//topics/``. :type page_size: int @@ -253,7 +253,7 @@ def __init__(self, connection): self._connection = connection def list_subscriptions(self, project, page_size=None, page_token=None): - """List subscriptions for the project associated with this API. + """API call: list subscriptions for a given project See: https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/list @@ -291,18 +291,18 @@ def list_subscriptions(self, project, page_size=None, page_token=None): def subscription_create(self, subscription_path, topic_path, ack_deadline=None, push_endpoint=None): - """API call: create a subscription via a PUT request + """API call: create a subscription See: https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/create :type subscription_path: string - :param subscription_path: the fully-qualfied path of the new + :param subscription_path: the fully-qualified path of the new subscription, in format ``projects//subscriptions/``. :type topic_path: string - :param topic_path: the fully-qualfied path of the topic being + :param topic_path: the fully-qualified path of the topic being subscribed, in format ``projects//topics/``. @@ -331,13 +331,13 @@ def subscription_create(self, subscription_path, topic_path, return conn.api_request(method='PUT', path=path, data=resource) def subscription_get(self, subscription_path): - """API call: retrieve a subscription via a GET request + """API call: retrieve a subscription See: https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/get :type subscription_path: string - :param subscription_path: the fully-qualfied path of the subscription, + :param subscription_path: the fully-qualified path of the subscription, in format ``projects//subscriptions/``. @@ -349,13 +349,13 @@ def subscription_get(self, subscription_path): return conn.api_request(method='GET', path=path) def subscription_delete(self, subscription_path): - """API call: delete a subscription via a DELETE request + """API call: delete a subscription See: https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/delete :type subscription_path: string - :param subscription_path: the fully-qualfied path of the subscription, + :param subscription_path: the fully-qualified path of the subscription, in format ``projects//subscriptions/``. """ @@ -365,13 +365,13 @@ def subscription_delete(self, subscription_path): def subscription_modify_push_config(self, subscription_path, push_endpoint): - """API call: update push config of a subscription via a POST request + """API call: update push config of a subscription See: https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig :type subscription_path: string - :param subscription_path: the fully-qualfied path of the new + :param subscription_path: the fully-qualified path of the new subscription, in format ``projects//subscriptions/``. @@ -387,13 +387,13 @@ def subscription_modify_push_config(self, subscription_path, def subscription_pull(self, subscription_path, return_immediately=False, max_messages=1): - """API call: update push config of a subscription via a POST request + """API call: retrieve messages for a subscription See: https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig :type subscription_path: string - :param subscription_path: the fully-qualfied path of the new + :param subscription_path: the fully-qualified path of the new subscription, in format ``projects//subscriptions/``. @@ -419,13 +419,13 @@ def subscription_pull(self, subscription_path, return_immediately=False, return response.get('receivedMessages', ()) def subscription_acknowledge(self, subscription_path, ack_ids): - """API call: acknowledge retrieved messages for the subscription. + """API call: acknowledge retrieved messages See: https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig :type subscription_path: string - :param subscription_path: the fully-qualfied path of the new + :param subscription_path: the fully-qualified path of the new subscription, in format ``projects//subscriptions/``. @@ -441,13 +441,13 @@ def subscription_acknowledge(self, subscription_path, ack_ids): def subscription_modify_ack_deadline(self, subscription_path, ack_ids, ack_deadline): - """API call: acknowledge retrieved messages for the subscription. + """API call: update ack deadline for retrieved messages See: https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyAckDeadline :type subscription_path: string - :param subscription_path: the fully-qualfied path of the new + :param subscription_path: the fully-qualified path of the new subscription, in format ``projects//subscriptions/``. @@ -478,7 +478,7 @@ def __init__(self, connection): self._connection = connection def get_iam_policy(self, target_path): - """Fetch the IAM policy for the target. + """API call: fetch the IAM policy for the target See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/getIamPolicy @@ -495,7 +495,7 @@ def get_iam_policy(self, target_path): return conn.api_request(method='GET', path=path) def set_iam_policy(self, target_path, policy): - """Update the IAM policy for the target. + """API call: update the IAM policy for the target See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/setIamPolicy @@ -516,7 +516,7 @@ def set_iam_policy(self, target_path, policy): return conn.api_request(method='POST', path=path, data=wrapped) def test_iam_permissions(self, target_path, permissions): - """Update the IAM policy for the target. + """API call: test permissions See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/testIamPermissions diff --git a/gcloud/pubsub/test__gax.py b/gcloud/pubsub/test__gax.py new file mode 100644 index 000000000000..2feae097c76f --- /dev/null +++ b/gcloud/pubsub/test__gax.py @@ -0,0 +1,378 @@ +# Copyright 2016 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest2 + + +try: + # pylint: disable=unused-import + import gcloud.pubsub._gax + # pylint: enable=unused-import +except ImportError: # pragma: NO COVER + _HAVE_GAX = False +else: + _HAVE_GAX = True + + +@unittest2.skipUnless(_HAVE_GAX, 'No gax-python') +class Test_PublisherAPI(unittest2.TestCase): + PROJECT = 'PROJECT' + PROJECT_PATH = 'projects/%s' % (PROJECT,) + LIST_TOPICS_PATH = '%s/topics' % (PROJECT_PATH,) + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + LIST_TOPIC_SUBSCRIPTIONS_PATH = '%s/subscriptions' % (TOPIC_PATH,) + SUB_NAME = 'sub_name' + SUB_PATH = '%s/subscriptions/%s' % (TOPIC_PATH, SUB_NAME) + + def _getTargetClass(self): + from gcloud.pubsub._gax import _PublisherAPI + return _PublisherAPI + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_ctor(self): + gax_api = _GAXPublisherAPI() + api = self._makeOne(gax_api) + self.assertTrue(api._gax_api is gax_api) + + def test_list_topics_no_paging(self): + response = _ListTopicsResponsePB([_TopicPB(self.TOPIC_PATH)]) + gax_api = _GAXPublisherAPI(_list_topics_response=response) + api = self._makeOne(gax_api) + + topics, next_token = api.list_topics(self.PROJECT) + + self.assertEqual(len(topics), 1) + topic = topics[0] + self.assertIsInstance(topic, dict) + self.assertEqual(topic['name'], self.TOPIC_PATH) + self.assertEqual(next_token, None) + + name, options = gax_api._list_topics_called_with + self.assertEqual(name, self.PROJECT_PATH) + self.assertFalse(options.is_page_streaming) + + def test_topic_create(self): + topic_pb = _TopicPB(self.TOPIC_PATH) + gax_api = _GAXPublisherAPI(_create_topic_response=topic_pb) + api = self._makeOne(gax_api) + + resource = api.topic_create(self.TOPIC_PATH) + + self.assertEqual(resource, {'name': self.TOPIC_PATH}) + topic_path, options = gax_api._create_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_topic_create_already_exists(self): + from gcloud.exceptions import Conflict + gax_api = _GAXPublisherAPI(_create_topic_conflict=True) + api = self._makeOne(gax_api) + + with self.assertRaises(Conflict): + api.topic_create(self.TOPIC_PATH) + + topic_path, options = gax_api._create_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_topic_create_error(self): + from google.gax.errors import GaxError + gax_api = _GAXPublisherAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.topic_create(self.TOPIC_PATH) + + topic_path, options = gax_api._create_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_topic_get_hit(self): + topic_pb = _TopicPB(self.TOPIC_PATH) + gax_api = _GAXPublisherAPI(_get_topic_response=topic_pb) + api = self._makeOne(gax_api) + + resource = api.topic_get(self.TOPIC_PATH) + + self.assertEqual(resource, {'name': self.TOPIC_PATH}) + topic_path, options = gax_api._get_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_topic_get_miss(self): + from gcloud.exceptions import NotFound + gax_api = _GAXPublisherAPI() + api = self._makeOne(gax_api) + + with self.assertRaises(NotFound): + api.topic_get(self.TOPIC_PATH) + + topic_path, options = gax_api._get_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_topic_get_error(self): + from google.gax.errors import GaxError + gax_api = _GAXPublisherAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.topic_get(self.TOPIC_PATH) + + topic_path, options = gax_api._get_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_topic_delete_hit(self): + gax_api = _GAXPublisherAPI(_delete_topic_ok=True) + api = self._makeOne(gax_api) + + api.topic_delete(self.TOPIC_PATH) + + topic_path, options = gax_api._delete_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_topic_delete_miss(self): + from gcloud.exceptions import NotFound + gax_api = _GAXPublisherAPI(_delete_topic_ok=False) + api = self._makeOne(gax_api) + + with self.assertRaises(NotFound): + api.topic_delete(self.TOPIC_PATH) + + topic_path, options = gax_api._delete_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_topic_delete_error(self): + from google.gax.errors import GaxError + gax_api = _GAXPublisherAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.topic_delete(self.TOPIC_PATH) + + topic_path, options = gax_api._delete_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_topic_publish_hit(self): + import base64 + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MSGID = 'DEADBEEF' + MESSAGE = {'data': B64, 'attributes': {}} + response = _PublishResponsePB([MSGID]) + gax_api = _GAXPublisherAPI(_publish_response=response) + api = self._makeOne(gax_api) + + resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE]) + + self.assertEqual(resource, [MSGID]) + topic_path, message_pbs, options = gax_api._publish_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + message_pb, = message_pbs + self.assertEqual(message_pb.data, B64) + self.assertEqual(message_pb.attributes, {}) + self.assertEqual(options, None) + + def test_topic_publish_miss_w_attrs_w_bytes_payload(self): + import base64 + from gcloud.exceptions import NotFound + PAYLOAD = u'This is the message text' + B64 = base64.b64encode(PAYLOAD) + MESSAGE = {'data': B64, 'attributes': {'foo': 'bar'}} + gax_api = _GAXPublisherAPI() + api = self._makeOne(gax_api) + + with self.assertRaises(NotFound): + api.topic_publish(self.TOPIC_PATH, [MESSAGE]) + + topic_path, message_pbs, options = gax_api._publish_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + message_pb, = message_pbs + self.assertEqual(message_pb.data, B64) + self.assertEqual(message_pb.attributes, {'foo': 'bar'}) + self.assertEqual(options, None) + + def test_topic_publish_error(self): + import base64 + from google.gax.errors import GaxError + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MESSAGE = {'data': B64, 'attributes': {}} + gax_api = _GAXPublisherAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.topic_publish(self.TOPIC_PATH, [MESSAGE]) + + topic_path, message_pbs, options = gax_api._publish_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + message_pb, = message_pbs + self.assertEqual(message_pb.data, B64) + self.assertEqual(message_pb.attributes, {}) + self.assertEqual(options, None) + + def test_topic_list_subscriptions_no_paging(self): + response = _ListTopicSubscriptionsResponsePB([self.SUB_PATH]) + gax_api = _GAXPublisherAPI(_list_topic_subscriptions_response=response) + api = self._makeOne(gax_api) + + subscriptions, next_token = api.topic_list_subscriptions( + self.TOPIC_PATH) + + self.assertEqual(len(subscriptions), 1) + subscription = subscriptions[0] + self.assertIsInstance(subscription, dict) + self.assertEqual(subscription['name'], self.SUB_PATH) + self.assertEqual(subscription['topic'], self.TOPIC_PATH) + self.assertEqual(next_token, None) + + topic_path, options = gax_api._list_topic_subscriptions_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertFalse(options.is_page_streaming) + + def test_topic_list_subscriptions_miss(self): + from gcloud.exceptions import NotFound + gax_api = _GAXPublisherAPI() + api = self._makeOne(gax_api) + + with self.assertRaises(NotFound): + api.topic_list_subscriptions(self.TOPIC_PATH) + + topic_path, options = gax_api._list_topic_subscriptions_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertFalse(options.is_page_streaming) + + def test_topic_list_subscriptions_error(self): + from google.gax.errors import GaxError + gax_api = _GAXPublisherAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.topic_list_subscriptions(self.TOPIC_PATH) + + topic_path, options = gax_api._list_topic_subscriptions_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertFalse(options.is_page_streaming) + + +class _GAXPublisherAPI(object): + + _create_topic_conflict = False + _random_gax_error = False + + def __init__(self, **kw): + self.__dict__.update(kw) + + def list_topics(self, name, options): + self._list_topics_called_with = name, options + return self._list_topics_response + + def _make_grpc_error(self, status_code): + from grpc.framework.interfaces.face.face import AbortionError + + class _DummyException(AbortionError): + code = status_code + + def __init__(self): + pass + + return _DummyException() + + def _make_grpc_not_found(self): + from grpc.beta.interfaces import StatusCode + return self._make_grpc_error(StatusCode.NOT_FOUND) + + def _make_grpc_failed_precondition(self): + from grpc.beta.interfaces import StatusCode + return self._make_grpc_error(StatusCode.FAILED_PRECONDITION) + + def create_topic(self, name, options=None): + from google.gax.errors import GaxError + self._create_topic_called_with = name, options + if self._random_gax_error: + raise GaxError('error') + if self._create_topic_conflict: + raise GaxError('conflict', self._make_grpc_failed_precondition()) + return self._create_topic_response + + def get_topic(self, name, options=None): + from google.gax.errors import GaxError + self._get_topic_called_with = name, options + if self._random_gax_error: + raise GaxError('error') + try: + return self._get_topic_response + except AttributeError: + raise GaxError('miss', self._make_grpc_not_found()) + + def delete_topic(self, name, options=None): + from google.gax.errors import GaxError + self._delete_topic_called_with = name, options + if self._random_gax_error: + raise GaxError('error') + if not self._delete_topic_ok: + raise GaxError('miss', self._make_grpc_not_found()) + + def publish(self, topic, messages, options=None): + from google.gax.errors import GaxError + self._publish_called_with = topic, messages, options + if self._random_gax_error: + raise GaxError('error') + try: + return self._publish_response + except AttributeError: + raise GaxError('miss', self._make_grpc_not_found()) + + def list_topic_subscriptions(self, topic, options=None): + from google.gax.errors import GaxError + self._list_topic_subscriptions_called_with = topic, options + if self._random_gax_error: + raise GaxError('error') + try: + return self._list_topic_subscriptions_response + except AttributeError: + raise GaxError('miss', self._make_grpc_not_found()) + + +class _TopicPB(object): + + def __init__(self, name): + self.name = name + + +class _PublishResponsePB(object): + + def __init__(self, message_ids): + self.message_ids = message_ids + + +class _ListTopicsResponsePB(object): + + def __init__(self, topic_pbs, next_page_token=None): + self.topics = topic_pbs + self.next_page_token = next_page_token + + +class _ListTopicSubscriptionsResponsePB(object): + + def __init__(self, subscriptions, next_page_token=None): + self.subscriptions = subscriptions + self.next_page_token = next_page_token diff --git a/gcloud/pubsub/test_connection.py b/gcloud/pubsub/test_connection.py index 03895dda0bab..fe641aad9e7c 100644 --- a/gcloud/pubsub/test_connection.py +++ b/gcloud/pubsub/test_connection.py @@ -199,7 +199,20 @@ def test_topic_create(self): path = '/%s' % (self.TOPIC_PATH,) self.assertEqual(connection._called_with['path'], path) - def test_topic_get(self): + def test_topic_create_already_exists(self): + from gcloud.exceptions import Conflict + connection = _Connection() + connection._no_response_error = Conflict + api = self._makeOne(connection) + + with self.assertRaises(Conflict): + api.topic_create(self.TOPIC_PATH) + + self.assertEqual(connection._called_with['method'], 'PUT') + path = '/%s' % (self.TOPIC_PATH,) + self.assertEqual(connection._called_with['path'], path) + + def test_topic_get_hit(self): RETURNED = {'name': self.TOPIC_PATH} connection = _Connection(RETURNED) api = self._makeOne(connection) @@ -211,7 +224,19 @@ def test_topic_get(self): path = '/%s' % (self.TOPIC_PATH,) self.assertEqual(connection._called_with['path'], path) - def test_topic_delete(self): + def test_topic_get_miss(self): + from gcloud.exceptions import NotFound + connection = _Connection() + api = self._makeOne(connection) + + with self.assertRaises(NotFound): + api.topic_get(self.TOPIC_PATH) + + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s' % (self.TOPIC_PATH,) + self.assertEqual(connection._called_with['path'], path) + + def test_topic_delete_hit(self): RETURNED = {} connection = _Connection(RETURNED) api = self._makeOne(connection) @@ -222,7 +247,19 @@ def test_topic_delete(self): path = '/%s' % (self.TOPIC_PATH,) self.assertEqual(connection._called_with['path'], path) - def test_topic_publish(self): + def test_topic_delete_miss(self): + from gcloud.exceptions import NotFound + connection = _Connection() + api = self._makeOne(connection) + + with self.assertRaises(NotFound): + api.topic_delete(self.TOPIC_PATH) + + self.assertEqual(connection._called_with['method'], 'DELETE') + path = '/%s' % (self.TOPIC_PATH,) + self.assertEqual(connection._called_with['path'], path) + + def test_topic_publish_hit(self): import base64 PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD).decode('ascii') @@ -241,6 +278,24 @@ def test_topic_publish(self): self.assertEqual(connection._called_with['data'], {'messages': [MESSAGE]}) + def test_topic_publish_miss(self): + import base64 + from gcloud.exceptions import NotFound + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MESSAGE = {'data': B64, 'attributes': {}} + connection = _Connection() + api = self._makeOne(connection) + + with self.assertRaises(NotFound): + api.topic_publish(self.TOPIC_PATH, [MESSAGE]) + + self.assertEqual(connection._called_with['method'], 'POST') + path = '/%s:publish' % (self.TOPIC_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], + {'messages': [MESSAGE]}) + def test_topic_list_subscriptions_no_paging(self): SUB_INFO = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} RETURNED = {'subscriptions': [SUB_INFO]} @@ -306,6 +361,19 @@ def test_topic_list_subscriptions_missing_key(self): self.assertEqual(connection._called_with['path'], path) self.assertEqual(connection._called_with['query_params'], {}) + def test_topic_list_subscriptions_miss(self): + from gcloud.exceptions import NotFound + connection = _Connection() + api = self._makeOne(connection) + + with self.assertRaises(NotFound): + api.topic_list_subscriptions(self.TOPIC_PATH) + + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s' % (self.LIST_TOPIC_SUBSCRIPTIONS_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['query_params'], {}) + class Test_SubscriberAPI(_Base): @@ -665,11 +733,17 @@ def test_test_iam_permissions_missing_key(self): class _Connection(object): _called_with = None + _no_response_error = None def __init__(self, *responses): self._responses = responses def api_request(self, **kw): + from gcloud.exceptions import NotFound self._called_with = kw - response, self._responses = self._responses[0], self._responses[1:] + try: + response, self._responses = self._responses[0], self._responses[1:] + except IndexError: + err_class = self._no_response_error or NotFound + raise err_class('miss') return response diff --git a/setup.py b/setup.py index c3c20735bc5a..ff2af731ec0c 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ ] GRPC_EXTRAS = [ - 'grpcio >= 0.13.1', + 'grpcio == 0.13.1', 'gax-google-pubsub-v1', ] diff --git a/tox.ini b/tox.ini index d7dc90393600..593929fcebe7 100644 --- a/tox.ini +++ b/tox.ini @@ -50,15 +50,20 @@ commands = deps = {[testenv]deps} coverage +setenv = + PYTHONPATH = [testenv:coveralls] basepython = {[testenv:cover]basepython} commands = + pip install gcloud[grpc] {[testenv]covercmd} coveralls deps = {[testenv:cover]deps} coveralls +setenv = + PYTHONPATH = passenv = {[testenv:system-tests]passenv} [testenv:docs]