From 4506b59b4c8632a051a6aabc1f5b4bccb13fc7ed Mon Sep 17 00:00:00 2001 From: Jon Wayne Parrott Date: Mon, 28 Aug 2017 09:15:55 -0700 Subject: [PATCH] Update pubsub samples [(#1092)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1092) --- samples/snippets/iam.py | 138 ++++++++++++++-------------- samples/snippets/iam_test.py | 110 +++++++++++----------- samples/snippets/publisher.py | 118 ++++++++++++++++++------ samples/snippets/publisher_test.py | 76 ++++++++++----- samples/snippets/quickstart.py | 19 ++-- samples/snippets/quickstart_test.py | 30 +++--- samples/snippets/requirements.txt | 2 +- samples/snippets/subscriber.py | 113 ++++++++++++++--------- samples/snippets/subscriber_test.py | 137 +++++++++++++++++++-------- 9 files changed, 460 insertions(+), 283 deletions(-) diff --git a/samples/snippets/iam.py b/samples/snippets/iam.py index 7f35f9d9f..b46bc1147 100644 --- a/samples/snippets/iam.py +++ b/samples/snippets/iam.py @@ -23,115 +23,113 @@ import argparse -from google.cloud import pubsub +from google.cloud import pubsub_v1 -def get_topic_policy(topic_name): +def get_topic_policy(project, topic_name): """Prints the IAM policy for the given topic.""" - pubsub_client = pubsub.Client() - topic = pubsub_client.topic(topic_name) + client = pubsub_v1.PublisherClient() + topic_path = client.topic_path(project, topic_name) - policy = topic.get_iam_policy() + policy = client.get_iam_policy(topic_path) - print('Policy for topic {}:'.format(topic.name)) - print('Version: {}'.format(policy.version)) - print('Owners: {}'.format(policy.owners)) - print('Editors: {}'.format(policy.editors)) - print('Viewers: {}'.format(policy.viewers)) - print('Publishers: {}'.format(policy.publishers)) - print('Subscribers: {}'.format(policy.subscribers)) + print('Policy for topic {}:'.format(topic_path)) + for binding in policy.bindings: + print('Role: {}, Members: {}'.format(binding.role, binding.members)) -def get_subscription_policy(topic_name, subscription_name): +def get_subscription_policy(project, subscription_name): """Prints the IAM policy for the given subscription.""" - pubsub_client = pubsub.Client() - topic = pubsub_client.topic(topic_name) - subscription = topic.subscription(subscription_name) + client = pubsub_v1.SubscriberClient() + subscription_path = client.subscription_path(project, subscription_name) - policy = subscription.get_iam_policy() + policy = client.get_iam_policy(subscription_path) - print('Policy for subscription {} on topic {}:'.format( - subscription.name, topic.name)) - print('Version: {}'.format(policy.version)) - print('Owners: {}'.format(policy.owners)) - print('Editors: {}'.format(policy.editors)) - print('Viewers: {}'.format(policy.viewers)) - print('Publishers: {}'.format(policy.publishers)) - print('Subscribers: {}'.format(policy.subscribers)) + print('Policy for subscription {}:'.format(subscription_path)) + for binding in policy.bindings: + print('Role: {}, Members: {}'.format(binding.role, binding.members)) -def set_topic_policy(topic_name): +def set_topic_policy(project, topic_name): """Sets the IAM policy for a topic.""" - pubsub_client = pubsub.Client() - topic = pubsub_client.topic(topic_name) - policy = topic.get_iam_policy() + client = pubsub_v1.PublisherClient() + topic_path = client.topic_path(project, topic_name) + + policy = client.get_iam_policy(topic_path) # Add all users as viewers. - policy['roles/pubsub.viewer'] = [policy.all_users()] - # Add a group as publisherss. - publishers = policy.get('roles/pubsub.publisher', []) - publishers.add(policy.group('cloud-logs@google.com')) - policy['roles/pubsub.publisher'] = publishers + policy.bindings.add( + role='roles/pubsub.viewer', + members=['allUsers']) + + # Add a group as a publisher. + policy.bindings.add( + role='roles/pubsub.publisher', + members=['group:cloud-logs@google.com']) # Set the policy - topic.set_iam_policy(policy) + policy = client.set_iam_policy(topic_path, policy) - print('IAM policy for topic {} set.'.format(topic.name)) + print('IAM policy for topic {} set: {}'.format( + topic_name, policy)) -def set_subscription_policy(topic_name, subscription_name): +def set_subscription_policy(project, subscription_name): """Sets the IAM policy for a topic.""" - pubsub_client = pubsub.Client() - topic = pubsub_client.topic(topic_name) - subscription = topic.subscription(subscription_name) - policy = subscription.get_iam_policy() + client = pubsub_v1.SubscriberClient() + subscription_path = client.subscription_path(project, subscription_name) + + policy = client.get_iam_policy(subscription_path) # Add all users as viewers. - policy['roles/viewer'] = [policy.all_users()] - # # Add a group as editors. - editors = policy.get('roles/editor', []) - editors.add(policy.group('cloud-logs@google.com')) - policy['roles/editor'] = editors + policy.bindings.add( + role='roles/pubsub.viewer', + members=['allUsers']) + + # Add a group as an editor. + policy.bindings.add( + role='roles/editor', + members=['group:cloud-logs@google.com']) # Set the policy - subscription.set_iam_policy(policy) + policy = client.set_iam_policy(subscription_path, policy) - print('IAM policy for subscription {} on topic {} set.'.format( - topic.name, subscription.name)) + print('IAM policy for subscription {} set: {}'.format( + subscription_name, policy)) -def check_topic_permissions(topic_name): +def check_topic_permissions(project, topic_name): """Checks to which permissions are available on the given topic.""" - pubsub_client = pubsub.Client() - topic = pubsub_client.topic(topic_name) + client = pubsub_v1.PublisherClient() + topic_path = client.topic_path(project, topic_name) permissions_to_check = [ 'pubsub.topics.publish', 'pubsub.topics.update' ] - allowed_permissions = topic.check_iam_permissions(permissions_to_check) + allowed_permissions = client.test_iam_permissions( + topic_path, permissions_to_check) print('Allowed permissions for topic {}: {}'.format( - topic.name, allowed_permissions)) + topic_path, allowed_permissions)) -def check_subscription_permissions(topic_name, subscription_name): +def check_subscription_permissions(project, subscription_name): """Checks to which permissions are available on the given subscription.""" - pubsub_client = pubsub.Client() - topic = pubsub_client.topic(topic_name) - subscription = topic.subscription(subscription_name) + client = pubsub_v1.SubscriberClient() + subscription_path = client.subscription_path(project, subscription_name) permissions_to_check = [ 'pubsub.subscriptions.consume', 'pubsub.subscriptions.update' ] - allowed_permissions = subscription.check_iam_permissions( - permissions_to_check) + allowed_permissions = client.test_iam_permissions( + subscription_path, permissions_to_check) - print('Allowed permissions for subscription {} on topic {}: {}'.format( - subscription.name, topic.name, allowed_permissions)) + print('Allowed permissions for subscription {}: {}'.format( + subscription_path, allowed_permissions)) if __name__ == '__main__': @@ -139,6 +137,7 @@ def check_subscription_permissions(topic_name, subscription_name): description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter ) + parser.add_argument('project', help='Your Google Cloud project ID') subparsers = parser.add_subparsers(dest='command') @@ -148,7 +147,6 @@ def check_subscription_permissions(topic_name, subscription_name): get_subscription_policy_parser = subparsers.add_parser( 'get-subscription-policy', help=get_subscription_policy.__doc__) - get_subscription_policy_parser.add_argument('topic_name') get_subscription_policy_parser.add_argument('subscription_name') set_topic_policy_parser = subparsers.add_parser( @@ -157,7 +155,6 @@ def check_subscription_permissions(topic_name, subscription_name): set_subscription_policy_parser = subparsers.add_parser( 'set-subscription-policy', help=set_subscription_policy.__doc__) - set_subscription_policy_parser.add_argument('topic_name') set_subscription_policy_parser.add_argument('subscription_name') check_topic_permissions_parser = subparsers.add_parser( @@ -167,20 +164,19 @@ def check_subscription_permissions(topic_name, subscription_name): check_subscription_permissions_parser = subparsers.add_parser( 'check-subscription-permissions', help=check_subscription_permissions.__doc__) - check_subscription_permissions_parser.add_argument('topic_name') check_subscription_permissions_parser.add_argument('subscription_name') args = parser.parse_args() if args.command == 'get-topic-policy': - get_topic_policy(args.topic_name) + get_topic_policy(args.project, args.topic_name) elif args.command == 'get-subscription-policy': - get_subscription_policy(args.topic_name, args.subscription_name) + get_subscription_policy(args.project, args.subscription_name) elif args.command == 'set-topic-policy': - set_topic_policy(args.topic_name) + set_topic_policy(args.project, args.topic_name) elif args.command == 'set-subscription-policy': - set_subscription_policy(args.topic_name, args.subscription_name) + set_subscription_policy(args.project, args.subscription_name) elif args.command == 'check-topic-permissions': - check_topic_permissions(args.topic_name) + check_topic_permissions(args.project, args.topic_name) elif args.command == 'check-subscription-permissions': - check_subscription_permissions(args.topic_name, args.subscription_name) + check_subscription_permissions(args.project, args.subscription_name) diff --git a/samples/snippets/iam_test.py b/samples/snippets/iam_test.py index f36dbd214..3deaec746 100644 --- a/samples/snippets/iam_test.py +++ b/samples/snippets/iam_test.py @@ -12,94 +12,100 @@ # See the License for the specific language governing permissions and # limitations under the License. -from google.cloud import pubsub +import os + +from google.cloud import pubsub_v1 import pytest import iam -TEST_TOPIC = 'iam-test-topic' -TEST_SUBSCRIPTION = 'iam-test-subscription' +PROJECT = os.environ['GCLOUD_PROJECT'] +TOPIC = 'iam-test-topic' +SUBSCRIPTION = 'iam-test-subscription' @pytest.fixture(scope='module') -def test_topic(): - client = pubsub.Client() - topic = client.topic(TEST_TOPIC) +def publisher_client(): + yield pubsub_v1.PublisherClient() - if not topic.exists(): - topic.create() - yield topic +@pytest.fixture(scope='module') +def topic(publisher_client): + topic_path = publisher_client.topic_path(PROJECT, TOPIC) - if topic.exists(): - topic.delete() + try: + publisher_client.delete_topic(topic_path) + except: + pass + publisher_client.create_topic(topic_path) -@pytest.fixture -def test_subscription(test_topic): - subscription = test_topic.subscription(TEST_SUBSCRIPTION) - yield subscription - if subscription.exists(): - subscription.delete() + yield topic_path -def test_get_topic_policy(test_topic, capsys): - iam.get_topic_policy(test_topic.name) +@pytest.fixture(scope='module') +def subscriber_client(): + yield pubsub_v1.SubscriberClient() - out, _ = capsys.readouterr() - assert test_topic.name in out +@pytest.fixture +def subscription(subscriber_client, topic): + subscription_path = subscriber_client.subscription_path( + PROJECT, SUBSCRIPTION) + + try: + subscriber_client.delete_subscription(subscription_path) + except: + pass -def test_get_subscription_policy(test_subscription, capsys): - test_subscription.create() + subscriber_client.create_subscription(subscription_path, topic=topic) - iam.get_subscription_policy( - test_subscription.topic.name, - test_subscription.name) + yield subscription_path + + +def test_get_topic_policy(topic, capsys): + iam.get_topic_policy(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert test_subscription.topic.name in out - assert test_subscription.name in out + assert topic in out + +def test_get_subscription_policy(subscription, capsys): + iam.get_subscription_policy(PROJECT, SUBSCRIPTION) + + out, _ = capsys.readouterr() + assert subscription in out -def test_set_topic_policy(test_topic): - iam.set_topic_policy(test_topic.name) - policy = test_topic.get_iam_policy() - assert policy.viewers - assert policy['roles/pubsub.publisher'] +def test_set_topic_policy(publisher_client, topic): + iam.set_topic_policy(PROJECT, TOPIC) + policy = publisher_client.get_iam_policy(topic) + assert 'roles/pubsub.publisher' in str(policy) + assert 'allUsers' in str(policy) -def test_set_subscription_policy(test_subscription): - test_subscription.create() - iam.set_subscription_policy( - test_subscription.topic.name, - test_subscription.name) +def test_set_subscription_policy(subscriber_client, subscription): + iam.set_subscription_policy(PROJECT, SUBSCRIPTION) - policy = test_subscription.get_iam_policy() - assert policy.viewers - assert policy.editors + policy = subscriber_client.get_iam_policy(subscription) + assert 'roles/pubsub.viewer' in str(policy) + assert 'allUsers' in str(policy) -def test_check_topic_permissions(test_topic, capsys): - iam.check_topic_permissions(test_topic.name) +def test_check_topic_permissions(topic, capsys): + iam.check_topic_permissions(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert test_topic.name in out + assert topic in out assert 'pubsub.topics.publish' in out -def test_check_subscription_permissions(test_subscription, capsys): - test_subscription.create() - - iam.check_subscription_permissions( - test_subscription.topic.name, - test_subscription.name) +def test_check_subscription_permissions(subscription, capsys): + iam.check_subscription_permissions(PROJECT, SUBSCRIPTION) out, _ = capsys.readouterr() - assert test_subscription.topic.name in out - assert test_subscription.name in out + assert subscription in out assert 'pubsub.subscriptions.consume' in out diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index 465047560..4304ddf91 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -23,48 +23,92 @@ import argparse -from google.cloud import pubsub +from google.cloud import pubsub_v1 -def list_topics(): - """Lists all Pub/Sub topics in the current project.""" - pubsub_client = pubsub.Client() +def list_topics(project): + """Lists all Pub/Sub topics in the given project.""" + publisher = pubsub_v1.PublisherClient() + project_path = publisher.project_path(project) - for topic in pubsub_client.list_topics(): - print(topic.name) + for topic in publisher.list_topics(project_path): + print(topic) -def create_topic(topic_name): +def create_topic(project, topic_name): """Create a new Pub/Sub topic.""" - pubsub_client = pubsub.Client() - topic = pubsub_client.topic(topic_name) + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project, topic_name) - topic.create() + topic = publisher.create_topic(topic_path) - print('Topic {} created.'.format(topic.name)) + print('Topic created: {}'.format(topic)) -def delete_topic(topic_name): +def delete_topic(project, topic_name): """Deletes an existing Pub/Sub topic.""" - pubsub_client = pubsub.Client() - topic = pubsub_client.topic(topic_name) + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project, topic_name) - topic.delete() + publisher.delete_topic(topic_path) - print('Topic {} deleted.'.format(topic.name)) + print('Topic deleted: {}'.format(topic_path)) -def publish_message(topic_name, data): - """Publishes a message to a Pub/Sub topic with the given data.""" - pubsub_client = pubsub.Client() - topic = pubsub_client.topic(topic_name) +def publish_messages(project, topic_name): + """Publishes multiple messages to a Pub/Sub topic.""" + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project, topic_name) - # Data must be a bytestring - data = data.encode('utf-8') + for n in range(1, 10): + data = u'Message number {}'.format(n) + # Data must be a bytestring + data = data.encode('utf-8') + publisher.publish(topic_path, data=data) - message_id = topic.publish(data) + print('Published messages.') - print('Message {} published.'.format(message_id)) + +def publish_messages_with_futures(project, topic_name): + """Publishes multiple messages to a Pub/Sub topic and prints their + message IDs.""" + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project, topic_name) + + # When you publish a message, the client returns a Future. This Future + # can be used to track when the message is published. + futures = [] + + for n in range(1, 10): + data = u'Message number {}'.format(n) + # Data must be a bytestring + data = data.encode('utf-8') + message_future = publisher.publish(topic_path, data=data) + futures.append(message_future) + + print('Published message IDs:') + for future in futures: + print(future.result()) + + +def publish_messages_with_batch_settings(project, topic_name): + """Publishes multiple messages to a Pub/Sub topic with batch settings.""" + # Configure the batch to publish once there is one kilobyte of data or + # 1 second has passed. + batch_settings = pubsub_v1.types.BatchSettings( + max_bytes=1024, # One kilobyte + max_latency=1, # One second + ) + publisher = pubsub_v1.PublisherClient(batch_settings) + topic_path = publisher.topic_path(project, topic_name) + + for n in range(1, 10): + data = u'Message number {}'.format(n) + # Data must be a bytestring + data = data.encode('utf-8') + publisher.publish(topic_path, data=data) + + print('Published messages.') if __name__ == '__main__': @@ -72,6 +116,7 @@ def publish_message(topic_name, data): description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter ) + parser.add_argument('project', help='Your Google Cloud project ID') subparsers = parser.add_subparsers(dest='command') subparsers.add_parser('list', help=list_topics.__doc__) @@ -83,17 +128,30 @@ def publish_message(topic_name, data): delete_parser.add_argument('topic_name') publish_parser = subparsers.add_parser( - 'publish', help=publish_message.__doc__) + 'publish', help=publish_messages.__doc__) publish_parser.add_argument('topic_name') - publish_parser.add_argument('data') + + publish_with_futures_parser = subparsers.add_parser( + 'publish-with-futures', + help=publish_messages_with_futures.__doc__) + publish_with_futures_parser.add_argument('topic_name') + + publish_with_batch_settings_parser = subparsers.add_parser( + 'publish-with-batch-settings', + help=publish_messages_with_batch_settings.__doc__) + publish_with_batch_settings_parser.add_argument('topic_name') args = parser.parse_args() if args.command == 'list': - list_topics() + list_topics(args.project) elif args.command == 'create': - create_topic(args.topic_name) + create_topic(args.project, args.topic_name) elif args.command == 'delete': - delete_topic(args.topic_name) + delete_topic(args.project, args.topic_name) elif args.command == 'publish': - publish_message(args.topic_name, args.data) + publish_messages(args.project, args.topic_name) + elif args.command == 'publish-with-futures': + publish_messages_with_futures(args.project, args.topic_name) + elif args.command == 'publish-with-batch-settings': + publish_messages_with_batch_settings(args.project, args.topic_name) diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index d7ca67098..b400c9f24 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -12,56 +12,84 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os + from gcp_devrel.testing import eventually_consistent -from google.cloud import pubsub +from google.cloud import pubsub_v1 import pytest import publisher -TEST_TOPIC = 'publisher-test-topic' +PROJECT = os.environ['GCLOUD_PROJECT'] +TOPIC = 'publisher-test-topic' + + +@pytest.fixture +def client(): + yield pubsub_v1.PublisherClient() @pytest.fixture -def test_topic(): - client = pubsub.Client() - topic = client.topic(TEST_TOPIC) - yield topic - if topic.exists(): - topic.delete() +def topic(client): + topic_path = client.topic_path(PROJECT, TOPIC) + try: + client.delete_topic(topic_path) + except: + pass -def test_list(test_topic, capsys): - test_topic.create() + client.create_topic(topic_path) + yield topic_path + + +def test_list(client, topic, capsys): @eventually_consistent.call def _(): - publisher.list_topics() + publisher.list_topics(PROJECT) out, _ = capsys.readouterr() - assert test_topic.name in out + assert topic in out -def test_create(test_topic): - publisher.create_topic(test_topic.name) +def test_create(client): + topic_path = client.topic_path(PROJECT, TOPIC) + try: + client.delete_topic(topic_path) + except: + pass + + publisher.create_topic(PROJECT, TOPIC) @eventually_consistent.call def _(): - assert test_topic.exists() - + assert client.get_topic(topic_path) -def test_delete(test_topic): - test_topic.create() - publisher.delete_topic(test_topic.name) +def test_delete(client, topic): + publisher.delete_topic(PROJECT, TOPIC) @eventually_consistent.call def _(): - assert not test_topic.exists() + with pytest.raises(Exception): + client.get_topic(client.topic_path(PROJECT, TOPIC)) -def test_publish(test_topic, capsys): - test_topic.create() +def test_publish(topic, capsys): + publisher.publish_messages(PROJECT, TOPIC) + + out, _ = capsys.readouterr() + assert 'Published' in out + + +def test_publish_with_batch_settings(topic, capsys): + publisher.publish_messages_with_batch_settings(PROJECT, TOPIC) + + out, _ = capsys.readouterr() + assert 'Published' in out + - publisher.publish_message(test_topic.name, 'hello') +def test_publish_with_futures(topic, capsys): + publisher.publish_messages_with_futures(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert 'published' in out + assert 'Published' in out diff --git a/samples/snippets/quickstart.py b/samples/snippets/quickstart.py index fdcb45003..c9823d789 100644 --- a/samples/snippets/quickstart.py +++ b/samples/snippets/quickstart.py @@ -18,21 +18,20 @@ def run_quickstart(): # [START pubsub_quickstart] # Imports the Google Cloud client library - from google.cloud import pubsub + from google.cloud import pubsub_v1 # Instantiates a client - pubsub_client = pubsub.Client() + publisher = pubsub_v1.PublisherClient() - # The name for the new topic - topic_name = 'my-new-topic' + # The resource path for the new topic contains the project ID + # and the topic name. + topic_path = publisher.topic_path( + 'my-project', 'my-new-topic') - # Prepares the new topic - topic = pubsub_client.topic(topic_name) + # Create the topic. + topic = publisher.create_topic(topic_path) - # Creates the new topic - topic.create() - - print('Topic {} created.'.format(topic.name)) + print('Topic created: {}'.format(topic)) # [END pubsub_quickstart] diff --git a/samples/snippets/quickstart_test.py b/samples/snippets/quickstart_test.py index bbb3bd75f..71e157d48 100644 --- a/samples/snippets/quickstart_test.py +++ b/samples/snippets/quickstart_test.py @@ -12,34 +12,36 @@ # See the License for the specific language governing permissions and # limitations under the License. -from google.cloud import pubsub +import os + +from google.cloud import pubsub_v1 +import mock import pytest import quickstart - -# Must match the dataset listed in quickstart.py (there's no easy way to -# extract this). +PROJECT = os.environ['GCLOUD_PROJECT'] +# Must match the dataset listed in quickstart.py TOPIC_NAME = 'my-new-topic' +TOPIC_PATH = 'projects/{}/topics/{}'.format(PROJECT, TOPIC_NAME) @pytest.fixture def temporary_topic(): - """Fixture that ensures the test dataset does not exist before or - after a test.""" - pubsub_client = pubsub.Client() - topic = pubsub_client.topic(TOPIC_NAME) + """Fixture that ensures the test topic does not exist before the test.""" + publisher = pubsub_v1.PublisherClient() - if topic.exists(): - topic.delete() + try: + publisher.delete_topic(TOPIC_PATH) + except: + pass yield - if topic.exists(): - topic.delete() - -def test_quickstart(capsys, temporary_topic): +@mock.patch.object( + pubsub_v1.PublisherClient, 'topic_path', return_value=TOPIC_PATH) +def test_quickstart(unused_topic_path, temporary_topic, capsys): quickstart.run_quickstart() out, _ = capsys.readouterr() assert TOPIC_NAME in out diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt index 65c3daf96..6b4c47db9 100644 --- a/samples/snippets/requirements.txt +++ b/samples/snippets/requirements.txt @@ -1 +1 @@ -google-cloud-pubsub==0.27.0 +google-cloud-pubsub==0.28.2 diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index 9a564496e..aef2ab679 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -22,63 +22,83 @@ """ import argparse +import time -from google.cloud import pubsub +from google.cloud import pubsub_v1 -def list_subscriptions(topic_name): +def list_subscriptions(project, topic_name): """Lists all subscriptions for a given topic.""" - pubsub_client = pubsub.Client() - topic = pubsub_client.topic(topic_name) + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project, topic_name) - for subscription in topic.list_subscriptions(): + for subscription in subscriber.list_subscriptions(topic_path): print(subscription.name) -def create_subscription(topic_name, subscription_name): +def create_subscription(project, topic_name, subscription_name): """Create a new pull subscription on the given topic.""" - pubsub_client = pubsub.Client() - topic = pubsub_client.topic(topic_name) + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project, topic_name) + subscription_path = subscriber.subscription_path( + project, subscription_name) - subscription = topic.subscription(subscription_name) - subscription.create() + subscription = subscriber.create_subscription( + subscription_path, topic_path) - print('Subscription {} created on topic {}.'.format( - subscription.name, topic.name)) + print('Subscription created: {}'.format(subscription)) -def delete_subscription(topic_name, subscription_name): +def delete_subscription(project, subscription_name): """Deletes an existing Pub/Sub topic.""" - pubsub_client = pubsub.Client() - topic = pubsub_client.topic(topic_name) - subscription = topic.subscription(subscription_name) + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path( + project, subscription_name) - subscription.delete() + subscriber.delete_subscription(subscription_path) - print('Subscription {} deleted on topic {}.'.format( - subscription.name, topic.name)) + print('Subscription deleted: {}'.format(subscription_path)) -def receive_message(topic_name, subscription_name): - """Receives a message from a pull subscription.""" - pubsub_client = pubsub.Client() - topic = pubsub_client.topic(topic_name) - subscription = topic.subscription(subscription_name) +def receive_messages(project, subscription_name): + """Receives messages from a pull subscription.""" + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path( + project, subscription_name) - # Change return_immediately=False to block until messages are - # received. - results = subscription.pull(return_immediately=True) + def callback(message): + print('Received message: {}'.format(message)) + message.ack() - print('Received {} messages.'.format(len(results))) + subscriber.subscribe(subscription_path, callback=callback) - for ack_id, message in results: - print('* {}: {}, {}'.format( - message.message_id, message.data, message.attributes)) + # The subscriber is non-blocking, so we must keep the main thread from + # exiting to allow it to process messages in the background. + print('Listening for messages on {}'.format(subscription_path)) + while True: + time.sleep(60) - # Acknowledge received messages. If you do not acknowledge, Pub/Sub will - # redeliver the message. - if results: - subscription.acknowledge([ack_id for ack_id, message in results]) + +def receive_messages_with_flow_control(project, subscription_name): + """Receives messages from a pull subscription with flow control.""" + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path( + project, subscription_name) + + def callback(message): + print('Received message: {}'.format(message)) + message.ack() + + # Limit the subscriber to only have ten outstanding messages at a time. + flow_control = pubsub_v1.types.FlowControl(max_messages=10) + subscriber.subscribe( + subscription_path, callback=callback, flow_control=flow_control) + + # The subscriber is non-blocking, so we must keep the main thread from + # exiting to allow it to process messages in the background. + print('Listening for messages on {}'.format(subscription_path)) + while True: + time.sleep(60) if __name__ == '__main__': @@ -86,6 +106,7 @@ def receive_message(topic_name, subscription_name): description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter ) + parser.add_argument('project', help='Your Google Cloud project ID') subparsers = parser.add_subparsers(dest='command') list_parser = subparsers.add_parser( @@ -99,21 +120,29 @@ def receive_message(topic_name, subscription_name): delete_parser = subparsers.add_parser( 'delete', help=delete_subscription.__doc__) - delete_parser.add_argument('topic_name') delete_parser.add_argument('subscription_name') receive_parser = subparsers.add_parser( - 'receive', help=receive_message.__doc__) - receive_parser.add_argument('topic_name') + 'receive', help=receive_messages.__doc__) receive_parser.add_argument('subscription_name') + receive_with_flow_control_parser = subparsers.add_parser( + 'receive-flow-control', + help=receive_messages_with_flow_control.__doc__) + receive_with_flow_control_parser.add_argument('subscription_name') + args = parser.parse_args() if args.command == 'list': - list_subscriptions(args.topic_name) + list_subscriptions(args.project, args.topic_name) elif args.command == 'create': - create_subscription(args.topic_name, args.subscription_name) + create_subscription( + args.project, args.topic_name, args.subscription_name) elif args.command == 'delete': - delete_subscription(args.topic_name, args.subscription_name) + delete_subscription( + args.project, args.subscription_name) elif args.command == 'receive': - receive_message(args.topic_name, args.subscription_name) + receive_messages(args.project, args.topic_name, args.subscription_name) + elif args.command == 'receive-flow-control': + receive_messages_with_flow_control( + args.project, args.topic_name, args.subscription_name) diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index 9f7f5a1bf..0acadf437 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -12,76 +12,135 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os +import time + from gcp_devrel.testing import eventually_consistent -from google.cloud import pubsub +from google.cloud import pubsub_v1 +import mock import pytest import subscriber -TEST_TOPIC = 'subscription-test-topic' -TEST_SUBSCRIPTION = 'subscription-test-subscription' +PROJECT = os.environ['GCLOUD_PROJECT'] +TOPIC = 'subscription-test-topic' +SUBSCRIPTION = 'subscription-test-subscription' + + +@pytest.fixture(scope='module') +def publisher_client(): + yield pubsub_v1.PublisherClient() @pytest.fixture(scope='module') -def test_topic(): - client = pubsub.Client() - topic = client.topic(TEST_TOPIC) +def topic(publisher_client): + topic_path = publisher_client.topic_path(PROJECT, TOPIC) + + try: + publisher_client.delete_topic(topic_path) + except: + pass + + publisher_client.create_topic(topic_path) - if not topic.exists(): - topic.create() + yield topic_path - yield topic - if topic.exists(): - topic.delete() +@pytest.fixture(scope='module') +def subscriber_client(): + yield pubsub_v1.SubscriberClient() @pytest.fixture -def test_subscription(test_topic): - subscription = test_topic.subscription(TEST_SUBSCRIPTION) - yield subscription - if subscription.exists(): - subscription.delete() +def subscription(subscriber_client, topic): + subscription_path = subscriber_client.subscription_path( + PROJECT, SUBSCRIPTION) + + try: + subscriber_client.delete_subscription(subscription_path) + except: + pass + + subscriber_client.create_subscription(subscription_path, topic=topic) + yield subscription_path -def test_list(test_subscription, capsys): - test_subscription.create() +def test_list(subscription, capsys): @eventually_consistent.call def _(): - subscriber.list_subscriptions(test_subscription.topic.name) + subscriber.list_subscriptions(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert test_subscription.name in out + assert subscription in out -def test_create(test_subscription): - subscriber.create_subscription( - test_subscription.topic.name, test_subscription.name) +def test_create(subscriber_client): + subscription_path = subscriber_client.subscription_path( + PROJECT, SUBSCRIPTION) + try: + subscriber_client.delete_subscription(subscription_path) + except: + pass + + subscriber.create_subscription(PROJECT, TOPIC, SUBSCRIPTION) @eventually_consistent.call def _(): - assert test_subscription.exists() - + assert subscriber_client.get_subscription(subscription_path) -def test_delete(test_subscription): - test_subscription.create() - subscriber.delete_subscription( - test_subscription.topic.name, test_subscription.name) +def test_delete(subscriber_client, subscription): + subscriber.delete_subscription(PROJECT, SUBSCRIPTION) @eventually_consistent.call def _(): - assert not test_subscription.exists() + with pytest.raises(Exception): + subscriber_client.get_subscription(subscription) -def test_receive(test_subscription, capsys): - topic = test_subscription.topic - test_subscription.create() +def _publish_messages(publisher_client, topic): + for n in range(5): + data = u'Message {}'.format(n).encode('utf-8') + publisher_client.publish( + topic, data=data) - topic.publish('hello'.encode('utf-8')) - @eventually_consistent.call - def _(): - subscriber.receive_message(topic.name, test_subscription.name) - out, _ = capsys.readouterr() - assert 'hello' in out +def _make_sleep_patch(): + real_sleep = time.sleep + + def new_sleep(period): + if period == 60: + real_sleep(5) + raise RuntimeError('sigil') + else: + real_sleep(period) + + return mock.patch('time.sleep', new=new_sleep) + + +def test_receive(publisher_client, topic, subscription, capsys): + _publish_messages(publisher_client, topic) + + with _make_sleep_patch(): + with pytest.raises(RuntimeError, match='sigil'): + subscriber.receive_messages(PROJECT, SUBSCRIPTION) + + out, _ = capsys.readouterr() + assert 'Listening' in out + assert subscription in out + assert 'Message 1' in out + + +def test_receive_with_flow_control( + publisher_client, topic, subscription, capsys): + _publish_messages(publisher_client, topic) + + with _make_sleep_patch(): + with pytest.raises(RuntimeError, match='sigil'): + subscriber.receive_messages_with_flow_control( + PROJECT, SUBSCRIPTION) + + out, _ = capsys.readouterr() + assert 'Listening' in out + assert subscription in out + assert 'Message 1' in out