From fbf6f4af144d3ee21200c1c18a402c06a53ccfd5 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Mon, 21 Oct 2019 13:56:39 -0700 Subject: [PATCH] Pub/Sub: update how subscriber client listens to StreamingPullFuture (#2475) * update sub.py & requirements.txt * fix flaky subscriber test with separate subscriptions --- pubsub/cloud-client/iam_test.py | 2 +- pubsub/cloud-client/publisher.py | 2 +- pubsub/cloud-client/publisher_test.py | 12 +- pubsub/cloud-client/quickstart.py | 2 +- pubsub/cloud-client/quickstart/sub.py | 19 +-- pubsub/cloud-client/quickstart/sub_test.py | 91 +++++------ pubsub/cloud-client/quickstart_test.py | 2 +- pubsub/cloud-client/subscriber.py | 2 +- pubsub/cloud-client/subscriber_test.py | 180 ++++++++++----------- 9 files changed, 148 insertions(+), 164 deletions(-) diff --git a/pubsub/cloud-client/iam_test.py b/pubsub/cloud-client/iam_test.py index cfae98ffd00b..8a524c35a061 100644 --- a/pubsub/cloud-client/iam_test.py +++ b/pubsub/cloud-client/iam_test.py @@ -1,4 +1,4 @@ -# Copyright 2019 Google Inc. All Rights Reserved. +# Copyright 2016 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/pubsub/cloud-client/publisher.py b/pubsub/cloud-client/publisher.py index 76554d0258fe..490c903b2c1b 100644 --- a/pubsub/cloud-client/publisher.py +++ b/pubsub/cloud-client/publisher.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2019 Google LLC. All Rights Reserved. +# Copyright 2016 Google LLC. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/pubsub/cloud-client/publisher_test.py b/pubsub/cloud-client/publisher_test.py index b364553c2d41..5e550abd641d 100644 --- a/pubsub/cloud-client/publisher_test.py +++ b/pubsub/cloud-client/publisher_test.py @@ -1,4 +1,4 @@ -# Copyright 2019 Google Inc. All Rights Reserved. +# Copyright 2016 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -36,13 +36,11 @@ def topic(client): topic_path = client.topic_path(PROJECT, TOPIC) try: - client.delete_topic(topic_path) - except Exception: - pass - - client.create_topic(topic_path) + response = client.get_topic(topic_path) + except: # noqa + response = client.create_topic(topic_path) - yield topic_path + yield response.name def _make_sleep_patch(): diff --git a/pubsub/cloud-client/quickstart.py b/pubsub/cloud-client/quickstart.py index 10ff76f9b632..f48d085e06b5 100644 --- a/pubsub/cloud-client/quickstart.py +++ b/pubsub/cloud-client/quickstart.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2019 Google Inc. All Rights Reserved. +# Copyright 2016 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/pubsub/cloud-client/quickstart/sub.py b/pubsub/cloud-client/quickstart/sub.py index 520803d70a5b..e39f14105b1a 100644 --- a/pubsub/cloud-client/quickstart/sub.py +++ b/pubsub/cloud-client/quickstart/sub.py @@ -16,7 +16,6 @@ # [START pubsub_quickstart_sub_all] import argparse -import time # [START pubsub_quickstart_sub_deps] from google.cloud import pubsub_v1 # [END pubsub_quickstart_sub_deps] @@ -34,20 +33,22 @@ def sub(project_id, subscription_name): project_id, subscription_name) def callback(message): - print('Received message {} of message ID {}'.format( + print('Received message {} of message ID {}\n'.format( message, message.message_id)) # Acknowledge the message. Unack'ed messages will be redelivered. message.ack() - print('Acknowledged message of message ID {}\n'.format( - message.message_id)) + print('Acknowledged message {}\n'.format(message.message_id)) - client.subscribe(subscription_path, callback=callback) + streaming_pull_future = client.subscribe( + subscription_path, callback=callback) print('Listening for messages on {}..\n'.format(subscription_path)) - # Keep the main thread from exiting so the subscriber can - # process messages in the background. - while True: - time.sleep(60) + # Calling result() on StreamingPullFuture keeps the main thread from + # exiting while messages get processed in the callbacks. + try: + streaming_pull_future.result() + except: # noqa + streaming_pull_future.cancel() if __name__ == '__main__': diff --git a/pubsub/cloud-client/quickstart/sub_test.py b/pubsub/cloud-client/quickstart/sub_test.py index 9c70384ed693..476139a02642 100644 --- a/pubsub/cloud-client/quickstart/sub_test.py +++ b/pubsub/cloud-client/quickstart/sub_test.py @@ -14,10 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import mock import os import pytest -import time from google.api_core.exceptions import AlreadyExists from google.cloud import pubsub_v1 @@ -29,84 +27,79 @@ TOPIC = 'quickstart-sub-test-topic' SUBSCRIPTION = 'quickstart-sub-test-topic-sub' - -@pytest.fixture(scope='module') -def publisher_client(): - yield pubsub_v1.PublisherClient() +publisher_client = pubsub_v1.PublisherClient() +subscriber_client = pubsub_v1.SubscriberClient() @pytest.fixture(scope='module') -def topic_path(publisher_client): +def topic_path(): topic_path = publisher_client.topic_path(PROJECT, TOPIC) try: - publisher_client.create_topic(topic_path) + topic = publisher_client.create_topic(topic_path) + return topic.name except AlreadyExists: - pass - - yield topic_path - - -@pytest.fixture(scope='module') -def subscriber_client(): - yield pubsub_v1.SubscriberClient() + return topic_path @pytest.fixture(scope='module') -def subscription(subscriber_client, topic_path): +def subscription_path(topic_path): subscription_path = subscriber_client.subscription_path( PROJECT, SUBSCRIPTION) try: - subscriber_client.create_subscription(subscription_path, topic_path) + subscription = subscriber_client.create_subscription( + subscription_path, topic_path) + return subscription.name except AlreadyExists: - pass - - yield SUBSCRIPTION + return subscription_path -@pytest.fixture -def to_delete(publisher_client, subscriber_client): - doomed = [] - yield doomed - for client, item in doomed: +def _to_delete(resource_paths): + for item in resource_paths: if 'topics' in item: publisher_client.delete_topic(item) if 'subscriptions' in item: subscriber_client.delete_subscription(item) -def _make_sleep_patch(): - real_sleep = time.sleep +def _publish_messages(topic_path): + publish_future = publisher_client.publish(topic_path, data=b'Hello World!') + publish_future.result() + - def new_sleep(period): - if period == 60: - real_sleep(10) - raise RuntimeError('sigil') - else: - real_sleep(period) +def _sub_timeout(project_id, subscription_name): + # This is an exactly copy of `sub.py` except + # StreamingPullFuture.result() will time out after 10s. + client = pubsub_v1.SubscriberClient() + subscription_path = client.subscription_path( + project_id, subscription_name) - return mock.patch('time.sleep', new=new_sleep) + def callback(message): + print('Received message {} of message ID {}\n'.format( + message, message.message_id)) + message.ack() + print('Acknowledged message {}\n'.format(message.message_id)) + streaming_pull_future = client.subscribe( + subscription_path, callback=callback) + print('Listening for messages on {}..\n'.format(subscription_path)) + + try: + streaming_pull_future.result(timeout=10) + except: # noqa + streaming_pull_future.cancel() -def test_sub(publisher_client, - topic_path, - subscriber_client, - subscription, - to_delete, - capsys): - publisher_client.publish(topic_path, data=b'Hello, World!') +def test_sub(monkeypatch, topic_path, subscription_path, capsys): + monkeypatch.setattr(sub, 'sub', _sub_timeout) - to_delete.append((publisher_client, topic_path)) + _publish_messages(topic_path) - with _make_sleep_patch(): - with pytest.raises(RuntimeError, match='sigil'): - sub.sub(PROJECT, subscription) + sub.sub(PROJECT, SUBSCRIPTION) - to_delete.append((subscriber_client, - 'projects/{}/subscriptions/{}'.format(PROJECT, - SUBSCRIPTION))) + # Clean up resources. + _to_delete([topic_path, subscription_path]) out, _ = capsys.readouterr() assert "Received message" in out diff --git a/pubsub/cloud-client/quickstart_test.py b/pubsub/cloud-client/quickstart_test.py index 3fce09dc8f5b..d318b260c63c 100644 --- a/pubsub/cloud-client/quickstart_test.py +++ b/pubsub/cloud-client/quickstart_test.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2019 Google Inc. All Rights Reserved. +# Copyright 2016 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index dbaa396cddd5..3bbad0ead1b0 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2019 Google Inc. All Rights Reserved. +# Copyright 2016 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 1a5de9930fa5..4c5fd61223db 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -1,4 +1,4 @@ -# Copyright 2019 Google Inc. All Rights Reserved. +# Copyright 2016 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,9 +16,7 @@ import time from gcp_devrel.testing import eventually_consistent -from gcp_devrel.testing.flaky import flaky from google.cloud import pubsub_v1 -import google.api_core.exceptions import mock import pytest @@ -26,9 +24,9 @@ PROJECT = os.environ['GCLOUD_PROJECT'] TOPIC = 'subscription-test-topic' -SUBSCRIPTION = 'subscription-test-subscription' -SUBSCRIPTION_SYNC1 = 'subscription-test-subscription-sync1' -SUBSCRIPTION_SYNC2 = 'subscription-test-subscription-sync2' +SUBSCRIPTION_ONE = 'subscription-test-subscription-one' +SUBSCRIPTION_TWO = 'subscription-test-subscription-two' +SUBSCRIPTION_THREE = 'subscription-test-subscription-three' ENDPOINT = 'https://{}.appspot.com/push'.format(PROJECT) NEW_ENDPOINT = 'https://{}.appspot.com/push2'.format(PROJECT) @@ -43,13 +41,11 @@ def topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, TOPIC) try: - publisher_client.delete_topic(topic_path) - except Exception: - pass - - publisher_client.create_topic(topic_path) + response = publisher_client.get_topic(topic_path) + except: # noqa + response = publisher_client.create_topic(topic_path) - yield topic_path + yield response.name @pytest.fixture(scope='module') @@ -57,79 +53,74 @@ def subscriber_client(): yield pubsub_v1.SubscriberClient() -@pytest.fixture -def subscription(subscriber_client, topic): +@pytest.fixture(scope='module') +def subscription_one(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION) + PROJECT, SUBSCRIPTION_ONE) try: - subscriber_client.delete_subscription(subscription_path) - except Exception: - pass + response = subscriber_client.get_subscription(subscription_path) + except: # noqa + response = subscriber_client.create_subscription( + subscription_path, topic=topic) - try: - subscriber_client.create_subscription(subscription_path, topic=topic) - except google.api_core.exceptions.AlreadyExists: - pass + yield response.name - yield subscription_path - -@pytest.fixture -def subscription_sync1(subscriber_client, topic): - subscription_sync_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_SYNC1) +@pytest.fixture(scope='module') +def subscription_two(subscriber_client, topic): + subscription_path = subscriber_client.subscription_path( + PROJECT, SUBSCRIPTION_TWO) try: - subscriber_client.delete_subscription(subscription_sync_path) - except Exception: - pass - - subscriber_client.create_subscription(subscription_sync_path, topic=topic) + response = subscriber_client.get_subscription(subscription_path) + except: # noqa + response = subscriber_client.create_subscription( + subscription_path, topic=topic) - yield subscription_sync_path + yield response.name -@pytest.fixture -def subscription_sync2(subscriber_client, topic): - subscription_sync_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_SYNC2) +@pytest.fixture(scope='module') +def subscription_three(subscriber_client, topic): + subscription_path = subscriber_client.subscription_path( + PROJECT, SUBSCRIPTION_THREE) try: - subscriber_client.delete_subscription(subscription_sync_path) - except Exception: - pass - - subscriber_client.create_subscription(subscription_sync_path, topic=topic) + response = subscriber_client.get_subscription(subscription_path) + except: # noqa + response = subscriber_client.create_subscription( + subscription_path, topic=topic) - yield subscription_sync_path + yield response.name -def test_list_in_topic(subscription, capsys): +def test_list_in_topic(subscription_one, capsys): @eventually_consistent.call def _(): subscriber.list_subscriptions_in_topic(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert subscription in out + assert subscription_one in out -def test_list_in_project(subscription, capsys): +def test_list_in_project(subscription_one, capsys): @eventually_consistent.call def _(): subscriber.list_subscriptions_in_project(PROJECT) out, _ = capsys.readouterr() - assert subscription in out + assert subscription_one in out def test_create(subscriber_client): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION) + PROJECT, SUBSCRIPTION_ONE) + try: subscriber_client.delete_subscription(subscription_path) except Exception: pass - subscriber.create_subscription(PROJECT, TOPIC, SUBSCRIPTION) + subscriber.create_subscription(PROJECT, TOPIC, SUBSCRIPTION_ONE) @eventually_consistent.call def _(): @@ -138,40 +129,40 @@ def _(): def test_create_push(subscriber_client): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION) + PROJECT, SUBSCRIPTION_ONE) try: subscriber_client.delete_subscription(subscription_path) except Exception: pass - subscriber.create_push_subscription(PROJECT, TOPIC, SUBSCRIPTION, ENDPOINT) + subscriber.create_push_subscription( + PROJECT, TOPIC, SUBSCRIPTION_ONE, ENDPOINT) @eventually_consistent.call def _(): assert subscriber_client.get_subscription(subscription_path) -def test_delete(subscriber_client, subscription): - subscriber.delete_subscription(PROJECT, SUBSCRIPTION) +def test_update(subscriber_client, subscription_one, capsys): + subscriber.update_subscription(PROJECT, SUBSCRIPTION_ONE, NEW_ENDPOINT) - @eventually_consistent.call - def _(): - with pytest.raises(Exception): - subscriber_client.get_subscription(subscription) + out, _ = capsys.readouterr() + assert 'Subscription updated' in out -def test_update(subscriber_client, subscription, capsys): - subscriber.update_subscription(PROJECT, SUBSCRIPTION, NEW_ENDPOINT) +def test_delete(subscriber_client, subscription_one): + subscriber.delete_subscription(PROJECT, SUBSCRIPTION_ONE) - out, _ = capsys.readouterr() - assert 'Subscription updated' in out + @eventually_consistent.call + def _(): + with pytest.raises(Exception): + subscriber_client.get_subscription(subscription_one) def _publish_messages(publisher_client, topic): for n in range(5): data = u'Message {}'.format(n).encode('utf-8') - future = publisher_client.publish( - topic, data=data) + future = publisher_client.publish(topic, data=data) future.result() @@ -194,49 +185,28 @@ def new_sleep(period): return mock.patch('time.sleep', new=new_sleep) -@flaky -def test_receive(publisher_client, topic, subscription, capsys): +def test_receive(publisher_client, topic, subscription_two, capsys): _publish_messages(publisher_client, topic) with _make_sleep_patch(): with pytest.raises(RuntimeError, match='sigil'): - subscriber.receive_messages(PROJECT, SUBSCRIPTION) + subscriber.receive_messages(PROJECT, SUBSCRIPTION_TWO) out, _ = capsys.readouterr() assert 'Listening' in out - assert subscription in out + assert subscription_two in out assert 'Message 1' in out -def test_receive_synchronously( - publisher_client, topic, subscription_sync1, capsys): - _publish_messages(publisher_client, topic) - - subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_SYNC1) - - out, _ = capsys.readouterr() - assert 'Done.' in out - - -def test_receive_synchronously_with_lease( - publisher_client, topic, subscription_sync2, capsys): - _publish_messages(publisher_client, topic) - - subscriber.synchronous_pull_with_lease_management( - PROJECT, SUBSCRIPTION_SYNC2) - - out, _ = capsys.readouterr() - assert 'Done.' in out - - def test_receive_with_custom_attributes( - publisher_client, topic, subscription, capsys): + publisher_client, topic, subscription_two, capsys): + _publish_messages_with_custom_attributes(publisher_client, topic) with _make_sleep_patch(): with pytest.raises(RuntimeError, match='sigil'): subscriber.receive_messages_with_custom_attributes( - PROJECT, SUBSCRIPTION) + PROJECT, SUBSCRIPTION_TWO) out, _ = capsys.readouterr() assert 'Test message' in out @@ -245,15 +215,37 @@ def test_receive_with_custom_attributes( def test_receive_with_flow_control( - publisher_client, topic, subscription, capsys): + publisher_client, topic, subscription_two, capsys): + _publish_messages(publisher_client, topic) with _make_sleep_patch(): with pytest.raises(RuntimeError, match='sigil'): subscriber.receive_messages_with_flow_control( - PROJECT, SUBSCRIPTION) + PROJECT, SUBSCRIPTION_TWO) out, _ = capsys.readouterr() assert 'Listening' in out - assert subscription in out + assert subscription_two in out assert 'Message 1' in out + + +def test_receive_synchronously( + publisher_client, topic, subscription_three, capsys): + _publish_messages(publisher_client, topic) + + subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_THREE) + + out, _ = capsys.readouterr() + assert 'Done.' in out + + +def test_receive_synchronously_with_lease( + publisher_client, topic, subscription_three, capsys): + _publish_messages(publisher_client, topic) + + subscriber.synchronous_pull_with_lease_management( + PROJECT, SUBSCRIPTION_THREE) + + out, _ = capsys.readouterr() + assert 'Done.' in out