Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pub/Sub: quickstart sub.py update #2475

Merged
merged 10 commits into from
Oct 21, 2019
Merged
19 changes: 10 additions & 9 deletions pubsub/cloud-client/quickstart/sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

# [START pubsub_quickstart_sub_all]
import argparse
import time
# [START pubsub_quickstart_sub_deps]
from google.cloud import pubsub_v1
# [END pubsub_quickstart_sub_deps]
Expand All @@ -34,20 +33,22 @@ def sub(project_id, subscription_name):
project_id, subscription_name)

def callback(message):
print('Received message {} of message ID {}'.format(
print('Received message {} of message ID {}\n'.format(
message, message.message_id))
# Acknowledge the message. Unack'ed messages will be redelivered.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(optional nit)

# Unacknowledged messages will be redelivered when a client subscribes.

I previously intuited that this client would just keep getting the same message if the message was not ack'd which is not the case. Only if this client reconnects will the message get redelivered to this client, which can happen if the loop exits because of an exception, etc...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comment Gus. That's something that I have never thought much of. I think Pub/Sub load-balances if there are multiple subscribers, but only if the load is large (like >10Mb). I remember trying with just three messages and they would keep getting delivered to the same subscriber client.

message.ack()
print('Acknowledged message of message ID {}\n'.format(
message.message_id))
print('Acknowledged message {}\n'.format(message.message_id))

client.subscribe(subscription_path, callback=callback)
streaming_pull_future = client.subscribe(
subscription_path, callback=callback)
print('Listening for messages on {}..\n'.format(subscription_path))

# Keep the main thread from exiting so the subscriber can
# process messages in the background.
while True:
time.sleep(60)
# Calling result() on StreamingPullFuture keeps the main thread from
# exiting while messages get processed in the callbacks.
try:
streaming_pull_future.result()
except: # noqa
streaming_pull_future.cancel()


if __name__ == '__main__':
Expand Down
91 changes: 42 additions & 49 deletions pubsub/cloud-client/quickstart/sub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import mock
import os
import pytest
import time

from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1
Expand All @@ -29,84 +27,79 @@
TOPIC = 'quickstart-sub-test-topic'
SUBSCRIPTION = 'quickstart-sub-test-topic-sub'


@pytest.fixture(scope='module')
def publisher_client():
yield pubsub_v1.PublisherClient()
publisher_client = pubsub_v1.PublisherClient()
subscriber_client = pubsub_v1.SubscriberClient()


@pytest.fixture(scope='module')
def topic_path(publisher_client):
def topic_path():
topic_path = publisher_client.topic_path(PROJECT, TOPIC)

try:
publisher_client.create_topic(topic_path)
topic = publisher_client.create_topic(topic_path)
return topic.name
except AlreadyExists:
pass

yield topic_path


@pytest.fixture(scope='module')
def subscriber_client():
yield pubsub_v1.SubscriberClient()
return topic_path


@pytest.fixture(scope='module')
def subscription(subscriber_client, topic_path):
def subscription_path(topic_path):
hongalex marked this conversation as resolved.
Show resolved Hide resolved
subscription_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION)

try:
subscriber_client.create_subscription(subscription_path, topic_path)
subscription = subscriber_client.create_subscription(
subscription_path, topic_path)
return subscription.name
except AlreadyExists:
pass

yield SUBSCRIPTION
return subscription_path


@pytest.fixture
def to_delete(publisher_client, subscriber_client):
doomed = []
yield doomed
for client, item in doomed:
def _to_delete(resource_paths):
for item in resource_paths:
if 'topics' in item:
publisher_client.delete_topic(item)
if 'subscriptions' in item:
subscriber_client.delete_subscription(item)


def _make_sleep_patch():
real_sleep = time.sleep
def _publish_messages(topic_path):
publish_future = publisher_client.publish(topic_path, data=b'Hello World!')
publish_future.result()


def new_sleep(period):
if period == 60:
real_sleep(10)
raise RuntimeError('sigil')
else:
real_sleep(period)
def _sub_timeout(project_id, subscription_name):
# This is an exactly copy of `sub.py` except
# StreamingPullFuture.result() will time out after 10s.
client = pubsub_v1.SubscriberClient()
subscription_path = client.subscription_path(
project_id, subscription_name)

return mock.patch('time.sleep', new=new_sleep)
def callback(message):
print('Received message {} of message ID {}\n'.format(
message, message.message_id))
message.ack()
print('Acknowledged message {}\n'.format(message.message_id))

streaming_pull_future = client.subscribe(
subscription_path, callback=callback)
print('Listening for messages on {}..\n'.format(subscription_path))

try:
streaming_pull_future.result(timeout=10)
except: # noqa
streaming_pull_future.cancel()

def test_sub(publisher_client,
topic_path,
subscriber_client,
subscription,
to_delete,
capsys):

publisher_client.publish(topic_path, data=b'Hello, World!')
def test_sub(monkeypatch, topic_path, subscription_path, capsys):
monkeypatch.setattr(sub, 'sub', _sub_timeout)

to_delete.append((publisher_client, topic_path))
_publish_messages(topic_path)

with _make_sleep_patch():
with pytest.raises(RuntimeError, match='sigil'):
sub.sub(PROJECT, subscription)
sub.sub(PROJECT, SUBSCRIPTION)

to_delete.append((subscriber_client,
'projects/{}/subscriptions/{}'.format(PROJECT,
SUBSCRIPTION)))
# Clean up resources.
_to_delete([topic_path, subscription_path])

out, _ = capsys.readouterr()
assert "Received message" in out
Expand Down
68 changes: 31 additions & 37 deletions pubsub/cloud-client/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
import time

from gcp_devrel.testing import eventually_consistent
from gcp_devrel.testing.flaky import flaky
from google.cloud import pubsub_v1
import google.api_core.exceptions
import mock
import pytest

Expand All @@ -43,68 +41,61 @@ def topic(publisher_client):
topic_path = publisher_client.topic_path(PROJECT, TOPIC)

try:
publisher_client.delete_topic(topic_path)
except Exception:
pass
response = publisher_client.get_topic(topic_path)
except: # noqa
response = publisher_client.create_topic(topic_path)

publisher_client.create_topic(topic_path)

yield topic_path
yield response.name


@pytest.fixture(scope='module')
def subscriber_client():
yield pubsub_v1.SubscriberClient()


@pytest.fixture
@pytest.fixture(scope='module')
def subscription(subscriber_client, topic):
subscription_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION)

