Skip to content

Commit

Permalink
Modified publisher with error handling (#1568)
Browse files Browse the repository at this point in the history
  • Loading branch information
anguillanneuf authored and chenyumic committed Jul 19, 2018
1 parent 7c53d7c commit 8179bda
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 18 deletions.
31 changes: 18 additions & 13 deletions pubsub/cloud-client/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"""

import argparse
import concurrent.futures
import time

from google.cloud import pubsub_v1

Expand Down Expand Up @@ -130,29 +130,27 @@ def publish_messages_with_error_handler(project, topic_name):
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)

# When you publish a message, the client returns a Future. This Future
# can be used to track if an error has occurred.
futures = []

def callback(f):
exc = f.exception()
if exc:
def callback(message_future):
if message_future.exception():
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, exc))
topic_name, message_future.exception()))
else:
print(message_future.result())

for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
# When you publish a message, the client returns a Future.
message_future = publisher.publish(topic_path, data=data)
message_future.add_done_callback(callback)
futures.append(message_future)

print('Published message IDs:')

# We must keep the main thread from exiting to allow it to process
# messages in the background.
concurrent.futures.wait(futures)

print('Published messages.')
while True:
time.sleep(60)
# [END pubsub_publish_messages_error_handler]


Expand Down Expand Up @@ -208,6 +206,11 @@ def publish_messages_with_batch_settings(project, topic_name):
help=publish_messages_with_futures.__doc__)
publish_with_futures_parser.add_argument('topic_name')

publish_with_error_handler_parser = subparsers.add_parser(
'publish-with-error-handler',
help=publish_messages_with_error_handler.__doc__)
publish_with_error_handler_parser.add_argument('topic_name')

publish_with_batch_settings_parser = subparsers.add_parser(
'publish-with-batch-settings',
help=publish_messages_with_batch_settings.__doc__)
Expand All @@ -227,5 +230,7 @@ def publish_messages_with_batch_settings(project, topic_name):
publish_messages_with_custom_attributes(args.project, args.topic_name)
elif args.command == 'publish-with-futures':
publish_messages_with_futures(args.project, args.topic_name)
elif args.command == 'publish-with-error-handler':
publish_messages_with_error_handler(args.project, args.topic_name)
elif args.command == 'publish-with-batch-settings':
publish_messages_with_batch_settings(args.project, args.topic_name)
7 changes: 7 additions & 0 deletions pubsub/cloud-client/publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ def test_publish_with_batch_settings(topic, capsys):
assert 'Published' in out


def test_publish_with_error_handler(topic, capsys):
publisher.publish_messages_with_error_handler(PROJECT, TOPIC)

out, _ = capsys.readouterr()
assert 'Published' in out


def test_publish_with_futures(topic, capsys):
publisher.publish_messages_with_futures(PROJECT, TOPIC)

Expand Down
5 changes: 0 additions & 5 deletions pubsub/cloud-client/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1 @@
<<<<<<< HEAD
google-cloud-pubsub==0.33.0
=======
google-cloud-pubsub==0.32.1
futures==3.1.1; python_version < '3'
>>>>>>> master

0 comments on commit 8179bda

Please sign in to comment.