Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs(samples): Add code sample for optimistic subscribe #1182

Merged
merged 3 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,86 @@ def create_subscription(project_id: str, topic_id: str, subscription_id: str) ->
# [END pubsub_create_pull_subscription]


def optimistic_subscribe(
project_id: str,
topic_id: str,
subscription_id: str,
timeout: Optional[float] = None,
) -> None:
"""Optimistically subscribe to messages instead of making calls to verify existence
of a subscription first and then subscribing to messages from it. This avoids admin
operation calls to verify the existence of a subscription and reduces the probability
of running out of quota for admin operations."""
# [START pubsub_optimistic_subscribe]
from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
# topic_id = "your-topic-id"

# Create a subscriber client.
subscriber = pubsub_v1.SubscriberClient()

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
# Ack message after processing it.
message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# Optimistically subscribe to messages on the subscription.
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback
)
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
print("Successfully subscribed until the timeout passed.")
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
except NotFound:
print(f"Subscription {subscription_path} not found, creating it.")

try:
# If the subscription does not exist, then create it.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription = subscriber.create_subscription(
request={"name": subscription_path, "topic": topic_path}
)

if subscription:
print(f"Subscription {subscription.name} created")
else:
raise ValueError("Subscription creation failed.")

# Subscribe on the created subscription.
try:
streaming_pull_future = subscriber.subscribe(
subscription.name, callback=callback
)
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
except Exception as e:
print(
f"Exception occurred when creating subscription and subscribing to it: {e}"
)
except Exception as e:
print(f"Exception occurred when attempting optimistic subscribe: {e}")
# [END pubsub_optimistic_subscribe]


def create_subscription_with_dead_letter_topic(
project_id: str,
topic_id: str,
Expand Down Expand Up @@ -1161,6 +1241,15 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
remove_dead_letter_policy_parser.add_argument("topic_id")
remove_dead_letter_policy_parser.add_argument("subscription_id")

optimistic_subscribe_parser = subparsers.add_parser(
"optimistic-subscribe", help=optimistic_subscribe.__doc__
)
optimistic_subscribe_parser.add_argument("topic_id")
optimistic_subscribe_parser.add_argument("subscription_id")
optimistic_subscribe_parser.add_argument(
"timeout", default=None, type=float, nargs="?"
)

receive_parser = subparsers.add_parser("receive", help=receive_messages.__doc__)
receive_parser.add_argument("subscription_id")
receive_parser.add_argument("timeout", default=None, type=float, nargs="?")
Expand Down Expand Up @@ -1303,6 +1392,10 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
)
elif args.command == "remove-dead-letter-policy":
remove_dead_letter_policy(args.project_id, args.topic_id, args.subscription_id)
elif args.command == "optimistic-subscribe":
optimistic_subscribe(
args.project_id, args.topic_id, args.subscription_id, args.timeout
)
elif args.command == "receive":
receive_messages(args.project_id, args.subscription_id, args.timeout)
elif args.command == "receive-custom-attributes":
Expand Down
50 changes: 50 additions & 0 deletions samples/snippets/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,56 @@ def test_create_subscription(
subscriber_client.delete_subscription(request={"subscription": subscription_path})


def test_optimistic_subscribe(
subscriber_client: pubsub_v1.SubscriberClient,
topic: str,
publisher_client: pubsub_v1.PublisherClient,
capsys: CaptureFixture[str],
) -> None:
subscription_id = f"subscription_for_optimistic_subscribe-{PY_VERSION}-{UUID}"
subscription_path = subscriber_client.subscription_path(PROJECT_ID, subscription_id)
# Ensure there is no pre-existing subscription.
# So that we can test the case where optimistic subscribe fails.
try:
subscriber_client.delete_subscription(
request={"subscription": subscription_path}
)
except NotFound:
pass

# Invoke optimistic_subscribe when the subscription is not present.
# This tests scenario where optimistic subscribe fails.
subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, subscription_id, 5)
out, _ = capsys.readouterr()
# Verify optimistic subscription failed.
assert f"Subscription {subscription_path} not found, creating it." in out
# Verify that subscription created due to optimistic subscribe failure.
assert f"Subscription {subscription_path} created" in out
# Verify that subscription didn't already exist.
assert "Successfully subscribed until the timeout passed." not in out

# Invoke optimistic_subscribe when the subscription is present.
# This tests scenario where optimistic subscribe succeeds.
subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, subscription_id, 5)

out, _ = capsys.readouterr()
# Verify optimistic subscription succeeded.
assert f"Subscription {subscription_path} not found, creating it." not in out
# Verify that subscription was not created due to optimistic subscribe failure.
assert f"Subscription {subscription_path} created" not in out
# Verify that subscription already existed.
assert "Successfully subscribed until the timeout passed." in out

# Test case where optimistic subscribe throws an exception other than NotFound
# or TimeoutError.
subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, "123", 5)
out, _ = capsys.readouterr()
assert "Exception occurred when attempting optimistic subscribe:" in out

# Clean up resources created during test.
subscriber_client.delete_subscription(request={"subscription": subscription_path})


def test_create_subscription_with_dead_letter_policy(
subscriber_client: pubsub_v1.SubscriberClient,
dead_letter_topic: str,
Expand Down