Skip to content

Commit

Permalink
Pub/Sub: remove infinite while loops in subscriber examples (#2604)
Browse files Browse the repository at this point in the history
* use result() on streaming pull futures instead of infinite while
* remove unused imports
  • Loading branch information
anguillanneuf authored and busunkim96 committed Dec 12, 2019
1 parent 2329466 commit d42f54c
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 174 deletions.
75 changes: 41 additions & 34 deletions pubsub/cloud-client/publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

UUID = uuid.uuid4().hex
PROJECT = os.environ["GCLOUD_PROJECT"]
TOPIC = "publisher-test-topic-" + UUID
TOPIC_ADMIN = "publisher-test-topic-admin-" + UUID
TOPIC_PUBLISH = "publisher-test-topic-publish-" + UUID


@pytest.fixture
Expand All @@ -34,15 +35,30 @@ def client():


@pytest.fixture
def topic(client):
topic_path = client.topic_path(PROJECT, TOPIC)
def topic_admin(client):
topic_path = client.topic_path(PROJECT, TOPIC_ADMIN)

try:
response = client.get_topic(topic_path)
topic = client.get_topic(topic_path)
except: # noqa
response = client.create_topic(topic_path)
topic = client.create_topic(topic_path)

yield response.name
yield topic.name
# Teardown of `topic_admin` is handled in `test_delete()`.


@pytest.fixture
def topic_publish(client):
topic_path = client.topic_path(PROJECT, TOPIC_PUBLISH)

try:
topic = client.get_topic(topic_path)
except: # noqa
topic = client.create_topic(topic_path)

yield topic.name

client.delete_topic(topic.name)


def _make_sleep_patch():
Expand All @@ -58,83 +74,74 @@ def new_sleep(period):
return mock.patch("time.sleep", new=new_sleep)


def _to_delete():
publisher_client = pubsub_v1.PublisherClient()
publisher_client.delete_topic(
"projects/{}/topics/{}".format(PROJECT, TOPIC)
)


def test_list(client, topic, capsys):
def test_list(client, topic_admin, capsys):
@eventually_consistent.call
def _():
publisher.list_topics(PROJECT)
out, _ = capsys.readouterr()
assert topic in out
assert topic_admin in out


def test_create(client):
topic_path = client.topic_path(PROJECT, TOPIC)
topic_path = client.topic_path(PROJECT, TOPIC_ADMIN)
try:
client.delete_topic(topic_path)
except Exception:
pass

publisher.create_topic(PROJECT, TOPIC)
publisher.create_topic(PROJECT, TOPIC_ADMIN)

@eventually_consistent.call
def _():
assert client.get_topic(topic_path)


def test_delete(client, topic):
publisher.delete_topic(PROJECT, TOPIC)
def test_delete(client, topic_admin):
publisher.delete_topic(PROJECT, TOPIC_ADMIN)

@eventually_consistent.call
def _():
with pytest.raises(Exception):
client.get_topic(client.topic_path(PROJECT, TOPIC))
client.get_topic(client.topic_path(PROJECT, TOPIC_ADMIN))


def test_publish(topic, capsys):
publisher.publish_messages(PROJECT, TOPIC)
def test_publish(topic_publish, capsys):
publisher.publish_messages(PROJECT, TOPIC_PUBLISH)

out, _ = capsys.readouterr()
assert "Published" in out


def test_publish_with_custom_attributes(topic, capsys):
publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC)
def test_publish_with_custom_attributes(topic_publish, capsys):
publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC_PUBLISH)

out, _ = capsys.readouterr()
assert "Published" in out


def test_publish_with_batch_settings(topic, capsys):
publisher.publish_messages_with_batch_settings(PROJECT, TOPIC)
def test_publish_with_batch_settings(topic_publish, capsys):
publisher.publish_messages_with_batch_settings(PROJECT, TOPIC_PUBLISH)

out, _ = capsys.readouterr()
assert "Published" in out


def test_publish_with_retry_settings(topic, capsys):
publisher.publish_messages_with_retry_settings(PROJECT, TOPIC)
def test_publish_with_retry_settings(topic_publish, capsys):
publisher.publish_messages_with_retry_settings(PROJECT, TOPIC_PUBLISH)

out, _ = capsys.readouterr()
assert "Published" in out


def test_publish_with_error_handler(topic, capsys):
publisher.publish_messages_with_error_handler(PROJECT, TOPIC)
def test_publish_with_error_handler(topic_publish, capsys):
publisher.publish_messages_with_error_handler(PROJECT, TOPIC_PUBLISH)

out, _ = capsys.readouterr()
assert "Published" in out


def test_publish_with_futures(topic, capsys):
publisher.publish_messages_with_futures(PROJECT, TOPIC)
def test_publish_with_futures(topic_publish, capsys):
publisher.publish_messages_with_futures(PROJECT, TOPIC_PUBLISH)

out, _ = capsys.readouterr()
assert "Published" in out

