From 103d1d832ca20551cddea23825371f7270464cd7 Mon Sep 17 00:00:00 2001 From: Mukund Ananthu Date: Tue, 4 Jun 2024 20:21:21 +0000 Subject: [PATCH 1/2] docs(samples): Add code sample for optimistic subscribe Related documentation: https://cloud.google.com/pubsub/docs/troubleshooting#using_excessive_administrative_operations --- samples/snippets/subscriber.py | 82 +++++++++++++++++++++++++++++ samples/snippets/subscriber_test.py | 54 +++++++++++++++++++ 2 files changed, 136 insertions(+) diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index ab0c8aafa..daeaf08db 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -94,6 +94,77 @@ def create_subscription(project_id: str, topic_id: str, subscription_id: str) -> # [END pubsub_create_pull_subscription] +def optimistic_subscribe( + project_id: str, topic_id: str, subscription_id: str, timeout: Optional[float] = None +) -> None: + """Optimistically subscribe to messages instead of making calls to verify existence + of a subscription first and then subscribing to messages from it. This avoids admin + operation calls to verify the existence of a subscription and reduces the probability + of running out of quota for admin operations.""" + # [START pubsub_optimistic_subscribe] + from google.api_core.exceptions import NotFound + from google.cloud import pubsub_v1 + from concurrent.futures import TimeoutError + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 + # topic_id = "your-topic-id" + + # Create a subscriber client. + subscriber = pubsub_v1.SubscriberClient() + + # The `subscription_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/subscriptions/{subscription_id}` + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + # Define callback to be called when a message is received. + def callback(message: pubsub_v1.subscriber.message.Message) -> None: + # Ack message after processing it. + message.ack() + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + try: + # Optimistically subscribe to messages on the subscription. + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + print("Successfully subscribed until the timeout passed.") + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. + except NotFound: + print(f"Subscription {subscription_path} not found, creating it.") + + try: + # If the subscription does not exist, then create it. + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + subscription = subscriber.create_subscription( + request={"name": subscription_path, "topic": topic_path} + ) + + if subscription: + print(f"Subscription {subscription.name} created") + else: + raise ValueError("Subscription creation failed.") + + # Subscribe on the created subscription. + try: + streaming_pull_future = subscriber.subscribe(subscription.name, callback=callback) + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. + except Exception as e: + print(f"Exception occurred when creating subscription and subscribing to it: {e}") + except Exception as e: + print(f"Exception occurred when attempting optimistic subscribe: {e}") + # [END pubsub_optimistic_subscribe] + + def create_subscription_with_dead_letter_topic( project_id: str, topic_id: str, @@ -1157,6 +1228,15 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: remove_dead_letter_policy_parser.add_argument("topic_id") remove_dead_letter_policy_parser.add_argument("subscription_id") + optimistic_subscribe_parser = subparsers.add_parser( + "optimistic-subscribe", help=optimistic_subscribe.__doc__ + ) + optimistic_subscribe_parser.add_argument("topic_id") + optimistic_subscribe_parser.add_argument("subscription_id") + optimistic_subscribe_parser.add_argument( + "timeout", default=None, type=float, nargs="?" + ) + receive_parser = subparsers.add_parser("receive", help=receive_messages.__doc__) receive_parser.add_argument("subscription_id") receive_parser.add_argument("timeout", default=None, type=float, nargs="?") @@ -1299,6 +1379,8 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: ) elif args.command == "remove-dead-letter-policy": remove_dead_letter_policy(args.project_id, args.topic_id, args.subscription_id) + elif args.command == "optimistic-subscribe": + optimistic_subscribe(args.project_id, args.topic_id, args.subscription_id, args.timeout) elif args.command == "receive": receive_messages(args.project_id, args.subscription_id, args.timeout) elif args.command == "receive-custom-attributes": diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index 53fefa109..3b549c836 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -234,6 +234,60 @@ def test_create_subscription( subscriber_client.delete_subscription(request={"subscription": subscription_path}) +def test_optimistic_subscribe( + subscriber_client: pubsub_v1.SubscriberClient, + topic: str, + publisher_client: pubsub_v1.PublisherClient, + capsys: CaptureFixture[str] +) -> None: + subscription_id = ( + f"subscription_for_optimistic_subscribe-{PY_VERSION}-{UUID}" + ) + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, subscription_id + ) + # Ensure there is no pre-existing subscription. + # So that we can test the case where optimistic subscribe fails. + try: + subscriber_client.delete_subscription( + request={"subscription": subscription_path} + ) + except NotFound: + pass + + # Invoke optimistic_subscribe when the subscription is not present. + # This tests scenario where optimistic subscribe fails. + subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, subscription_id, 5) + out, _ = capsys.readouterr() + # Verify optimistic subscription failed. + assert f"Subscription {subscription_path} not found, creating it." in out + # Verify that subscription created due to optimistic subscribe failure. + assert f"Subscription {subscription_path} created" in out + # Verify that subscription didn't already exist. + assert "Successfully subscribed until the timeout passed." not in out + + # Invoke optimistic_subscribe when the subscription is present. + # This tests scenario where optimistic subscribe succeeds. + subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, subscription_id, 5) + + out, _ = capsys.readouterr() + # Verify optimistic subscription succeeded. + assert f"Subscription {subscription_path} not found, creating it." not in out + # Verify that subscription was not created due to optimistic subscribe failure. + assert f"Subscription {subscription_path} created" not in out + # Verify that subscription already existed. + assert "Successfully subscribed until the timeout passed." in out + + # Test case where optimistic subscribe throws an exception other than NotFound + # or TimeoutError. + subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, "123", 5) + out, _ = capsys.readouterr() + assert "Exception occurred when attempting optimistic subscribe:" in out + + # Clean up resources created during test. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) + + def test_create_subscription_with_dead_letter_policy( subscriber_client: pubsub_v1.SubscriberClient, dead_letter_topic: str, From 5682a8b468f0b63b83d1f31840a453e155bea40b Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Mon, 17 Jun 2024 19:57:58 +0000 Subject: [PATCH 2/2] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- samples/snippets/subscriber.py | 21 ++++++++++++++++----- samples/snippets/subscriber_test.py | 10 +++------- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index daeaf08db..45a3019b9 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -95,7 +95,10 @@ def create_subscription(project_id: str, topic_id: str, subscription_id: str) -> def optimistic_subscribe( - project_id: str, topic_id: str, subscription_id: str, timeout: Optional[float] = None + project_id: str, + topic_id: str, + subscription_id: str, + timeout: Optional[float] = None, ) -> None: """Optimistically subscribe to messages instead of making calls to verify existence of a subscription first and then subscribing to messages from it. This avoids admin @@ -129,7 +132,9 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: with subscriber: try: # Optimistically subscribe to messages on the subscription. - streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + streaming_pull_future = subscriber.subscribe( + subscription_path, callback=callback + ) streaming_pull_future.result(timeout=timeout) except TimeoutError: print("Successfully subscribed until the timeout passed.") @@ -153,13 +158,17 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: # Subscribe on the created subscription. try: - streaming_pull_future = subscriber.subscribe(subscription.name, callback=callback) + streaming_pull_future = subscriber.subscribe( + subscription.name, callback=callback + ) streaming_pull_future.result(timeout=timeout) except TimeoutError: streaming_pull_future.cancel() # Trigger the shutdown. streaming_pull_future.result() # Block until the shutdown is complete. except Exception as e: - print(f"Exception occurred when creating subscription and subscribing to it: {e}") + print( + f"Exception occurred when creating subscription and subscribing to it: {e}" + ) except Exception as e: print(f"Exception occurred when attempting optimistic subscribe: {e}") # [END pubsub_optimistic_subscribe] @@ -1380,7 +1389,9 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: elif args.command == "remove-dead-letter-policy": remove_dead_letter_policy(args.project_id, args.topic_id, args.subscription_id) elif args.command == "optimistic-subscribe": - optimistic_subscribe(args.project_id, args.topic_id, args.subscription_id, args.timeout) + optimistic_subscribe( + args.project_id, args.topic_id, args.subscription_id, args.timeout + ) elif args.command == "receive": receive_messages(args.project_id, args.subscription_id, args.timeout) elif args.command == "receive-custom-attributes": diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index 3b549c836..86f7a94ce 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -238,14 +238,10 @@ def test_optimistic_subscribe( subscriber_client: pubsub_v1.SubscriberClient, topic: str, publisher_client: pubsub_v1.PublisherClient, - capsys: CaptureFixture[str] + capsys: CaptureFixture[str], ) -> None: - subscription_id = ( - f"subscription_for_optimistic_subscribe-{PY_VERSION}-{UUID}" - ) - subscription_path = subscriber_client.subscription_path( - PROJECT_ID, subscription_id - ) + subscription_id = f"subscription_for_optimistic_subscribe-{PY_VERSION}-{UUID}" + subscription_path = subscriber_client.subscription_path(PROJECT_ID, subscription_id) # Ensure there is no pre-existing subscription. # So that we can test the case where optimistic subscribe fails. try: