From 6fa03be779d6a7105bb7c029b95d4c357d2a49df Mon Sep 17 00:00:00 2001 From: "gcf-owl-bot[bot]" <78513119+gcf-owl-bot[bot]@users.noreply.github.com> Date: Thu, 19 May 2022 08:13:35 -0400 Subject: [PATCH] feat: add BigQuery configuration for subscriptions (#685) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add BigQuery configuration for subscriptions PiperOrigin-RevId: 449031535 Source-Link: https://github.com/googleapis/googleapis/commit/feec34dfac930eb0ab8c3e72ff5794c3f4c5924d Source-Link: https://github.com/googleapis/googleapis-gen/commit/89664e9708c19d532c63f7a16fd79cb631d87aa1 Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiODk2NjRlOTcwOGMxOWQ1MzJjNjNmN2ExNmZkNzljYjYzMWQ4N2FhMSJ9 * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot --- google/pubsub/__init__.py | 2 + google/pubsub_v1/__init__.py | 2 + .../services/subscriber/async_client.py | 5 +- .../pubsub_v1/services/subscriber/client.py | 5 +- google/pubsub_v1/types/__init__.py | 2 + google/pubsub_v1/types/pubsub.py | 95 ++++++++++++++++++- scripts/fixup_pubsub_v1_keywords.py | 2 +- tests/unit/gapic/pubsub_v1/test_subscriber.py | 12 +++ 8 files changed, 117 insertions(+), 8 deletions(-) diff --git a/google/pubsub/__init__.py b/google/pubsub/__init__.py index 9fc4e6feb..7c94c2308 100644 --- a/google/pubsub/__init__.py +++ b/google/pubsub/__init__.py @@ -24,6 +24,7 @@ from google.pubsub_v1.services.subscriber.async_client import SubscriberAsyncClient from google.pubsub_v1.types.pubsub import AcknowledgeRequest +from google.pubsub_v1.types.pubsub import BigQueryConfig from google.pubsub_v1.types.pubsub import CreateSnapshotRequest from google.pubsub_v1.types.pubsub import DeadLetterPolicy from google.pubsub_v1.types.pubsub import DeleteSnapshotRequest @@ -88,6 +89,7 @@ "SubscriberClient", "SubscriberAsyncClient", "AcknowledgeRequest", + "BigQueryConfig", "CreateSnapshotRequest", "DeadLetterPolicy", "DeleteSnapshotRequest", diff --git a/google/pubsub_v1/__init__.py b/google/pubsub_v1/__init__.py index 75c41bf21..80fc23d59 100644 --- a/google/pubsub_v1/__init__.py +++ b/google/pubsub_v1/__init__.py @@ -22,6 +22,7 @@ from .services.subscriber import SubscriberAsyncClient from .types.pubsub import AcknowledgeRequest +from .types.pubsub import BigQueryConfig from .types.pubsub import CreateSnapshotRequest from .types.pubsub import DeadLetterPolicy from .types.pubsub import DeleteSnapshotRequest @@ -83,6 +84,7 @@ "SchemaServiceAsyncClient", "SubscriberAsyncClient", "AcknowledgeRequest", + "BigQueryConfig", "CreateSchemaRequest", "CreateSnapshotRequest", "DeadLetterPolicy", diff --git a/google/pubsub_v1/services/subscriber/async_client.py b/google/pubsub_v1/services/subscriber/async_client.py index acfc54a12..fa88cc649 100644 --- a/google/pubsub_v1/services/subscriber/async_client.py +++ b/google/pubsub_v1/services/subscriber/async_client.py @@ -295,8 +295,9 @@ async def sample_create_subscription(): should not be set. push_config (:class:`google.pubsub_v1.types.PushConfig`): If push delivery is used with this subscription, this - field is used to configure it. An empty ``pushConfig`` - signifies that the subscriber will pull and ack messages + field is used to configure it. Either ``pushConfig`` or + ``bigQueryConfig`` can be set, but not both. If both are + empty, then the subscriber will pull and ack messages using API methods. This corresponds to the ``push_config`` field diff --git a/google/pubsub_v1/services/subscriber/client.py b/google/pubsub_v1/services/subscriber/client.py index f635b3295..c0af49577 100644 --- a/google/pubsub_v1/services/subscriber/client.py +++ b/google/pubsub_v1/services/subscriber/client.py @@ -578,8 +578,9 @@ def sample_create_subscription(): should not be set. push_config (google.pubsub_v1.types.PushConfig): If push delivery is used with this subscription, this - field is used to configure it. An empty ``pushConfig`` - signifies that the subscriber will pull and ack messages + field is used to configure it. Either ``pushConfig`` or + ``bigQueryConfig`` can be set, but not both. If both are + empty, then the subscriber will pull and ack messages using API methods. This corresponds to the ``push_config`` field diff --git a/google/pubsub_v1/types/__init__.py b/google/pubsub_v1/types/__init__.py index 888e2184a..c0d9c4619 100644 --- a/google/pubsub_v1/types/__init__.py +++ b/google/pubsub_v1/types/__init__.py @@ -17,6 +17,7 @@ from .pubsub import ( AcknowledgeRequest, + BigQueryConfig, CreateSnapshotRequest, DeadLetterPolicy, DeleteSnapshotRequest, @@ -87,6 +88,7 @@ __all__ = ( "TimeoutType", "AcknowledgeRequest", + "BigQueryConfig", "CreateSnapshotRequest", "DeadLetterPolicy", "DeleteSnapshotRequest", diff --git a/google/pubsub_v1/types/pubsub.py b/google/pubsub_v1/types/pubsub.py index 4c5841f5c..73658e9c0 100644 --- a/google/pubsub_v1/types/pubsub.py +++ b/google/pubsub_v1/types/pubsub.py @@ -46,6 +46,7 @@ "DeadLetterPolicy", "ExpirationPolicy", "PushConfig", + "BigQueryConfig", "ReceivedMessage", "GetSubscriptionRequest", "UpdateSubscriptionRequest", @@ -581,9 +582,16 @@ class Subscription(proto.Message): deleted. push_config (google.pubsub_v1.types.PushConfig): If push delivery is used with this subscription, this field - is used to configure it. An empty ``pushConfig`` signifies - that the subscriber will pull and ack messages using API - methods. + is used to configure it. Either ``pushConfig`` or + ``bigQueryConfig`` can be set, but not both. If both are + empty, then the subscriber will pull and ack messages using + API methods. + bigquery_config (google.pubsub_v1.types.BigQueryConfig): + If delivery to BigQuery is used with this subscription, this + field is used to configure it. Either ``pushConfig`` or + ``bigQueryConfig`` can be set, but not both. If both are + empty, then the subscriber will pull and ack messages using + API methods. ack_deadline_seconds (int): The approximate amount of time (on a best-effort basis) Pub/Sub waits for the subscriber to acknowledge receipt @@ -700,8 +708,18 @@ class Subscription(proto.Message): subscribers. See the ``message_retention_duration`` field in ``Topic``. This field is set only in responses from the server; it is ignored if it is set in any requests. + state (google.pubsub_v1.types.Subscription.State): + Output only. An output-only field indicating + whether or not the subscription can receive + messages. """ + class State(proto.Enum): + r"""Possible states for a subscription.""" + STATE_UNSPECIFIED = 0 + ACTIVE = 1 + RESOURCE_ERROR = 2 + name = proto.Field( proto.STRING, number=1, @@ -715,6 +733,11 @@ class Subscription(proto.Message): number=4, message="PushConfig", ) + bigquery_config = proto.Field( + proto.MESSAGE, + number=18, + message="BigQueryConfig", + ) ack_deadline_seconds = proto.Field( proto.INT32, number=5, @@ -769,6 +792,11 @@ class Subscription(proto.Message): number=17, message=duration_pb2.Duration, ) + state = proto.Field( + proto.ENUM, + number=19, + enum=State, + ) class RetryPolicy(proto.Message): @@ -980,6 +1008,67 @@ class OidcToken(proto.Message): ) +class BigQueryConfig(proto.Message): + r"""Configuration for a BigQuery subscription. + + Attributes: + table (str): + The name of the table to which to write data, + of the form {projectId}:{datasetId}.{tableId} + use_topic_schema (bool): + When true, use the topic's schema as the + columns to write to in BigQuery, if it exists. + write_metadata (bool): + When true, write the subscription name, message_id, + publish_time, attributes, and ordering_key to additional + columns in the table. The subscription name, message_id, and + publish_time fields are put in their own columns while all + other message properties (other than data) are written to a + JSON object in the attributes column. + drop_unknown_fields (bool): + When true and use_topic_schema is true, any fields that are + a part of the topic schema that are not part of the BigQuery + table schema are dropped when writing to BigQuery. + Otherwise, the schemas must be kept in sync and any messages + with extra fields are not written and remain in the + subscription's backlog. + state (google.pubsub_v1.types.BigQueryConfig.State): + Output only. An output-only field that + indicates whether or not the subscription can + receive messages. + """ + + class State(proto.Enum): + r"""Possible states for a BigQuery subscription.""" + STATE_UNSPECIFIED = 0 + ACTIVE = 1 + PERMISSION_DENIED = 2 + NOT_FOUND = 3 + SCHEMA_MISMATCH = 4 + + table = proto.Field( + proto.STRING, + number=1, + ) + use_topic_schema = proto.Field( + proto.BOOL, + number=2, + ) + write_metadata = proto.Field( + proto.BOOL, + number=3, + ) + drop_unknown_fields = proto.Field( + proto.BOOL, + number=4, + ) + state = proto.Field( + proto.ENUM, + number=5, + enum=State, + ) + + class ReceivedMessage(proto.Message): r"""A message and its corresponding acknowledgment ID. diff --git a/scripts/fixup_pubsub_v1_keywords.py b/scripts/fixup_pubsub_v1_keywords.py index 3b6d3d378..d1bbcedf9 100644 --- a/scripts/fixup_pubsub_v1_keywords.py +++ b/scripts/fixup_pubsub_v1_keywords.py @@ -42,7 +42,7 @@ class pubsubCallTransformer(cst.CSTTransformer): 'acknowledge': ('subscription', 'ack_ids', ), 'create_schema': ('parent', 'schema', 'schema_id', ), 'create_snapshot': ('name', 'subscription', 'labels', ), - 'create_subscription': ('name', 'topic', 'push_config', 'ack_deadline_seconds', 'retain_acked_messages', 'message_retention_duration', 'labels', 'enable_message_ordering', 'expiration_policy', 'filter', 'dead_letter_policy', 'retry_policy', 'detached', 'enable_exactly_once_delivery', 'topic_message_retention_duration', ), + 'create_subscription': ('name', 'topic', 'push_config', 'bigquery_config', 'ack_deadline_seconds', 'retain_acked_messages', 'message_retention_duration', 'labels', 'enable_message_ordering', 'expiration_policy', 'filter', 'dead_letter_policy', 'retry_policy', 'detached', 'enable_exactly_once_delivery', 'topic_message_retention_duration', 'state', ), 'create_topic': ('name', 'labels', 'message_storage_policy', 'kms_key_name', 'schema_settings', 'satisfies_pzs', 'message_retention_duration', ), 'delete_schema': ('name', ), 'delete_snapshot': ('snapshot', ), diff --git a/tests/unit/gapic/pubsub_v1/test_subscriber.py b/tests/unit/gapic/pubsub_v1/test_subscriber.py index cbccac582..b4f9b3882 100644 --- a/tests/unit/gapic/pubsub_v1/test_subscriber.py +++ b/tests/unit/gapic/pubsub_v1/test_subscriber.py @@ -665,6 +665,7 @@ def test_create_subscription(request_type, transport: str = "grpc"): filter="filter_value", detached=True, enable_exactly_once_delivery=True, + state=pubsub.Subscription.State.ACTIVE, ) response = client.create_subscription(request) @@ -683,6 +684,7 @@ def test_create_subscription(request_type, transport: str = "grpc"): assert response.filter == "filter_value" assert response.detached is True assert response.enable_exactly_once_delivery is True + assert response.state == pubsub.Subscription.State.ACTIVE def test_create_subscription_empty_call(): @@ -731,6 +733,7 @@ async def test_create_subscription_async( filter="filter_value", detached=True, enable_exactly_once_delivery=True, + state=pubsub.Subscription.State.ACTIVE, ) ) response = await client.create_subscription(request) @@ -750,6 +753,7 @@ async def test_create_subscription_async( assert response.filter == "filter_value" assert response.detached is True assert response.enable_exactly_once_delivery is True + assert response.state == pubsub.Subscription.State.ACTIVE @pytest.mark.asyncio @@ -963,6 +967,7 @@ def test_get_subscription(request_type, transport: str = "grpc"): filter="filter_value", detached=True, enable_exactly_once_delivery=True, + state=pubsub.Subscription.State.ACTIVE, ) response = client.get_subscription(request) @@ -981,6 +986,7 @@ def test_get_subscription(request_type, transport: str = "grpc"): assert response.filter == "filter_value" assert response.detached is True assert response.enable_exactly_once_delivery is True + assert response.state == pubsub.Subscription.State.ACTIVE def test_get_subscription_empty_call(): @@ -1025,6 +1031,7 @@ async def test_get_subscription_async( filter="filter_value", detached=True, enable_exactly_once_delivery=True, + state=pubsub.Subscription.State.ACTIVE, ) ) response = await client.get_subscription(request) @@ -1044,6 +1051,7 @@ async def test_get_subscription_async( assert response.filter == "filter_value" assert response.detached is True assert response.enable_exactly_once_delivery is True + assert response.state == pubsub.Subscription.State.ACTIVE @pytest.mark.asyncio @@ -1221,6 +1229,7 @@ def test_update_subscription(request_type, transport: str = "grpc"): filter="filter_value", detached=True, enable_exactly_once_delivery=True, + state=pubsub.Subscription.State.ACTIVE, ) response = client.update_subscription(request) @@ -1239,6 +1248,7 @@ def test_update_subscription(request_type, transport: str = "grpc"): assert response.filter == "filter_value" assert response.detached is True assert response.enable_exactly_once_delivery is True + assert response.state == pubsub.Subscription.State.ACTIVE def test_update_subscription_empty_call(): @@ -1287,6 +1297,7 @@ async def test_update_subscription_async( filter="filter_value", detached=True, enable_exactly_once_delivery=True, + state=pubsub.Subscription.State.ACTIVE, ) ) response = await client.update_subscription(request) @@ -1306,6 +1317,7 @@ async def test_update_subscription_async( assert response.filter == "filter_value" assert response.detached is True assert response.enable_exactly_once_delivery is True + assert response.state == pubsub.Subscription.State.ACTIVE @pytest.mark.asyncio