_to_delete()
98 changes: 60 additions & 38 deletions pubsub/cloud-client/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,16 @@ def update_subscription(project_id, subscription_name, endpoint):
# [END pubsub_update_push_configuration]


def receive_messages(project_id, subscription_name):
def receive_messages(project_id, subscription_name, timeout=None):
"""Receives messages from a pull subscription."""
# [START pubsub_subscriber_async_pull]
# [START pubsub_quickstart_subscriber]
import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"
# TODO timeout = 5.0 # "How long the subscriber should listen for
# messages in seconds"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
Expand All @@ -183,27 +183,33 @@ def callback(message):
print("Received message: {}".format(message))
message.ack()

subscriber.subscribe(subscription_path, callback=callback)
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback
)
print("Listening for messages on {}..\n".format(subscription_path))

# The subscriber is non-blocking. We must keep the main thread from
# exiting to allow it to process messages asynchronously in the background.
print("Listening for messages on {}".format(subscription_path))
while True:
time.sleep(60)
# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# [END pubsub_subscriber_async_pull]
# [END pubsub_quickstart_subscriber]


def receive_messages_with_custom_attributes(project_id, subscription_name):
def receive_messages_with_custom_attributes(
project_id, subscription_name, timeout=None
):
"""Receives messages from a pull subscription."""
# [START pubsub_subscriber_sync_pull_custom_attributes]
# [START pubsub_subscriber_async_pull_custom_attributes]
import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"
# TODO timeout = 5.0 # "How long the subscriber should listen for
# messages in seconds"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
Expand All @@ -219,26 +225,32 @@ def callback(message):
print("{}: {}".format(key, value))
message.ack()

subscriber.subscribe(subscription_path, callback=callback)
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback
)
print("Listening for messages on {}..\n".format(subscription_path))

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print("Listening for messages on {}".format(subscription_path))
while True:
time.sleep(60)
# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# [END pubsub_subscriber_async_pull_custom_attributes]
# [END pubsub_subscriber_sync_pull_custom_attributes]


def receive_messages_with_flow_control(project_id, subscription_name):
def receive_messages_with_flow_control(
project_id, subscription_name, timeout=None
):
"""Receives messages from a pull subscription with flow control."""
# [START pubsub_subscriber_flow_settings]
import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"
# TODO timeout = 5.0 # "How long the subscriber should listen for
# messages in seconds"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
Expand All @@ -251,15 +263,18 @@ def callback(message):

# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
subscriber.subscribe(

streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback, flow_control=flow_control
)
print("Listening for messages on {}..\n".format(subscription_path))

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print("Listening for messages on {}".format(subscription_path))
while True:
time.sleep(60)
# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# [END pubsub_subscriber_flow_settings]


Expand Down Expand Up @@ -386,13 +401,15 @@ def worker(msg):
# [END pubsub_subscriber_sync_pull_with_lease]


def listen_for_errors(project_id, subscription_name):
def listen_for_errors(project_id, subscription_name, timeout=None):
"""Receives messages and catches errors from a pull subscription."""
# [START pubsub_subscriber_error_listener]
from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pubsub subscription name"
# TODO timeout = 5.0 # "How long the subscriber should listen for
# messages in seconds"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
Expand All @@ -403,16 +420,19 @@ def callback(message):
print("Received message: {}".format(message))
message.ack()

future = subscriber.subscribe(subscription_path, callback=callback)
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback
)
print("Listening for messages on {}..\n".format(subscription_path))

# Blocks the thread while messages are coming in through the stream. Any
# exceptions that crop up on the thread will be set on the future.
# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
# When timeout is unspecified, the result method waits indefinitely.
future.result(timeout=30)
streaming_pull_future.result(timeout=timeout)
except Exception as e:
streaming_pull_future.cancel()
print(
"Listening for messages on {} threw an Exception: {}.".format(
"Listening for messages on {} threw an exception: {}.".format(
subscription_name, e
)
)
Expand Down Expand Up @@ -518,14 +538,14 @@ def callback(message):
args.project_id, args.subscription_name, args.endpoint
)
elif args.command == "receive":
receive_messages(args.project_id, args.subscription_name)
receive_messages(args.project_id, args.subscription_name, args.timeout)
elif args.command == "receive-custom-attributes":
receive_messages_with_custom_attributes(
args.project_id, args.subscription_name
args.project_id, args.subscription_name, args.timeout
)
elif args.command == "receive-flow-control":
receive_messages_with_flow_control(
args.project_id, args.subscription_name
args.project_id, args.subscription_name, args.timeout
)
elif args.command == "receive-synchronously":
synchronous_pull(args.project_id, args.subscription_name)
Expand All @@ -534,4 +554,6 @@ def callback(message):
args.project_id, args.subscription_name
)
elif args.command == "listen_for_errors":
listen_for_errors(args.project_id, args.subscription_name)
listen_for_errors(
args.project_id, args.subscription_name, args.timeout
)
Loading

0 comments on commit d42f54c

Please sign in to comment.