Skip to content

Commit

Permalink
Pub/Sub end-to-end sample [(#1800)](GoogleCloudPlatform/python-docs-s…
Browse files Browse the repository at this point in the history
…amples#1800)

* Created new end-to-end sample, moved old sample

* Add space around operator
  • Loading branch information
anguillanneuf authored and plamut committed Jul 10, 2020
1 parent 7bd5efa commit c173295
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 28 deletions.
2 changes: 2 additions & 0 deletions samples/snippets/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def list_topics(project_id):

def create_topic(project_id, topic_name):
"""Create a new Pub/Sub topic."""
# [START pubsub_quickstart_create_topic]
# [START pubsub_create_topic]
from google.cloud import pubsub_v1

Expand All @@ -53,6 +54,7 @@ def create_topic(project_id, topic_name):
topic = publisher.create_topic(topic_path)

print('Topic created: {}'.format(topic))
# [END pubsub_quickstart_create_topic]
# [END pubsub_create_topic]


Expand Down
91 changes: 80 additions & 11 deletions samples/snippets/quickstart.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,94 @@
# limitations under the License.


def run_quickstart():
# [START pubsub_quickstart_create_topic]
# Imports the Google Cloud client library
import argparse


def end_to_end(project_id, topic_name, subscription_name, num_messages):
# [START pubsub_end_to_end]
import time

from google.cloud import pubsub_v1

# Instantiates a client
# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"
# TODO num_messages = number of messages to test end-to-end

# Instantiates a publisher and subscriber client
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_name}`
topic_path = subscriber.topic_path(project_id, topic_name)

# The resource path for the new topic contains the project ID
# and the topic name.
topic_path = publisher.topic_path(
'my-project', 'my-new-topic')
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
project_id, subscription_name)

# Create the topic.
topic = publisher.create_topic(topic_path)
print('\nTopic created: {}'.format(topic.name))

# Create a subscription.
subscription = subscriber.create_subscription(
subscription_path, topic_path)
print('\nSubscription created: {}\n'.format(subscription.name))

publish_begin = time.time()

# Publish messages.
for n in range(num_messages):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data=data)
print('Published {} of message ID {}.'.format(data, future.result()))

publish_time = time.time() - publish_begin

print('Topic created: {}'.format(topic))
# [END pubsub_quickstart_create_topic]
messages = set()

def callback(message):
print('Received message: {}'.format(message))
# Unacknowledged messages will be sent again.
message.ack()
messages.add(message)

subscribe_begin = time.time()

# Receive messages. The subscriber is nonblocking.
subscriber.subscribe(subscription_path, callback=callback)

print('\nListening for messages on {}...\n'.format(subscription_path))

while True:
if len(messages) == num_messages:
subscribe_time = time.time() - subscribe_begin
print("\nReceived all messages.")
print("Publish time lapsed: {:.2f}s.".format(publish_time))
print("Subscribe time lapsed: {:.2f}s.".format(subscribe_time))
break
else:
# Sleeps the thread at 50Hz to save on resources.
time.sleep(1. / 50)
# [END pubsub_end_to_end]


if __name__ == '__main__':
run_quickstart()

parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('project_id', help='Your Google Cloud project ID')
parser.add_argument('topic_name', help='Your topic name')
parser.add_argument('subscription_name', help='Your subscription name')
parser.add_argument('num_msgs', type=int, help='Number of test messages')

args = parser.parse_args()

end_to_end(args.project_id, args.topic_name, args.subscription_name,
args.num_msgs)
58 changes: 41 additions & 17 deletions samples/snippets/quickstart_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Copyright 2016 Google Inc. All Rights Reserved.
#!/usr/bin/env python

# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -15,33 +17,55 @@
import os

from google.cloud import pubsub_v1
import mock
import pytest

import quickstart

PROJECT = os.environ['GCLOUD_PROJECT']
# Must match the dataset listed in quickstart.py
TOPIC_NAME = 'my-new-topic'
TOPIC_PATH = 'projects/{}/topics/{}'.format(PROJECT, TOPIC_NAME)
TOPIC = 'end-to-end-test-topic'
SUBSCRIPTION = 'end-to-end-test-topic-sub'
N = 10


@pytest.fixture(scope='module')
def publisher_client():
yield pubsub_v1.PublisherClient()


@pytest.fixture
def temporary_topic():
"""Fixture that ensures the test topic does not exist before the test."""
publisher = pubsub_v1.PublisherClient()
@pytest.fixture(scope='module')
def topic(publisher_client):
topic_path = publisher_client.topic_path(PROJECT, TOPIC)

try:
publisher.delete_topic(TOPIC_PATH)
publisher_client.delete_topic(topic_path)
except Exception:
pass

yield
yield TOPIC


@mock.patch.object(
pubsub_v1.PublisherClient, 'topic_path', return_value=TOPIC_PATH)
def test_quickstart(unused_topic_path, temporary_topic, capsys):
quickstart.run_quickstart()
@pytest.fixture(scope='module')
def subscriber_client():
yield pubsub_v1.SubscriberClient()


@pytest.fixture(scope='module')
def subscription(subscriber_client, topic):
subscription_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION)

try:
subscriber_client.delete_subscription(subscription_path)
except Exception:
pass

yield SUBSCRIPTION


def test_end_to_end(topic, subscription, capsys):

quickstart.end_to_end(PROJECT, topic, subscription, N)
out, _ = capsys.readouterr()
assert TOPIC_NAME in out

assert "Received all messages" in out
assert "Publish time lapsed" in out
assert "Subscribe time lapsed" in out

0 comments on commit c173295

Please sign in to comment.