From c156e3d4dc00528c3b5695cc3919fa8e907f8143 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Tue, 8 Feb 2022 13:31:26 -0800 Subject: [PATCH 1/2] samples: create subscription with filtering enabled --- samples/snippets/subscriber.py | 47 +++++++++++++++++++++++++++-- samples/snippets/subscriber_test.py | 24 +++++++++++++++ 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index f44f82c4a..1bb60454d 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -190,7 +190,7 @@ def create_push_subscription( def create_subscription_with_ordering( project_id: str, topic_id: str, subscription_id: str ) -> None: - """Create a subscription with dead letter policy.""" + """Create a subscription with ordering enabled.""" # [START pubsub_enable_subscription_ordering] from google.cloud import pubsub_v1 @@ -216,6 +216,36 @@ def create_subscription_with_ordering( # [END pubsub_enable_subscription_ordering] +def create_subscription_with_filtering( + project_id: str, topic_id: str, subscription_id: str, filter: str, +) -> None: + """Create a subscription with filtering enabled.""" + # [START pubsub_create_subscription_with_filter] + from google.cloud import pubsub_v1 + + # TODO(developer): Choose an existing topic. + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # subscription_id = "your-subscription-id" + # filter = "attributes.author=\"unknown\"" + + publisher = pubsub_v1.PublisherClient() + subscriber = pubsub_v1.SubscriberClient() + topic_path = publisher.topic_path(project_id, topic_id) + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + with subscriber: + subscription = subscriber.create_subscription( + request={ + "name": subscription_path, + "topic": topic_path, + "filter": filter, + } + ) + print(f"Created subscription with filtering enabled: {subscription}") + # [END pubsub_create_subscription_with_filter] + + def delete_subscription(project_id: str, subscription_id: str) -> None: """Deletes an existing Pub/Sub topic.""" # [START pubsub_delete_subscription] @@ -785,6 +815,13 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: create_subscription_with_ordering_parser.add_argument("topic_id") create_subscription_with_ordering_parser.add_argument("subscription_id") + create_subscription_with_filtering_parser = subparsers.add_parser( + "create-with-filtering", help=create_subscription_with_filtering.__doc__ + ) + create_subscription_with_filtering_parser.add_argument("topic_id") + create_subscription_with_filtering_parser.add_argument("subscription_id") + create_subscription_with_filtering_parser.add_argument("filter") + delete_parser = subparsers.add_parser("delete", help=delete_subscription.__doc__) delete_parser.add_argument("subscription_id") @@ -888,17 +925,21 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: ) elif args.command == "create-push": create_push_subscription( - args.project_id, args.topic_id, args.subscription_id, args.endpoint, + args.project_id, args.topic_id, args.subscription_id, args.endpoint ) elif args.command == "create-with-ordering": create_subscription_with_ordering( args.project_id, args.topic_id, args.subscription_id ) + elif args.command == "create-with-filtering": + create_subscription_with_filtering( + args.project_id, args.topic_id, args.subscription_id, args.filter + ) elif args.command == "delete": delete_subscription(args.project_id, args.subscription_id) elif args.command == "update-push": update_push_subscription( - args.project_id, args.topic_id, args.subscription_id, args.endpoint, + args.project_id, args.topic_id, args.subscription_id, args.endpoint ) elif args.command == "update-dead-letter-policy": update_subscription_with_dead_letter_policy( diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index 6ad3da4fa..2699f698c 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -43,6 +43,7 @@ NEW_ENDPOINT = f"https://{PROJECT_ID}.appspot.com/push2" DEFAULT_MAX_DELIVERY_ATTEMPTS = 5 UPDATED_MAX_DELIVERY_ATTEMPTS = 20 +FILTER = "attributes.author=\"unknown\"" C = TypeVar("C", bound=Callable[..., Any]) @@ -385,6 +386,29 @@ def test_create_subscription_with_ordering( assert "enable_message_ordering: true" in out +def test_create_subscription_with_filtering( + subscriber_client: pubsub_v1.SubscriberClient, + subscription_admin: str, + capsys: CaptureFixture[str], +) -> None: + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, SUBSCRIPTION_ADMIN + ) + try: + subscriber_client.delete_subscription( + request={"subscription": subscription_path} + ) + except NotFound: + pass + + subscriber.create_subscription_with_filtering(PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN, FILTER) + + out, _ = capsys.readouterr() + assert "Created subscription with filtering enabled" in out + assert f"{subscription_admin}" in out + assert f"filter: {FILTER}" in out + + def test_create_push_subscription( subscriber_client: pubsub_v1.SubscriberClient, subscription_admin: str, From c5a34c90ddb24bc9194a34c135c05d28612860a7 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Tue, 8 Feb 2022 13:33:57 -0800 Subject: [PATCH 2/2] lint --- samples/snippets/subscriber.py | 8 ++------ samples/snippets/subscriber_test.py | 8 +++++--- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index 1bb60454d..dad841600 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -236,11 +236,7 @@ def create_subscription_with_filtering( with subscriber: subscription = subscriber.create_subscription( - request={ - "name": subscription_path, - "topic": topic_path, - "filter": filter, - } + request={"name": subscription_path, "topic": topic_path, "filter": filter} ) print(f"Created subscription with filtering enabled: {subscription}") # [END pubsub_create_subscription_with_filter] @@ -771,7 +767,7 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: # [END pubsub_dead_letter_delivery_attempt] -if __name__ == "__main__": +if __name__ == "__main__": # noqa parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index 2699f698c..9fcb1c119 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -43,7 +43,7 @@ NEW_ENDPOINT = f"https://{PROJECT_ID}.appspot.com/push2" DEFAULT_MAX_DELIVERY_ATTEMPTS = 5 UPDATED_MAX_DELIVERY_ATTEMPTS = 20 -FILTER = "attributes.author=\"unknown\"" +FILTER = 'attributes.author="unknown"' C = TypeVar("C", bound=Callable[..., Any]) @@ -401,12 +401,14 @@ def test_create_subscription_with_filtering( except NotFound: pass - subscriber.create_subscription_with_filtering(PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN, FILTER) + subscriber.create_subscription_with_filtering( + PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN, FILTER + ) out, _ = capsys.readouterr() assert "Created subscription with filtering enabled" in out assert f"{subscription_admin}" in out - assert f"filter: {FILTER}" in out + assert '"attributes.author=\\"unknown\\""' in out def test_create_push_subscription(