try:
subscriber_client.delete_subscription(subscription_path)
except Exception:
pass
response = subscriber_client.get_subscription(subscription_path)
except: # noqa
response = subscriber_client.create_subscription(
subscription_path, topic=topic)

try:
subscriber_client.create_subscription(subscription_path, topic=topic)
except google.api_core.exceptions.AlreadyExists:
pass

yield subscription_path
yield response.name


@pytest.fixture
@pytest.fixture(scope='module')
def subscription_sync1(subscriber_client, topic):
subscription_sync_path = subscriber_client.subscription_path(
subscription_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION_SYNC1)

try:
subscriber_client.delete_subscription(subscription_sync_path)
except Exception:
pass

subscriber_client.create_subscription(subscription_sync_path, topic=topic)
response = subscriber_client.get_subscription(subscription_path)
except: # noqa
response = subscriber_client.create_subscription(
subscription_path, topic=topic)

yield subscription_sync_path
yield response.name


@pytest.fixture
@pytest.fixture(scope='module')
def subscription_sync2(subscriber_client, topic):
subscription_sync_path = subscriber_client.subscription_path(
subscription_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION_SYNC2)

try:
subscriber_client.delete_subscription(subscription_sync_path)
except Exception:
pass

subscriber_client.create_subscription(subscription_sync_path, topic=topic)
response = subscriber_client.get_subscription(subscription_path)
except: # noqa
response = subscriber_client.create_subscription(
subscription_path, topic=topic)

yield subscription_sync_path
yield response.name


@pytest.fixture(scope='module')
def test_list_in_topic(subscription, capsys):
@eventually_consistent.call
def _():
Expand All @@ -113,6 +104,7 @@ def _():
assert subscription in out


@pytest.fixture(scope='module')
def test_list_in_project(subscription, capsys):
@eventually_consistent.call
def _():
Expand All @@ -121,6 +113,7 @@ def _():
assert subscription in out


@pytest.fixture(scope='module')
def test_create(subscriber_client):
subscription_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION)
Expand All @@ -136,6 +129,7 @@ def _():
assert subscriber_client.get_subscription(subscription_path)


@pytest.fixture(scope='module')
def test_create_push(subscriber_client):
subscription_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION)
Expand All @@ -151,6 +145,7 @@ def _():
assert subscriber_client.get_subscription(subscription_path)


@pytest.fixture(scope='module')
def test_delete(subscriber_client, subscription):
subscriber.delete_subscription(PROJECT, SUBSCRIPTION)

Expand All @@ -160,6 +155,7 @@ def _():
subscriber_client.get_subscription(subscription)


@pytest.fixture(scope='module')
def test_update(subscriber_client, subscription, capsys):
subscriber.update_subscription(PROJECT, SUBSCRIPTION, NEW_ENDPOINT)

Expand All @@ -170,8 +166,7 @@ def test_update(subscriber_client, subscription, capsys):
def _publish_messages(publisher_client, topic):
for n in range(5):
data = u'Message {}'.format(n).encode('utf-8')
future = publisher_client.publish(
topic, data=data)
future = publisher_client.publish(topic, data=data)
future.result()


Expand All @@ -194,7 +189,6 @@ def new_sleep(period):
return mock.patch('time.sleep', new=new_sleep)


@flaky
def test_receive(publisher_client, topic, subscription, capsys):
_publish_messages(publisher_client, topic)

Expand Down