From d42f54c7e5ae3ecf752e51afe2b67369a649d0df Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Thu, 12 Dec 2019 12:22:16 -0800 Subject: [PATCH] Pub/Sub: remove infinite while loops in subscriber examples (#2604) * use result() on streaming pull futures instead of infinite while * remove unused imports --- pubsub/cloud-client/publisher_test.py | 75 ++++++----- pubsub/cloud-client/subscriber.py | 98 ++++++++------ pubsub/cloud-client/subscriber_test.py | 180 +++++++++++-------------- 3 files changed, 179 insertions(+), 174 deletions(-) diff --git a/pubsub/cloud-client/publisher_test.py b/pubsub/cloud-client/publisher_test.py index 125fae3c06b9..fbe30694ae45 100644 --- a/pubsub/cloud-client/publisher_test.py +++ b/pubsub/cloud-client/publisher_test.py @@ -25,7 +25,8 @@ UUID = uuid.uuid4().hex PROJECT = os.environ["GCLOUD_PROJECT"] -TOPIC = "publisher-test-topic-" + UUID +TOPIC_ADMIN = "publisher-test-topic-admin-" + UUID +TOPIC_PUBLISH = "publisher-test-topic-publish-" + UUID @pytest.fixture @@ -34,15 +35,30 @@ def client(): @pytest.fixture -def topic(client): - topic_path = client.topic_path(PROJECT, TOPIC) +def topic_admin(client): + topic_path = client.topic_path(PROJECT, TOPIC_ADMIN) try: - response = client.get_topic(topic_path) + topic = client.get_topic(topic_path) except: # noqa - response = client.create_topic(topic_path) + topic = client.create_topic(topic_path) - yield response.name + yield topic.name + # Teardown of `topic_admin` is handled in `test_delete()`. + + +@pytest.fixture +def topic_publish(client): + topic_path = client.topic_path(PROJECT, TOPIC_PUBLISH) + + try: + topic = client.get_topic(topic_path) + except: # noqa + topic = client.create_topic(topic_path) + + yield topic.name + + client.delete_topic(topic.name) def _make_sleep_patch(): @@ -58,83 +74,74 @@ def new_sleep(period): return mock.patch("time.sleep", new=new_sleep) -def _to_delete(): - publisher_client = pubsub_v1.PublisherClient() - publisher_client.delete_topic( - "projects/{}/topics/{}".format(PROJECT, TOPIC) - ) - - -def test_list(client, topic, capsys): +def test_list(client, topic_admin, capsys): @eventually_consistent.call def _(): publisher.list_topics(PROJECT) out, _ = capsys.readouterr() - assert topic in out + assert topic_admin in out def test_create(client): - topic_path = client.topic_path(PROJECT, TOPIC) + topic_path = client.topic_path(PROJECT, TOPIC_ADMIN) try: client.delete_topic(topic_path) except Exception: pass - publisher.create_topic(PROJECT, TOPIC) + publisher.create_topic(PROJECT, TOPIC_ADMIN) @eventually_consistent.call def _(): assert client.get_topic(topic_path) -def test_delete(client, topic): - publisher.delete_topic(PROJECT, TOPIC) +def test_delete(client, topic_admin): + publisher.delete_topic(PROJECT, TOPIC_ADMIN) @eventually_consistent.call def _(): with pytest.raises(Exception): - client.get_topic(client.topic_path(PROJECT, TOPIC)) + client.get_topic(client.topic_path(PROJECT, TOPIC_ADMIN)) -def test_publish(topic, capsys): - publisher.publish_messages(PROJECT, TOPIC) +def test_publish(topic_publish, capsys): + publisher.publish_messages(PROJECT, TOPIC_PUBLISH) out, _ = capsys.readouterr() assert "Published" in out -def test_publish_with_custom_attributes(topic, capsys): - publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC) +def test_publish_with_custom_attributes(topic_publish, capsys): + publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC_PUBLISH) out, _ = capsys.readouterr() assert "Published" in out -def test_publish_with_batch_settings(topic, capsys): - publisher.publish_messages_with_batch_settings(PROJECT, TOPIC) +def test_publish_with_batch_settings(topic_publish, capsys): + publisher.publish_messages_with_batch_settings(PROJECT, TOPIC_PUBLISH) out, _ = capsys.readouterr() assert "Published" in out -def test_publish_with_retry_settings(topic, capsys): - publisher.publish_messages_with_retry_settings(PROJECT, TOPIC) +def test_publish_with_retry_settings(topic_publish, capsys): + publisher.publish_messages_with_retry_settings(PROJECT, TOPIC_PUBLISH) out, _ = capsys.readouterr() assert "Published" in out -def test_publish_with_error_handler(topic, capsys): - publisher.publish_messages_with_error_handler(PROJECT, TOPIC) +def test_publish_with_error_handler(topic_publish, capsys): + publisher.publish_messages_with_error_handler(PROJECT, TOPIC_PUBLISH) out, _ = capsys.readouterr() assert "Published" in out -def test_publish_with_futures(topic, capsys): - publisher.publish_messages_with_futures(PROJECT, TOPIC) +def test_publish_with_futures(topic_publish, capsys): + publisher.publish_messages_with_futures(PROJECT, TOPIC_PUBLISH) out, _ = capsys.readouterr() assert "Published" in out - - _to_delete() diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index ea1cc9ff9e72..0d328d232d05 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -161,16 +161,16 @@ def update_subscription(project_id, subscription_name, endpoint): # [END pubsub_update_push_configuration] -def receive_messages(project_id, subscription_name): +def receive_messages(project_id, subscription_name, timeout=None): """Receives messages from a pull subscription.""" # [START pubsub_subscriber_async_pull] # [START pubsub_quickstart_subscriber] - import time - from google.cloud import pubsub_v1 # TODO project_id = "Your Google Cloud Project ID" # TODO subscription_name = "Your Pub/Sub subscription name" + # TODO timeout = 5.0 # "How long the subscriber should listen for + # messages in seconds" subscriber = pubsub_v1.SubscriberClient() # The `subscription_path` method creates a fully qualified identifier @@ -183,27 +183,33 @@ def callback(message): print("Received message: {}".format(message)) message.ack() - subscriber.subscribe(subscription_path, callback=callback) + streaming_pull_future = subscriber.subscribe( + subscription_path, callback=callback + ) + print("Listening for messages on {}..\n".format(subscription_path)) - # The subscriber is non-blocking. We must keep the main thread from - # exiting to allow it to process messages asynchronously in the background. - print("Listening for messages on {}".format(subscription_path)) - while True: - time.sleep(60) + # result() in a future will block indefinitely if `timeout` is not set, + # unless an exception is encountered first. + try: + streaming_pull_future.result(timeout=timeout) + except: # noqa + streaming_pull_future.cancel() # [END pubsub_subscriber_async_pull] # [END pubsub_quickstart_subscriber] -def receive_messages_with_custom_attributes(project_id, subscription_name): +def receive_messages_with_custom_attributes( + project_id, subscription_name, timeout=None +): """Receives messages from a pull subscription.""" # [START pubsub_subscriber_sync_pull_custom_attributes] # [START pubsub_subscriber_async_pull_custom_attributes] - import time - from google.cloud import pubsub_v1 # TODO project_id = "Your Google Cloud Project ID" # TODO subscription_name = "Your Pub/Sub subscription name" + # TODO timeout = 5.0 # "How long the subscriber should listen for + # messages in seconds" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( @@ -219,26 +225,32 @@ def callback(message): print("{}: {}".format(key, value)) message.ack() - subscriber.subscribe(subscription_path, callback=callback) + streaming_pull_future = subscriber.subscribe( + subscription_path, callback=callback + ) + print("Listening for messages on {}..\n".format(subscription_path)) - # The subscriber is non-blocking, so we must keep the main thread from - # exiting to allow it to process messages in the background. - print("Listening for messages on {}".format(subscription_path)) - while True: - time.sleep(60) + # result() in a future will block indefinitely if `timeout` is not set, + # unless an exception is encountered first. + try: + streaming_pull_future.result(timeout=timeout) + except: # noqa + streaming_pull_future.cancel() # [END pubsub_subscriber_async_pull_custom_attributes] # [END pubsub_subscriber_sync_pull_custom_attributes] -def receive_messages_with_flow_control(project_id, subscription_name): +def receive_messages_with_flow_control( + project_id, subscription_name, timeout=None +): """Receives messages from a pull subscription with flow control.""" # [START pubsub_subscriber_flow_settings] - import time - from google.cloud import pubsub_v1 # TODO project_id = "Your Google Cloud Project ID" # TODO subscription_name = "Your Pub/Sub subscription name" + # TODO timeout = 5.0 # "How long the subscriber should listen for + # messages in seconds" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( @@ -251,15 +263,18 @@ def callback(message): # Limit the subscriber to only have ten outstanding messages at a time. flow_control = pubsub_v1.types.FlowControl(max_messages=10) - subscriber.subscribe( + + streaming_pull_future = subscriber.subscribe( subscription_path, callback=callback, flow_control=flow_control ) + print("Listening for messages on {}..\n".format(subscription_path)) - # The subscriber is non-blocking, so we must keep the main thread from - # exiting to allow it to process messages in the background. - print("Listening for messages on {}".format(subscription_path)) - while True: - time.sleep(60) + # result() in a future will block indefinitely if `timeout` is not set, + # unless an exception is encountered first. + try: + streaming_pull_future.result(timeout=timeout) + except: # noqa + streaming_pull_future.cancel() # [END pubsub_subscriber_flow_settings] @@ -386,13 +401,15 @@ def worker(msg): # [END pubsub_subscriber_sync_pull_with_lease] -def listen_for_errors(project_id, subscription_name): +def listen_for_errors(project_id, subscription_name, timeout=None): """Receives messages and catches errors from a pull subscription.""" # [START pubsub_subscriber_error_listener] from google.cloud import pubsub_v1 # TODO project_id = "Your Google Cloud Project ID" # TODO subscription_name = "Your Pubsub subscription name" + # TODO timeout = 5.0 # "How long the subscriber should listen for + # messages in seconds" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( @@ -403,16 +420,19 @@ def callback(message): print("Received message: {}".format(message)) message.ack() - future = subscriber.subscribe(subscription_path, callback=callback) + streaming_pull_future = subscriber.subscribe( + subscription_path, callback=callback + ) + print("Listening for messages on {}..\n".format(subscription_path)) - # Blocks the thread while messages are coming in through the stream. Any - # exceptions that crop up on the thread will be set on the future. + # result() in a future will block indefinitely if `timeout` is not set, + # unless an exception is encountered first. try: - # When timeout is unspecified, the result method waits indefinitely. - future.result(timeout=30) + streaming_pull_future.result(timeout=timeout) except Exception as e: + streaming_pull_future.cancel() print( - "Listening for messages on {} threw an Exception: {}.".format( + "Listening for messages on {} threw an exception: {}.".format( subscription_name, e ) ) @@ -518,14 +538,14 @@ def callback(message): args.project_id, args.subscription_name, args.endpoint ) elif args.command == "receive": - receive_messages(args.project_id, args.subscription_name) + receive_messages(args.project_id, args.subscription_name, args.timeout) elif args.command == "receive-custom-attributes": receive_messages_with_custom_attributes( - args.project_id, args.subscription_name + args.project_id, args.subscription_name, args.timeout ) elif args.command == "receive-flow-control": receive_messages_with_flow_control( - args.project_id, args.subscription_name + args.project_id, args.subscription_name, args.timeout ) elif args.command == "receive-synchronously": synchronous_pull(args.project_id, args.subscription_name) @@ -534,4 +554,6 @@ def callback(message): args.project_id, args.subscription_name ) elif args.command == "listen_for_errors": - listen_for_errors(args.project_id, args.subscription_name) + listen_for_errors( + args.project_id, args.subscription_name, args.timeout + ) diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 0645c0738e1c..50353c1c6e42 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -13,12 +13,10 @@ # limitations under the License. import os -import time import uuid from gcp_devrel.testing import eventually_consistent from google.cloud import pubsub_v1 -import mock import pytest import subscriber @@ -26,9 +24,9 @@ UUID = uuid.uuid4().hex PROJECT = os.environ["GCLOUD_PROJECT"] TOPIC = "subscription-test-topic-" + UUID -SUBSCRIPTION_ONE = "subscription-test-subscription-one-" + UUID -SUBSCRIPTION_TWO = "subscription-test-subscription-two-" + UUID -SUBSCRIPTION_THREE = "subscription-test-subscription-three-" + UUID +SUBSCRIPTION_ADMIN = "subscription-test-subscription-admin-" + UUID +SUBSCRIPTION_ASYNC = "subscription-test-subscription-async-" + UUID +SUBSCRIPTION_SYNC = "subscription-test-subscription-sync-" + UUID ENDPOINT = "https://{}.appspot.com/push".format(PROJECT) NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT) @@ -43,11 +41,13 @@ def topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, TOPIC) try: - response = publisher_client.get_topic(topic_path) + subscription = publisher_client.get_topic(topic_path) except: # noqa - response = publisher_client.create_topic(topic_path) + subscription = publisher_client.create_topic(topic_path) - yield response.name + yield subscription.name + + publisher_client.delete_topic(subscription.name) @pytest.fixture(scope="module") @@ -56,72 +56,76 @@ def subscriber_client(): @pytest.fixture(scope="module") -def subscription_one(subscriber_client, topic): +def subscription_admin(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ONE + PROJECT, SUBSCRIPTION_ADMIN ) try: - response = subscriber_client.get_subscription(subscription_path) + subscription = subscriber_client.get_subscription(subscription_path) except: # noqa - response = subscriber_client.create_subscription( + subscription = subscriber_client.create_subscription( subscription_path, topic=topic ) - yield response.name + yield subscription.name @pytest.fixture(scope="module") -def subscription_two(subscriber_client, topic): +def subscription_sync(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_TWO + PROJECT, SUBSCRIPTION_SYNC ) try: - response = subscriber_client.get_subscription(subscription_path) + subscription = subscriber_client.get_subscription(subscription_path) except: # noqa - response = subscriber_client.create_subscription( + subscription = subscriber_client.create_subscription( subscription_path, topic=topic ) - yield response.name + yield subscription.name + + subscriber_client.delete_subscription(subscription.name) @pytest.fixture(scope="module") -def subscription_three(subscriber_client, topic): +def subscription_async(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_THREE + PROJECT, SUBSCRIPTION_ASYNC ) try: - response = subscriber_client.get_subscription(subscription_path) + subscription = subscriber_client.get_subscription(subscription_path) except: # noqa - response = subscriber_client.create_subscription( + subscription = subscriber_client.create_subscription( subscription_path, topic=topic ) - yield response.name + yield subscription.name + + subscriber_client.delete_subscription(subscription.name) -def test_list_in_topic(subscription_one, capsys): +def test_list_in_topic(subscription_admin, capsys): @eventually_consistent.call def _(): subscriber.list_subscriptions_in_topic(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert subscription_one in out + assert subscription_admin in out -def test_list_in_project(subscription_one, capsys): +def test_list_in_project(subscription_admin, capsys): @eventually_consistent.call def _(): subscriber.list_subscriptions_in_project(PROJECT) out, _ = capsys.readouterr() - assert subscription_one in out + assert subscription_admin in out def test_create(subscriber_client): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ONE + PROJECT, SUBSCRIPTION_ADMIN ) try: @@ -129,7 +133,7 @@ def test_create(subscriber_client): except Exception: pass - subscriber.create_subscription(PROJECT, TOPIC, SUBSCRIPTION_ONE) + subscriber.create_subscription(PROJECT, TOPIC, SUBSCRIPTION_ADMIN) @eventually_consistent.call def _(): @@ -138,7 +142,7 @@ def _(): def test_create_push(subscriber_client): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ONE + PROJECT, SUBSCRIPTION_ADMIN ) try: subscriber_client.delete_subscription(subscription_path) @@ -146,7 +150,7 @@ def test_create_push(subscriber_client): pass subscriber.create_push_subscription( - PROJECT, TOPIC, SUBSCRIPTION_ONE, ENDPOINT + PROJECT, TOPIC, SUBSCRIPTION_ADMIN, ENDPOINT ) @eventually_consistent.call @@ -154,135 +158,107 @@ def _(): assert subscriber_client.get_subscription(subscription_path) -def test_update(subscriber_client, subscription_one, capsys): - subscriber.update_subscription(PROJECT, SUBSCRIPTION_ONE, NEW_ENDPOINT) +def test_update(subscriber_client, subscription_admin, capsys): + subscriber.update_subscription(PROJECT, SUBSCRIPTION_ADMIN, NEW_ENDPOINT) out, _ = capsys.readouterr() assert "Subscription updated" in out -def test_delete(subscriber_client, subscription_one): - subscriber.delete_subscription(PROJECT, SUBSCRIPTION_ONE) +def test_delete(subscriber_client, subscription_admin): + subscriber.delete_subscription(PROJECT, SUBSCRIPTION_ADMIN) @eventually_consistent.call def _(): with pytest.raises(Exception): - subscriber_client.get_subscription(subscription_one) + subscriber_client.get_subscription(subscription_admin) def _publish_messages(publisher_client, topic): for n in range(5): - data = u"Message {}".format(n).encode("utf-8") - future = publisher_client.publish(topic, data=data) - future.result() - - -def _publish_messages_with_custom_attributes(publisher_client, topic): - data = u"Test message".encode("utf-8") - future = publisher_client.publish(topic, data=data, origin="python-sample") - future.result() - - -def _make_sleep_patch(): - real_sleep = time.sleep - - def new_sleep(period): - if period == 60: - real_sleep(5) - raise RuntimeError("sigil") - else: - real_sleep(period) - - return mock.patch("time.sleep", new=new_sleep) - - -def _to_delete(): - publisher_client = pubsub_v1.PublisherClient() - subscriber_client = pubsub_v1.SubscriberClient() - resources = [TOPIC, SUBSCRIPTION_TWO, SUBSCRIPTION_THREE] - - for item in resources: - if "subscription-test-topic" in item: - publisher_client.delete_topic( - "projects/{}/topics/{}".format(PROJECT, item) - ) - if "subscription-test-subscription" in item: - subscriber_client.delete_subscription( - "projects/{}/subscriptions/{}".format(PROJECT, item) - ) + data = u"message {}".format(n).encode("utf-8") + publish_future = publisher_client.publish( + topic, data=data, origin="python-sample" + ) + publish_future.result() -def test_receive(publisher_client, topic, subscription_two, capsys): +def test_receive(publisher_client, topic, subscription_async, capsys): _publish_messages(publisher_client, topic) - with _make_sleep_patch(): - with pytest.raises(RuntimeError, match="sigil"): - subscriber.receive_messages(PROJECT, SUBSCRIPTION_TWO) + subscriber.receive_messages(PROJECT, SUBSCRIPTION_ASYNC, 5) out, _ = capsys.readouterr() assert "Listening" in out - assert subscription_two in out - assert "Message" in out + assert subscription_async in out + assert "message" in out def test_receive_with_custom_attributes( - publisher_client, topic, subscription_two, capsys + publisher_client, topic, subscription_async, capsys ): - _publish_messages_with_custom_attributes(publisher_client, topic) + _publish_messages(publisher_client, topic) - with _make_sleep_patch(): - with pytest.raises(RuntimeError, match="sigil"): - subscriber.receive_messages_with_custom_attributes( - PROJECT, SUBSCRIPTION_TWO - ) + subscriber.receive_messages_with_custom_attributes( + PROJECT, SUBSCRIPTION_ASYNC, 5 + ) out, _ = capsys.readouterr() - assert "Test message" in out + assert "message" in out assert "origin" in out assert "python-sample" in out def test_receive_with_flow_control( - publisher_client, topic, subscription_two, capsys + publisher_client, topic, subscription_async, capsys ): _publish_messages(publisher_client, topic) - with _make_sleep_patch(): - with pytest.raises(RuntimeError, match="sigil"): - subscriber.receive_messages_with_flow_control( - PROJECT, SUBSCRIPTION_TWO - ) + subscriber.receive_messages_with_flow_control( + PROJECT, SUBSCRIPTION_ASYNC, 5 + ) out, _ = capsys.readouterr() assert "Listening" in out - assert subscription_two in out - assert "Message" in out + assert subscription_async in out + assert "message" in out def test_receive_synchronously( - publisher_client, topic, subscription_three, capsys + publisher_client, topic, subscription_sync, capsys ): _publish_messages(publisher_client, topic) - subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_THREE) + subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_SYNC) out, _ = capsys.readouterr() assert "Done." in out def test_receive_synchronously_with_lease( - publisher_client, topic, subscription_three, capsys + publisher_client, topic, subscription_sync, capsys ): _publish_messages(publisher_client, topic) subscriber.synchronous_pull_with_lease_management( - PROJECT, SUBSCRIPTION_THREE + PROJECT, SUBSCRIPTION_SYNC ) out, _ = capsys.readouterr() assert "Done." in out - # Clean up resources after all the tests. - _to_delete() + +def test_listen_for_errors( + publisher_client, topic, subscription_async, capsys +): + + _publish_messages(publisher_client, topic) + + subscriber.listen_for_errors(PROJECT, SUBSCRIPTION_ASYNC, 5) + + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_async in out + assert "threw an exception" in out