Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pub/Sub: synchronous pull with lease management #1701

Merged
merged 5 commits into from
Sep 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pubsub/cloud-client/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
google-cloud-pubsub==0.37.2
google-cloud-pubsub==0.38.0
95 changes: 83 additions & 12 deletions pubsub/cloud-client/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@

import argparse
import time
import logging
import random
import multiprocessing

from google.cloud import pubsub_v1

Expand Down Expand Up @@ -215,7 +218,7 @@ def callback(message):
# [END pubsub_subscriber_flow_settings]


def receive_messages_synchronously(project, subscription_name):
def synchronous_pull(project, subscription_name):
"""Pulling messages synchronously."""
# [START pubsub_subscriber_sync_pull]
# project = "Your Google Cloud Project ID"
Expand All @@ -224,13 +227,10 @@ def receive_messages_synchronously(project, subscription_name):
subscription_path = subscriber.subscription_path(
project, subscription_name)

# Builds a pull request with a specific number of messages to return.
# `return_immediately` is set to False so that the system waits (for a
# bounded amount of time) until at lease one message is available.
response = subscriber.pull(
subscription_path,
max_messages=3,
return_immediately=False)
NUM_MESSAGES=3

# The subscriber pulls a specific number of messages.
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

ack_ids = []
for received_message in response.received_messages:
Expand All @@ -239,9 +239,72 @@ def receive_messages_synchronously(project, subscription_name):

# Acknowledges the received messages so they will not be sent again.
subscriber.acknowledge(subscription_path, ack_ids)

print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES))
# [END pubsub_subscriber_sync_pull]


def synchronous_pull_with_lease_management(project, subscription_name):
"""Pulling messages synchronously with lease management"""
# [START pubsub_subscriber_sync_pull_with_lease]
# project = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)

NUM_MESSAGES=2
ACK_DEADLINE=30
SLEEP_TIME=10

# The subscriber pulls a specific number of messages.
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)

def worker(msg):
"""Simulates a long-running process."""
RUN_TIME = random.randint(1,60)
logger.info('{}: Running {} for {}s'.format(
time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME))
time.sleep(RUN_TIME)

# `processes` stores process as key and ack id and message as values.
processes = dict()
for message in response.received_messages:
process = multiprocessing.Process(target=worker, args=(message,))
processes[process] = (message.ack_id, message.message.data)
process.start()

while processes:
for process, (ack_id, msg_data) in processes.items():
# If the process is still running, reset the ack deadline as
# specified by ACK_DEADLINE once every while as specified
# by SLEEP_TIME.
if process.is_alive():
# `ack_deadline_seconds` must be between 10 to 600.
subscriber.modify_ack_deadline(subscription_path,
[ack_id], ack_deadline_seconds=ACK_DEADLINE)
logger.info('{}: Reset ack deadline for {} for {}s'.format(
time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE))

# If the processs is finished, acknowledges using `ack_id`.
else:
subscriber.acknowledge(subscription_path, [ack_id])
logger.info("{}: Acknowledged {}".format(
time.strftime("%X", time.gmtime()), msg_data))
processes.pop(process)

# If there are still processes running, sleeps the thread.
if processes:
time.sleep(SLEEP_TIME)

print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES))
# [END pubsub_subscriber_sync_pull_with_lease]


def listen_for_errors(project, subscription_name):
"""Receives messages and catches errors from a pull subscription."""
# [START pubsub_subscriber_error_listener]
Expand Down Expand Up @@ -318,10 +381,15 @@ def callback(message):
help=receive_messages_with_flow_control.__doc__)
receive_with_flow_control_parser.add_argument('subscription_name')

receive_messages_synchronously_parser = subparsers.add_parser(
synchronous_pull_parser = subparsers.add_parser(
'receive-synchronously',
help=receive_messages_synchronously.__doc__)
receive_messages_synchronously_parser.add_argument('subscription_name')
help=synchronous_pull.__doc__)
synchronous_pull_parser.add_argument('subscription_name')

synchronous_pull_with_lease_management_parser = subparsers.add_parser(
'receive-synchronously-with-lease',
help=synchronous_pull_with_lease_management.__doc__)
synchronous_pull_with_lease_management_parser.add_argument('subscription_name')

listen_for_errors_parser = subparsers.add_parser(
'listen_for_errors', help=listen_for_errors.__doc__)
Expand Down Expand Up @@ -357,7 +425,10 @@ def callback(message):
receive_messages_with_flow_control(
args.project, args.subscription_name)
elif args.command == 'receive-synchronously':
receive_messages_synchronously(
synchronous_pull(
args.project, args.subscription_name)
elif args.command == 'receive-synchronously-with-lease':
synchronous_pull_with_lease_management(
args.project, args.subscription_name)
elif args.command == 'listen_for_errors':
listen_for_errors(args.project, args.subscription_name)
68 changes: 53 additions & 15 deletions pubsub/cloud-client/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
PROJECT = os.environ['GCLOUD_PROJECT']
TOPIC = 'subscription-test-topic'
SUBSCRIPTION = 'subscription-test-subscription'
SUBSCRIPTION_SYNC1 = 'subscription-test-subscription-sync1'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these are synchronous and shouldn't step over each other, can we use the same subscription for both synchronous tests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They sometimes did step on each other because the tests are run in parallel.

SUBSCRIPTION_SYNC2 = 'subscription-test-subscription-sync2'
ENDPOINT = 'https://{}.appspot.com/push'.format(PROJECT)


Expand Down Expand Up @@ -67,6 +69,36 @@ def subscription(subscriber_client, topic):
yield subscription_path


@pytest.fixture
def subscription_sync1(subscriber_client, topic):
subscription_sync_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION_SYNC1)

try:
subscriber_client.delete_subscription(subscription_sync_path)
except Exception:
pass

subscriber_client.create_subscription(subscription_sync_path, topic=topic)

yield subscription_sync_path


@pytest.fixture
def subscription_sync2(subscriber_client, topic):
subscription_sync_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION_SYNC2)

try:
subscriber_client.delete_subscription(subscription_sync_path)
except Exception:
pass

subscriber_client.create_subscription(subscription_sync_path, topic=topic)

yield subscription_sync_path


def test_list_in_topic(subscription, capsys):
@eventually_consistent.call
def _():
Expand Down Expand Up @@ -160,6 +192,27 @@ def test_receive(publisher_client, topic, subscription, capsys):
assert 'Message 1' in out


def test_receive_synchronously(
publisher_client, topic, subscription_sync1, capsys):
_publish_messages(publisher_client, topic)

subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_SYNC1)

out, _ = capsys.readouterr()
assert 'Done.' in out


def test_receive_synchronously_with_lease(
publisher_client, topic, subscription_sync2, capsys):
_publish_messages(publisher_client, topic)

subscriber.synchronous_pull_with_lease_management(
PROJECT, SUBSCRIPTION_SYNC2)

out, _ = capsys.readouterr()
assert 'Done.' in out


def test_receive_with_custom_attributes(
publisher_client, topic, subscription, capsys):
_publish_messages_with_custom_attributes(publisher_client, topic)
Expand Down Expand Up @@ -188,18 +241,3 @@ def test_receive_with_flow_control(
assert 'Listening' in out
assert subscription in out
assert 'Message 1' in out


def test_receive_synchronously(
publisher_client, topic, subscription, capsys):
_publish_messages(publisher_client, topic)

with _make_sleep_patch():
with pytest.raises(RuntimeError, match='sigil'):
subscriber.receive_messages_with_flow_control(
PROJECT, SUBSCRIPTION)

out, _ = capsys.readouterr()
assert 'Message 1' in out
assert 'Message 2' in out
assert 'Message 3' in out