diff --git a/gcloud/pubsub/_gax.py b/gcloud/pubsub/_gax.py index 4b4832dbce37..ea2b6dcfc6c7 100644 --- a/gcloud/pubsub/_gax.py +++ b/gcloud/pubsub/_gax.py @@ -19,6 +19,7 @@ from google.gax.errors import GaxError from google.gax.grpc import exc_to_code from google.pubsub.v1.pubsub_pb2 import PubsubMessage +from google.pubsub.v1.pubsub_pb2 import PushConfig from grpc.beta.interfaces import StatusCode # pylint: enable=import-error @@ -47,9 +48,9 @@ def list_topics(self, project): :rtype: tuple, (list, str) :returns: list of ``Topic`` resource dicts, plus a - "next page token" string: if not None, indicates that - more topics can be retrieved with another call (pass that - value as ``page_token``). + "next page token" string: if not None, indicates that + more topics can be retrieved with another call (pass that + value as ``page_token``). """ options = CallOptions(is_page_streaming=False) path = 'projects/%s' % (project,) @@ -180,7 +181,279 @@ def topic_list_subscriptions(self, topic_path): return subs, response.next_page_token +class _SubscriberAPI(object): + """Helper mapping subscriber-related APIs. + + :type gax_api: :class:`google.pubsub.v1.publisher_api.SubscriberApi` + :param gax_api: API object used to make GAX requests. + """ + def __init__(self, gax_api): + self._gax_api = gax_api + + def list_subscriptions(self, project): + """List subscriptions for the project associated with this API. + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/list + + :type project: string + :param project: project ID + + :rtype: tuple, (list, str) + :returns: list of ``Subscription`` resource dicts, plus a + "next page token" string: if not None, indicates that + more topics can be retrieved with another call (pass that + value as ``page_token``). + """ + options = CallOptions(is_page_streaming=False) + path = 'projects/%s' % (project,) + response = self._gax_api.list_subscriptions(path, options) + subscriptions = [_subscription_pb_to_mapping(sub_pb) + for sub_pb in response.subscriptions] + return subscriptions, response.next_page_token + + def subscription_create(self, subscription_path, topic_path, + ack_deadline=None, push_endpoint=None): + """API call: create a subscription + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/create + + :type subscription_path: string + :param subscription_path: the fully-qualified path of the new + subscription, in format + ``projects//subscriptions/``. + + :type topic_path: string + :param topic_path: the fully-qualified path of the topic being + subscribed, in format + ``projects//topics/``. + + :type ack_deadline: int, or ``NoneType`` + :param ack_deadline: the deadline (in seconds) by which messages pulled + from the back-end must be acknowledged. + + :type push_endpoint: string, or ``NoneType`` + :param push_endpoint: URL to which messages will be pushed by the + back-end. If not set, the application must pull + messages. + + :rtype: dict + :returns: ``Subscription`` resource returned from the API. + """ + if push_endpoint is not None: + push_config = PushConfig(push_endpoint=push_endpoint) + else: + push_config = None + + if ack_deadline is None: + ack_deadline = 0 + + try: + sub_pb = self._gax_api.create_subscription( + subscription_path, topic_path, push_config, ack_deadline) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION: + raise Conflict(topic_path) + raise + return _subscription_pb_to_mapping(sub_pb) + + def subscription_get(self, subscription_path): + """API call: retrieve a subscription + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/get + + :type subscription_path: string + :param subscription_path: the fully-qualified path of the subscription, + in format + ``projects//subscriptions/``. + + :rtype: dict + :returns: ``Subscription`` resource returned from the API. + """ + try: + sub_pb = self._gax_api.get_subscription(subscription_path) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(subscription_path) + raise + return _subscription_pb_to_mapping(sub_pb) + + def subscription_delete(self, subscription_path): + """API call: delete a subscription + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/delete + + :type subscription_path: string + :param subscription_path: the fully-qualified path of the subscription, + in format + ``projects//subscriptions/``. + """ + try: + self._gax_api.delete_subscription(subscription_path) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(subscription_path) + raise + + def subscription_modify_push_config(self, subscription_path, + push_endpoint): + """API call: update push config of a subscription + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig + + :type subscription_path: string + :param subscription_path: the fully-qualified path of the new + subscription, in format + ``projects//subscriptions/``. + + :type push_endpoint: string, or ``NoneType`` + :param push_endpoint: URL to which messages will be pushed by the + back-end. If not set, the application must pull + messages. + """ + push_config = PushConfig(push_endpoint=push_endpoint) + try: + self._gax_api.modify_push_config(subscription_path, push_config) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(subscription_path) + raise + + def subscription_pull(self, subscription_path, return_immediately=False, + max_messages=1): + """API call: retrieve messages for a subscription + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig + + :type subscription_path: string + :param subscription_path: the fully-qualified path of the new + subscription, in format + ``projects//subscriptions/``. + + :type return_immediately: boolean + :param return_immediately: if True, the back-end returns even if no + messages are available; if False, the API + call blocks until one or more messages are + available. + + :type max_messages: int + :param max_messages: the maximum number of messages to return. + + :rtype: list of dict + :returns: the ``receivedMessages`` element of the response. + """ + try: + response_pb = self._gax_api.pull( + subscription_path, max_messages, return_immediately) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(subscription_path) + raise + return [_received_message_pb_to_mapping(rmpb) + for rmpb in response_pb.received_messages] + + def subscription_acknowledge(self, subscription_path, ack_ids): + """API call: acknowledge retrieved messages + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig + + :type subscription_path: string + :param subscription_path: the fully-qualified path of the new + subscription, in format + ``projects//subscriptions/``. + + :type ack_ids: list of string + :param ack_ids: ack IDs of messages being acknowledged + """ + try: + self._gax_api.acknowledge(subscription_path, ack_ids) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(subscription_path) + raise + + def subscription_modify_ack_deadline(self, subscription_path, ack_ids, + ack_deadline): + """API call: update ack deadline for retrieved messages + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyAckDeadline + + :type subscription_path: string + :param subscription_path: the fully-qualified path of the new + subscription, in format + ``projects//subscriptions/``. + + :type ack_ids: list of string + :param ack_ids: ack IDs of messages being acknowledged + + :type ack_deadline: int + :param ack_deadline: the deadline (in seconds) by which messages pulled + from the back-end must be acknowledged. + """ + try: + self._gax_api.modify_ack_deadline( + subscription_path, ack_ids, ack_deadline) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(subscription_path) + raise + + def _message_pb_from_dict(message): """Helper for :meth:`_PublisherAPI.topic_publish`.""" return PubsubMessage(data=_to_bytes(message['data']), attributes=message['attributes']) + + +def _subscription_pb_to_mapping(sub_pb): + """Helper for :meth:`list_subscriptions`, et aliae + + Ideally, would use a function from :mod:`protobuf.json_format`, but + the right one isn't public. See: + https://github.com/google/protobuf/issues/1351 + """ + mapping = { + 'name': sub_pb.name, + 'topic': sub_pb.topic, + 'ack_deadline': sub_pb.ack_deadline, + } + if sub_pb.push_config.push_endpoint != '': + mapping['push_config'] = { + 'push_endpoint': sub_pb.push_config.push_endpoint, + } + return mapping + + +def _message_pb_to_mapping(message_pb): + """Helper for :meth:`pull`, et aliae + + Ideally, would use a function from :mod:`protobuf.json_format`, but + the right one isn't public. See: + https://github.com/google/protobuf/issues/1351 + """ + return { + 'messageId': message_pb.message_id, + 'data': message_pb.data, + 'attributes': message_pb.attributes, + } + + +def _received_message_pb_to_mapping(received_message_pb): + """Helper for :meth:`pull`, et aliae + + Ideally, would use a function from :mod:`protobuf.json_format`, but + the right one isn't public. See: + https://github.com/google/protobuf/issues/1351 + """ + return { + 'ackId': received_message_pb.ack_id, + 'message': _message_pb_to_mapping( + received_message_pb.message), + } diff --git a/gcloud/pubsub/connection.py b/gcloud/pubsub/connection.py index 76c232e45a10..9826e45f2bf0 100644 --- a/gcloud/pubsub/connection.py +++ b/gcloud/pubsub/connection.py @@ -299,7 +299,7 @@ def subscription_create(self, subscription_path, topic_path, :type subscription_path: string :param subscription_path: the fully-qualified path of the new subscription, in format - ``projects//subscriptions/``. + ``projects//subscriptions/``. :type topic_path: string :param topic_path: the fully-qualified path of the topic being @@ -373,7 +373,7 @@ def subscription_modify_push_config(self, subscription_path, :type subscription_path: string :param subscription_path: the fully-qualified path of the new subscription, in format - ``projects//subscriptions/``. + ``projects//subscriptions/``. :type push_endpoint: string, or ``NoneType`` :param push_endpoint: URL to which messages will be pushed by the @@ -395,7 +395,7 @@ def subscription_pull(self, subscription_path, return_immediately=False, :type subscription_path: string :param subscription_path: the fully-qualified path of the new subscription, in format - ``projects//subscriptions/``. + ``projects//subscriptions/``. :type return_immediately: boolean :param return_immediately: if True, the back-end returns even if no @@ -427,7 +427,7 @@ def subscription_acknowledge(self, subscription_path, ack_ids): :type subscription_path: string :param subscription_path: the fully-qualified path of the new subscription, in format - ``projects//subscriptions/``. + ``projects//subscriptions/``. :type ack_ids: list of string :param ack_ids: ack IDs of messages being acknowledged @@ -449,7 +449,7 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids, :type subscription_path: string :param subscription_path: the fully-qualified path of the new subscription, in format - ``projects//subscriptions/``. + ``projects//subscriptions/``. :type ack_ids: list of string :param ack_ids: ack IDs of messages being acknowledged diff --git a/gcloud/pubsub/test__gax.py b/gcloud/pubsub/test__gax.py index 2feae097c76f..c544a583e679 100644 --- a/gcloud/pubsub/test__gax.py +++ b/gcloud/pubsub/test__gax.py @@ -25,8 +25,7 @@ _HAVE_GAX = True -@unittest2.skipUnless(_HAVE_GAX, 'No gax-python') -class Test_PublisherAPI(unittest2.TestCase): +class _Base(object): PROJECT = 'PROJECT' PROJECT_PATH = 'projects/%s' % (PROJECT,) LIST_TOPICS_PATH = '%s/topics' % (PROJECT_PATH,) @@ -36,13 +35,17 @@ class Test_PublisherAPI(unittest2.TestCase): SUB_NAME = 'sub_name' SUB_PATH = '%s/subscriptions/%s' % (TOPIC_PATH, SUB_NAME) + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + +@unittest2.skipUnless(_HAVE_GAX, 'No gax-python') +class Test_PublisherAPI(_Base, unittest2.TestCase): + def _getTargetClass(self): from gcloud.pubsub._gax import _PublisherAPI return _PublisherAPI - def _makeOne(self, *args, **kw): - return self._getTargetClass()(*args, **kw) - def test_ctor(self): gax_api = _GAXPublisherAPI() api = self._makeOne(gax_api) @@ -273,18 +276,372 @@ def test_topic_list_subscriptions_error(self): self.assertFalse(options.is_page_streaming) -class _GAXPublisherAPI(object): +@unittest2.skipUnless(_HAVE_GAX, 'No gax-python') +class Test_SubscriberAPI(_Base, unittest2.TestCase): + + PUSH_ENDPOINT = 'https://api.example.com/push' + + def _getTargetClass(self): + from gcloud.pubsub._gax import _SubscriberAPI + return _SubscriberAPI + + def test_ctor(self): + gax_api = _GAXSubscriberAPI() + api = self._makeOne(gax_api) + self.assertTrue(api._gax_api is gax_api) + + def test_list_subscriptions_no_paging(self): + response = _ListSubscriptionsResponsePB([_SubscriptionPB( + self.SUB_PATH, self.TOPIC_PATH, self.PUSH_ENDPOINT, 0)]) + gax_api = _GAXSubscriberAPI(_list_subscriptions_response=response) + api = self._makeOne(gax_api) + + subscriptions, next_token = api.list_subscriptions(self.PROJECT) + + self.assertEqual(len(subscriptions), 1) + subscription = subscriptions[0] + self.assertIsInstance(subscription, dict) + self.assertEqual(subscription['name'], self.SUB_PATH) + self.assertEqual(subscription['topic'], self.TOPIC_PATH) + self.assertEqual(subscription['push_config'], + {'push_endpoint': self.PUSH_ENDPOINT}) + self.assertEqual(subscription['ack_deadline'], 0) + self.assertEqual(next_token, None) + + name, options = gax_api._list_subscriptions_called_with + self.assertEqual(name, self.PROJECT_PATH) + self.assertFalse(options.is_page_streaming) + + def test_subscription_create(self): + sub_pb = _SubscriptionPB(self.SUB_PATH, self.TOPIC_PATH, '', 0) + gax_api = _GAXSubscriberAPI(_create_subscription_response=sub_pb) + api = self._makeOne(gax_api) + + resource = api.subscription_create(self.SUB_PATH, self.TOPIC_PATH) + + expected = { + 'name': self.SUB_PATH, + 'topic': self.TOPIC_PATH, + 'ack_deadline': 0, + } + self.assertEqual(resource, expected) + name, topic, push_config, ack_deadline, options = ( + gax_api._create_subscription_called_with) + self.assertEqual(name, self.SUB_PATH) + self.assertEqual(topic, self.TOPIC_PATH) + self.assertEqual(push_config, None) + self.assertEqual(ack_deadline, 0) + self.assertEqual(options, None) + + def test_subscription_create_already_exists(self): + from gcloud.exceptions import Conflict + DEADLINE = 600 + gax_api = _GAXSubscriberAPI(_create_subscription_conflict=True) + api = self._makeOne(gax_api) + + with self.assertRaises(Conflict): + api.subscription_create( + self.SUB_PATH, self.TOPIC_PATH, DEADLINE, self.PUSH_ENDPOINT) + + name, topic, push_config, ack_deadline, options = ( + gax_api._create_subscription_called_with) + self.assertEqual(name, self.SUB_PATH) + self.assertEqual(topic, self.TOPIC_PATH) + self.assertEqual(push_config.push_endpoint, self.PUSH_ENDPOINT) + self.assertEqual(ack_deadline, DEADLINE) + self.assertEqual(options, None) + + def test_subscription_create_error(self): + from google.gax.errors import GaxError + gax_api = _GAXSubscriberAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.subscription_create(self.SUB_PATH, self.TOPIC_PATH) + + name, topic, push_config, ack_deadline, options = ( + gax_api._create_subscription_called_with) + self.assertEqual(name, self.SUB_PATH) + self.assertEqual(topic, self.TOPIC_PATH) + self.assertEqual(push_config, None) + self.assertEqual(ack_deadline, 0) + self.assertEqual(options, None) + + def test_subscription_get_hit(self): + sub_pb = _SubscriptionPB(self.SUB_PATH, self.TOPIC_PATH, '', 0) + gax_api = _GAXSubscriberAPI(_get_subscription_response=sub_pb) + api = self._makeOne(gax_api) + + resource = api.subscription_get(self.SUB_PATH) + + expected = { + 'name': self.SUB_PATH, + 'topic': self.TOPIC_PATH, + 'ack_deadline': 0, + } + self.assertEqual(resource, expected) + sub_path, options = gax_api._get_subscription_called_with + self.assertEqual(sub_path, self.SUB_PATH) + self.assertEqual(options, None) + + def test_subscription_get_miss(self): + from gcloud.exceptions import NotFound + gax_api = _GAXSubscriberAPI() + api = self._makeOne(gax_api) + + with self.assertRaises(NotFound): + api.subscription_get(self.SUB_PATH) + + sub_path, options = gax_api._get_subscription_called_with + self.assertEqual(sub_path, self.SUB_PATH) + self.assertEqual(options, None) + + def test_subscription_get_error(self): + from google.gax.errors import GaxError + gax_api = _GAXSubscriberAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.subscription_get(self.SUB_PATH) + + sub_path, options = gax_api._get_subscription_called_with + self.assertEqual(sub_path, self.SUB_PATH) + self.assertEqual(options, None) + + def test_subscription_delete_hit(self): + gax_api = _GAXSubscriberAPI(_delete_subscription_ok=True) + api = self._makeOne(gax_api) + + api.subscription_delete(self.TOPIC_PATH) + + sub_path, options = gax_api._delete_subscription_called_with + self.assertEqual(sub_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_subscription_delete_miss(self): + from gcloud.exceptions import NotFound + gax_api = _GAXSubscriberAPI(_delete_subscription_ok=False) + api = self._makeOne(gax_api) + + with self.assertRaises(NotFound): + api.subscription_delete(self.TOPIC_PATH) + + sub_path, options = gax_api._delete_subscription_called_with + self.assertEqual(sub_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_subscription_delete_error(self): + from google.gax.errors import GaxError + gax_api = _GAXSubscriberAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.subscription_delete(self.TOPIC_PATH) + + sub_path, options = gax_api._delete_subscription_called_with + self.assertEqual(sub_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_subscription_modify_push_config_hit(self): + gax_api = _GAXSubscriberAPI(_modify_push_config_ok=True) + api = self._makeOne(gax_api) + + api.subscription_modify_push_config(self.SUB_PATH, self.PUSH_ENDPOINT) + + sub_path, config, options = gax_api._modify_push_config_called_with + self.assertEqual(sub_path, self.SUB_PATH) + self.assertEqual(config.push_endpoint, self.PUSH_ENDPOINT) + self.assertEqual(options, None) + + def test_subscription_modify_push_config_miss(self): + from gcloud.exceptions import NotFound + gax_api = _GAXSubscriberAPI() + api = self._makeOne(gax_api) + + with self.assertRaises(NotFound): + api.subscription_modify_push_config( + self.SUB_PATH, self.PUSH_ENDPOINT) + + sub_path, config, options = gax_api._modify_push_config_called_with + self.assertEqual(sub_path, self.SUB_PATH) + self.assertEqual(config.push_endpoint, self.PUSH_ENDPOINT) + self.assertEqual(options, None) + + def test_subscription_modify_push_config_error(self): + from google.gax.errors import GaxError + gax_api = _GAXSubscriberAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.subscription_modify_push_config( + self.SUB_PATH, self.PUSH_ENDPOINT) + + sub_path, config, options = gax_api._modify_push_config_called_with + self.assertEqual(sub_path, self.SUB_PATH) + self.assertEqual(config.push_endpoint, self.PUSH_ENDPOINT) + self.assertEqual(options, None) + + def test_subscription_pull_explicit(self): + import base64 + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + ACK_ID = 'DEADBEEF' + MSG_ID = 'BEADCAFE' + MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {'a': 'b'}} + RECEIVED = [{'ackId': ACK_ID, 'message': MESSAGE}] + message_pb = _PubsubMessagePB(MSG_ID, B64, {'a': 'b'}) + response_pb = _PullResponsePB([_ReceivedMessagePB(ACK_ID, message_pb)]) + gax_api = _GAXSubscriberAPI(_pull_response=response_pb) + api = self._makeOne(gax_api) + MAX_MESSAGES = 10 + + received = api.subscription_pull( + self.SUB_PATH, return_immediately=True, max_messages=MAX_MESSAGES) + + self.assertEqual(received, RECEIVED) + sub_path, max_messages, return_immediately, options = ( + gax_api._pull_called_with) + self.assertEqual(sub_path, self.SUB_PATH) + self.assertEqual(max_messages, MAX_MESSAGES) + self.assertTrue(return_immediately) + self.assertEqual(options, None) + + def test_subscription_pull_defaults_miss(self): + from gcloud.exceptions import NotFound + gax_api = _GAXSubscriberAPI() + api = self._makeOne(gax_api) + + with self.assertRaises(NotFound): + api.subscription_pull(self.SUB_PATH) + + sub_path, max_messages, return_immediately, options = ( + gax_api._pull_called_with) + self.assertEqual(sub_path, self.SUB_PATH) + self.assertEqual(max_messages, 1) + self.assertFalse(return_immediately) + self.assertEqual(options, None) + + def test_subscription_pull_defaults_error(self): + from google.gax.errors import GaxError + gax_api = _GAXSubscriberAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.subscription_pull(self.SUB_PATH) + + sub_path, max_messages, return_immediately, options = ( + gax_api._pull_called_with) + self.assertEqual(sub_path, self.SUB_PATH) + self.assertEqual(max_messages, 1) + self.assertFalse(return_immediately) + self.assertEqual(options, None) + + def test_subscription_acknowledge_hit(self): + ACK_ID1 = 'DEADBEEF' + ACK_ID2 = 'BEADCAFE' + gax_api = _GAXSubscriberAPI(_acknowledge_ok=True) + api = self._makeOne(gax_api) + + api.subscription_acknowledge(self.SUB_PATH, [ACK_ID1, ACK_ID2]) + + sub_path, ack_ids, options = gax_api._acknowledge_called_with + self.assertEqual(sub_path, self.SUB_PATH) + self.assertEqual(ack_ids, [ACK_ID1, ACK_ID2]) + self.assertEqual(options, None) + + def test_subscription_acknowledge_miss(self): + from gcloud.exceptions import NotFound + ACK_ID1 = 'DEADBEEF' + ACK_ID2 = 'BEADCAFE' + gax_api = _GAXSubscriberAPI() + api = self._makeOne(gax_api) + + with self.assertRaises(NotFound): + api.subscription_acknowledge(self.SUB_PATH, [ACK_ID1, ACK_ID2]) + + sub_path, ack_ids, options = gax_api._acknowledge_called_with + self.assertEqual(sub_path, self.SUB_PATH) + self.assertEqual(ack_ids, [ACK_ID1, ACK_ID2]) + self.assertEqual(options, None) + + def test_subscription_acknowledge_error(self): + from google.gax.errors import GaxError + ACK_ID1 = 'DEADBEEF' + ACK_ID2 = 'BEADCAFE' + gax_api = _GAXSubscriberAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.subscription_acknowledge(self.SUB_PATH, [ACK_ID1, ACK_ID2]) + + sub_path, ack_ids, options = gax_api._acknowledge_called_with + self.assertEqual(sub_path, self.SUB_PATH) + self.assertEqual(ack_ids, [ACK_ID1, ACK_ID2]) + self.assertEqual(options, None) + + def test_subscription_modify_ack_deadline_hit(self): + ACK_ID1 = 'DEADBEEF' + ACK_ID2 = 'BEADCAFE' + NEW_DEADLINE = 90 + gax_api = _GAXSubscriberAPI(_modify_ack_deadline_ok=True) + api = self._makeOne(gax_api) + + api.subscription_modify_ack_deadline( + self.SUB_PATH, [ACK_ID1, ACK_ID2], NEW_DEADLINE) + + sub_path, ack_ids, deadline, options = ( + gax_api._modify_ack_deadline_called_with) + self.assertEqual(sub_path, self.SUB_PATH) + self.assertEqual(ack_ids, [ACK_ID1, ACK_ID2]) + self.assertEqual(deadline, NEW_DEADLINE) + self.assertEqual(options, None) + + def test_subscription_modify_ack_deadline_miss(self): + from gcloud.exceptions import NotFound + ACK_ID1 = 'DEADBEEF' + ACK_ID2 = 'BEADCAFE' + NEW_DEADLINE = 90 + gax_api = _GAXSubscriberAPI() + api = self._makeOne(gax_api) + + with self.assertRaises(NotFound): + api.subscription_modify_ack_deadline( + self.SUB_PATH, [ACK_ID1, ACK_ID2], NEW_DEADLINE) + + sub_path, ack_ids, deadline, options = ( + gax_api._modify_ack_deadline_called_with) + self.assertEqual(sub_path, self.SUB_PATH) + self.assertEqual(ack_ids, [ACK_ID1, ACK_ID2]) + self.assertEqual(deadline, NEW_DEADLINE) + self.assertEqual(options, None) + + def test_subscription_modify_ack_deadline_error(self): + from google.gax.errors import GaxError + ACK_ID1 = 'DEADBEEF' + ACK_ID2 = 'BEADCAFE' + NEW_DEADLINE = 90 + gax_api = _GAXSubscriberAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.subscription_modify_ack_deadline( + self.SUB_PATH, [ACK_ID1, ACK_ID2], NEW_DEADLINE) + + sub_path, ack_ids, deadline, options = ( + gax_api._modify_ack_deadline_called_with) + self.assertEqual(sub_path, self.SUB_PATH) + self.assertEqual(ack_ids, [ACK_ID1, ACK_ID2]) + self.assertEqual(deadline, NEW_DEADLINE) + self.assertEqual(options, None) + + +class _GaxAPIBase(object): - _create_topic_conflict = False _random_gax_error = False def __init__(self, **kw): self.__dict__.update(kw) - def list_topics(self, name, options): - self._list_topics_called_with = name, options - return self._list_topics_response - def _make_grpc_error(self, status_code): from grpc.framework.interfaces.face.face import AbortionError @@ -304,6 +661,15 @@ def _make_grpc_failed_precondition(self): from grpc.beta.interfaces import StatusCode return self._make_grpc_error(StatusCode.FAILED_PRECONDITION) + +class _GAXPublisherAPI(_GaxAPIBase): + + _create_topic_conflict = False + + def list_topics(self, name, options): + self._list_topics_called_with = name, options + return self._list_topics_response + def create_topic(self, name, options=None): from google.gax.errors import GaxError self._create_topic_called_with = name, options @@ -352,6 +718,84 @@ def list_topic_subscriptions(self, topic, options=None): raise GaxError('miss', self._make_grpc_not_found()) +class _GAXSubscriberAPI(_GaxAPIBase): + + _create_subscription_conflict = False + _modify_push_config_ok = False + _acknowledge_ok = False + _modify_ack_deadline_ok = False + + def list_subscriptions(self, project, options=None): + self._list_subscriptions_called_with = (project, options) + return self._list_subscriptions_response + + def create_subscription(self, name, topic, + push_config, ack_deadline_seconds, + options=None): + from google.gax.errors import GaxError + self._create_subscription_called_with = ( + name, topic, push_config, ack_deadline_seconds, options) + if self._random_gax_error: + raise GaxError('error') + if self._create_subscription_conflict: + raise GaxError('conflict', self._make_grpc_failed_precondition()) + return self._create_subscription_response + + def get_subscription(self, name, options=None): + from google.gax.errors import GaxError + self._get_subscription_called_with = name, options + if self._random_gax_error: + raise GaxError('error') + try: + return self._get_subscription_response + except AttributeError: + raise GaxError('miss', self._make_grpc_not_found()) + + def delete_subscription(self, name, options=None): + from google.gax.errors import GaxError + self._delete_subscription_called_with = name, options + if self._random_gax_error: + raise GaxError('error') + if not self._delete_subscription_ok: + raise GaxError('miss', self._make_grpc_not_found()) + + def modify_push_config(self, name, push_config, options=None): + from google.gax.errors import GaxError + self._modify_push_config_called_with = name, push_config, options + if self._random_gax_error: + raise GaxError('error') + if not self._modify_push_config_ok: + raise GaxError('miss', self._make_grpc_not_found()) + + def pull(self, name, max_messages, return_immediately, options=None): + from google.gax.errors import GaxError + self._pull_called_with = ( + name, max_messages, return_immediately, options) + if self._random_gax_error: + raise GaxError('error') + try: + return self._pull_response + except AttributeError: + raise GaxError('miss', self._make_grpc_not_found()) + + def acknowledge(self, name, ack_ids, options=None): + from google.gax.errors import GaxError + self._acknowledge_called_with = name, ack_ids, options + if self._random_gax_error: + raise GaxError('error') + if not self._acknowledge_ok: + raise GaxError('miss', self._make_grpc_not_found()) + + def modify_ack_deadline(self, name, ack_ids, deadline, options=None): + from google.gax.errors import GaxError + self._modify_ack_deadline_called_with = ( + name, ack_ids, deadline, options) + if self._random_gax_error: + raise GaxError('error') + if not self._modify_ack_deadline_ok: + raise GaxError('miss', self._make_grpc_not_found()) + + class _TopicPB(object): def __init__(self, name): @@ -376,3 +820,46 @@ class _ListTopicSubscriptionsResponsePB(object): def __init__(self, subscriptions, next_page_token=None): self.subscriptions = subscriptions self.next_page_token = next_page_token + + +class _PushConfigPB(object): + + def __init__(self, push_endpoint): + self.push_endpoint = push_endpoint + + +class _PubsubMessagePB(object): + + def __init__(self, message_id, data, attributes): + self.message_id = message_id + self.data = data + self.attributes = attributes + + +class _ReceivedMessagePB(object): + + def __init__(self, ack_id, message): + self.ack_id = ack_id + self.message = message + + +class _PullResponsePB(object): + + def __init__(self, received_messages): + self.received_messages = received_messages + + +class _SubscriptionPB(object): + + def __init__(self, name, topic, push_endpoint, ack_deadline): + self.name = name + self.topic = topic + self.push_config = _PushConfigPB(push_endpoint) + self.ack_deadline = ack_deadline + + +class _ListSubscriptionsResponsePB(object): + + def __init__(self, subscription_pbs, next_page_token=None): + self.subscriptions = subscription_pbs + self.next_page_token = next_page_token