diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index f44f82c4a..dad841600 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -190,7 +190,7 @@ def create_push_subscription( def create_subscription_with_ordering( project_id: str, topic_id: str, subscription_id: str ) -> None: - """Create a subscription with dead letter policy.""" + """Create a subscription with ordering enabled.""" # [START pubsub_enable_subscription_ordering] from google.cloud import pubsub_v1 @@ -216,6 +216,32 @@ def create_subscription_with_ordering( # [END pubsub_enable_subscription_ordering] +def create_subscription_with_filtering( + project_id: str, topic_id: str, subscription_id: str, filter: str, +) -> None: + """Create a subscription with filtering enabled.""" + # [START pubsub_create_subscription_with_filter] + from google.cloud import pubsub_v1 + + # TODO(developer): Choose an existing topic. + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # subscription_id = "your-subscription-id" + # filter = "attributes.author=\"unknown\"" + + publisher = pubsub_v1.PublisherClient() + subscriber = pubsub_v1.SubscriberClient() + topic_path = publisher.topic_path(project_id, topic_id) + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + with subscriber: + subscription = subscriber.create_subscription( + request={"name": subscription_path, "topic": topic_path, "filter": filter} + ) + print(f"Created subscription with filtering enabled: {subscription}") + # [END pubsub_create_subscription_with_filter] + + def delete_subscription(project_id: str, subscription_id: str) -> None: """Deletes an existing Pub/Sub topic.""" # [START pubsub_delete_subscription] @@ -741,7 +767,7 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: # [END pubsub_dead_letter_delivery_attempt] -if __name__ == "__main__": +if __name__ == "__main__": # noqa parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) @@ -785,6 +811,13 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: create_subscription_with_ordering_parser.add_argument("topic_id") create_subscription_with_ordering_parser.add_argument("subscription_id") + create_subscription_with_filtering_parser = subparsers.add_parser( + "create-with-filtering", help=create_subscription_with_filtering.__doc__ + ) + create_subscription_with_filtering_parser.add_argument("topic_id") + create_subscription_with_filtering_parser.add_argument("subscription_id") + create_subscription_with_filtering_parser.add_argument("filter") + delete_parser = subparsers.add_parser("delete", help=delete_subscription.__doc__) delete_parser.add_argument("subscription_id") @@ -888,17 +921,21 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: ) elif args.command == "create-push": create_push_subscription( - args.project_id, args.topic_id, args.subscription_id, args.endpoint, + args.project_id, args.topic_id, args.subscription_id, args.endpoint ) elif args.command == "create-with-ordering": create_subscription_with_ordering( args.project_id, args.topic_id, args.subscription_id ) + elif args.command == "create-with-filtering": + create_subscription_with_filtering( + args.project_id, args.topic_id, args.subscription_id, args.filter + ) elif args.command == "delete": delete_subscription(args.project_id, args.subscription_id) elif args.command == "update-push": update_push_subscription( - args.project_id, args.topic_id, args.subscription_id, args.endpoint, + args.project_id, args.topic_id, args.subscription_id, args.endpoint ) elif args.command == "update-dead-letter-policy": update_subscription_with_dead_letter_policy( diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index 6ad3da4fa..9fcb1c119 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -43,6 +43,7 @@ NEW_ENDPOINT = f"https://{PROJECT_ID}.appspot.com/push2" DEFAULT_MAX_DELIVERY_ATTEMPTS = 5 UPDATED_MAX_DELIVERY_ATTEMPTS = 20 +FILTER = 'attributes.author="unknown"' C = TypeVar("C", bound=Callable[..., Any]) @@ -385,6 +386,31 @@ def test_create_subscription_with_ordering( assert "enable_message_ordering: true" in out +def test_create_subscription_with_filtering( + subscriber_client: pubsub_v1.SubscriberClient, + subscription_admin: str, + capsys: CaptureFixture[str], +) -> None: + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, SUBSCRIPTION_ADMIN + ) + try: + subscriber_client.delete_subscription( + request={"subscription": subscription_path} + ) + except NotFound: + pass + + subscriber.create_subscription_with_filtering( + PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN, FILTER + ) + + out, _ = capsys.readouterr() + assert "Created subscription with filtering enabled" in out + assert f"{subscription_admin}" in out + assert '"attributes.author=\\"unknown\\""' in out + + def test_create_push_subscription( subscriber_client: pubsub_v1.SubscriberClient, subscription_admin: str,