From 2e29ba1f39f299acf97e543db355bf8ebfcdf121 Mon Sep 17 00:00:00 2001 From: "gcf-owl-bot[bot]" <78513119+gcf-owl-bot[bot]@users.noreply.github.com> Date: Sat, 26 Jun 2021 12:06:11 +0000 Subject: [PATCH] feat: Add SeekSubscription and Operations to API (#169) PiperOrigin-RevId: 380660182 Source-Link: https://github.com/googleapis/googleapis/commit/b601f026f724bbbd25f99273a3fd5e3c3cb8523c Source-Link: https://github.com/googleapis/googleapis-gen/commit/de07f619afee523b3fdb2108324ec7947f96ec22 --- google/cloud/pubsublite_v1/__init__.py | 6 + .../cloud/pubsublite_v1/gapic_metadata.json | 10 + .../services/admin_service/async_client.py | 87 +++++++++ .../services/admin_service/client.py | 88 +++++++++ .../services/admin_service/transports/base.py | 19 ++ .../services/admin_service/transports/grpc.py | 69 +++++++ .../admin_service/transports/grpc_asyncio.py | 71 +++++++ google/cloud/pubsublite_v1/types/__init__.py | 6 + google/cloud/pubsublite_v1/types/admin.py | 61 ++++++ .../gapic/pubsublite_v1/test_admin_service.py | 177 ++++++++++++++++++ 10 files changed, 594 insertions(+) diff --git a/google/cloud/pubsublite_v1/__init__.py b/google/cloud/pubsublite_v1/__init__.py index fbcd53a0..bbec89bf 100644 --- a/google/cloud/pubsublite_v1/__init__.py +++ b/google/cloud/pubsublite_v1/__init__.py @@ -47,6 +47,9 @@ from .types.admin import ListTopicsResponse from .types.admin import ListTopicSubscriptionsRequest from .types.admin import ListTopicSubscriptionsResponse +from .types.admin import OperationMetadata +from .types.admin import SeekSubscriptionRequest +from .types.admin import SeekSubscriptionResponse from .types.admin import TopicPartitions from .types.admin import UpdateReservationRequest from .types.admin import UpdateSubscriptionRequest @@ -147,6 +150,7 @@ "MessagePublishRequest", "MessagePublishResponse", "MessageResponse", + "OperationMetadata", "PartitionAssignment", "PartitionAssignmentAck", "PartitionAssignmentRequest", @@ -159,6 +163,8 @@ "Reservation", "SeekRequest", "SeekResponse", + "SeekSubscriptionRequest", + "SeekSubscriptionResponse", "SequencedCommitCursorRequest", "SequencedCommitCursorResponse", "SequencedMessage", diff --git a/google/cloud/pubsublite_v1/gapic_metadata.json b/google/cloud/pubsublite_v1/gapic_metadata.json index 6e66f383..1e350404 100644 --- a/google/cloud/pubsublite_v1/gapic_metadata.json +++ b/google/cloud/pubsublite_v1/gapic_metadata.json @@ -85,6 +85,11 @@ "list_topics" ] }, + "SeekSubscription": { + "methods": [ + "seek_subscription" + ] + }, "UpdateReservation": { "methods": [ "update_reservation" @@ -180,6 +185,11 @@ "list_topics" ] }, + "SeekSubscription": { + "methods": [ + "seek_subscription" + ] + }, "UpdateReservation": { "methods": [ "update_reservation" diff --git a/google/cloud/pubsublite_v1/services/admin_service/async_client.py b/google/cloud/pubsublite_v1/services/admin_service/async_client.py index cf3e1afb..bd98ad22 100644 --- a/google/cloud/pubsublite_v1/services/admin_service/async_client.py +++ b/google/cloud/pubsublite_v1/services/admin_service/async_client.py @@ -26,6 +26,8 @@ from google.auth import credentials as ga_credentials # type: ignore from google.oauth2 import service_account # type: ignore +from google.api_core import operation # type: ignore +from google.api_core import operation_async # type: ignore from google.cloud.pubsublite_v1.services.admin_service import pagers from google.cloud.pubsublite_v1.types import admin from google.cloud.pubsublite_v1.types import common @@ -1084,6 +1086,91 @@ async def delete_subscription( request, retry=retry, timeout=timeout, metadata=metadata, ) + async def seek_subscription( + self, + request: admin.SeekSubscriptionRequest = None, + *, + retry: retries.Retry = gapic_v1.method.DEFAULT, + timeout: float = None, + metadata: Sequence[Tuple[str, str]] = (), + ) -> operation_async.AsyncOperation: + r"""Performs an out-of-band seek for a subscription to a + specified target, which may be timestamps or named + positions within the message backlog. Seek translates + these targets to cursors for each partition and + orchestrates subscribers to start consuming messages + from these seek cursors. + + If an operation is returned, the seek has been + registered and subscribers will eventually receive + messages from the seek cursors (i.e. eventual + consistency), as long as they are using a minimum + supported client library version and not a system that + tracks cursors independently of Pub/Sub Lite (e.g. + Apache Beam, Dataflow, Spark). The seek operation will + fail for unsupported clients. + + If clients would like to know when subscribers react to + the seek (or not), they can poll the operation. The seek + operation will succeed and complete once subscribers are + ready to receive messages from the seek cursors for all + partitions of the topic. This means that the seek + operation will not complete until all subscribers come + online. + + If the previous seek operation has not yet completed, it + will be aborted and the new invocation of seek will + supersede it. + + Args: + request (:class:`google.cloud.pubsublite_v1.types.SeekSubscriptionRequest`): + The request object. Request for SeekSubscription. + retry (google.api_core.retry.Retry): Designation of what errors, if any, + should be retried. + timeout (float): The timeout for this request. + metadata (Sequence[Tuple[str, str]]): Strings which should be + sent along with the request as metadata. + + Returns: + google.api_core.operation_async.AsyncOperation: + An object representing a long-running operation. + + The result type for the operation will be + :class:`google.cloud.pubsublite_v1.types.SeekSubscriptionResponse` + Response for SeekSubscription long running operation. + + """ + # Create or coerce a protobuf request object. + request = admin.SeekSubscriptionRequest(request) + + # Wrap the RPC method; this adds retry and timeout information, + # and friendly error handling. + rpc = gapic_v1.method_async.wrap_method( + self._client._transport.seek_subscription, + default_timeout=None, + client_info=DEFAULT_CLIENT_INFO, + ) + + # Certain fields should be provided within the metadata header; + # add these here. + metadata = tuple(metadata) + ( + gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)), + ) + + # Send the request. + response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) + + # Wrap the response in an operation future. + response = operation_async.from_gapic( + response, + self._client._transport.operations_client, + admin.SeekSubscriptionResponse, + metadata_type=admin.OperationMetadata, + ) + + # Done; return the response. + return response + async def create_reservation( self, request: admin.CreateReservationRequest = None, diff --git a/google/cloud/pubsublite_v1/services/admin_service/client.py b/google/cloud/pubsublite_v1/services/admin_service/client.py index a4926aa1..ee16b493 100644 --- a/google/cloud/pubsublite_v1/services/admin_service/client.py +++ b/google/cloud/pubsublite_v1/services/admin_service/client.py @@ -30,6 +30,8 @@ from google.auth.exceptions import MutualTLSChannelError # type: ignore from google.oauth2 import service_account # type: ignore +from google.api_core import operation # type: ignore +from google.api_core import operation_async # type: ignore from google.cloud.pubsublite_v1.services.admin_service import pagers from google.cloud.pubsublite_v1.types import admin from google.cloud.pubsublite_v1.types import common @@ -1299,6 +1301,92 @@ def delete_subscription( request, retry=retry, timeout=timeout, metadata=metadata, ) + def seek_subscription( + self, + request: admin.SeekSubscriptionRequest = None, + *, + retry: retries.Retry = gapic_v1.method.DEFAULT, + timeout: float = None, + metadata: Sequence[Tuple[str, str]] = (), + ) -> operation.Operation: + r"""Performs an out-of-band seek for a subscription to a + specified target, which may be timestamps or named + positions within the message backlog. Seek translates + these targets to cursors for each partition and + orchestrates subscribers to start consuming messages + from these seek cursors. + + If an operation is returned, the seek has been + registered and subscribers will eventually receive + messages from the seek cursors (i.e. eventual + consistency), as long as they are using a minimum + supported client library version and not a system that + tracks cursors independently of Pub/Sub Lite (e.g. + Apache Beam, Dataflow, Spark). The seek operation will + fail for unsupported clients. + + If clients would like to know when subscribers react to + the seek (or not), they can poll the operation. The seek + operation will succeed and complete once subscribers are + ready to receive messages from the seek cursors for all + partitions of the topic. This means that the seek + operation will not complete until all subscribers come + online. + + If the previous seek operation has not yet completed, it + will be aborted and the new invocation of seek will + supersede it. + + Args: + request (google.cloud.pubsublite_v1.types.SeekSubscriptionRequest): + The request object. Request for SeekSubscription. + retry (google.api_core.retry.Retry): Designation of what errors, if any, + should be retried. + timeout (float): The timeout for this request. + metadata (Sequence[Tuple[str, str]]): Strings which should be + sent along with the request as metadata. + + Returns: + google.api_core.operation.Operation: + An object representing a long-running operation. + + The result type for the operation will be + :class:`google.cloud.pubsublite_v1.types.SeekSubscriptionResponse` + Response for SeekSubscription long running operation. + + """ + # Create or coerce a protobuf request object. + # Minor optimization to avoid making a copy if the user passes + # in a admin.SeekSubscriptionRequest. + # There's no risk of modifying the input as we've already verified + # there are no flattened fields. + if not isinstance(request, admin.SeekSubscriptionRequest): + request = admin.SeekSubscriptionRequest(request) + + # Wrap the RPC method; this adds retry and timeout information, + # and friendly error handling. + rpc = self._transport._wrapped_methods[self._transport.seek_subscription] + + # Certain fields should be provided within the metadata header; + # add these here. + metadata = tuple(metadata) + ( + gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)), + ) + + # Send the request. + response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) + + # Wrap the response in an operation future. + response = operation.from_gapic( + response, + self._transport.operations_client, + admin.SeekSubscriptionResponse, + metadata_type=admin.OperationMetadata, + ) + + # Done; return the response. + return response + def create_reservation( self, request: admin.CreateReservationRequest = None, diff --git a/google/cloud/pubsublite_v1/services/admin_service/transports/base.py b/google/cloud/pubsublite_v1/services/admin_service/transports/base.py index 8b73bedf..eecedbe8 100644 --- a/google/cloud/pubsublite_v1/services/admin_service/transports/base.py +++ b/google/cloud/pubsublite_v1/services/admin_service/transports/base.py @@ -23,11 +23,13 @@ from google.api_core import exceptions as core_exceptions # type: ignore from google.api_core import gapic_v1 # type: ignore from google.api_core import retry as retries # type: ignore +from google.api_core import operations_v1 # type: ignore from google.auth import credentials as ga_credentials # type: ignore from google.oauth2 import service_account # type: ignore from google.cloud.pubsublite_v1.types import admin from google.cloud.pubsublite_v1.types import common +from google.longrunning import operations_pb2 # type: ignore from google.protobuf import empty_pb2 # type: ignore try: @@ -196,6 +198,9 @@ def _prep_wrapped_messages(self, client_info): self.delete_subscription: gapic_v1.method.wrap_method( self.delete_subscription, default_timeout=None, client_info=client_info, ), + self.seek_subscription: gapic_v1.method.wrap_method( + self.seek_subscription, default_timeout=None, client_info=client_info, + ), self.create_reservation: gapic_v1.method.wrap_method( self.create_reservation, default_timeout=None, client_info=client_info, ), @@ -218,6 +223,11 @@ def _prep_wrapped_messages(self, client_info): ), } + @property + def operations_client(self) -> operations_v1.OperationsClient: + """Return the client designed to process long-running operations.""" + raise NotImplementedError() + @property def create_topic( self, @@ -327,6 +337,15 @@ def delete_subscription( ]: raise NotImplementedError() + @property + def seek_subscription( + self, + ) -> Callable[ + [admin.SeekSubscriptionRequest], + Union[operations_pb2.Operation, Awaitable[operations_pb2.Operation]], + ]: + raise NotImplementedError() + @property def create_reservation( self, diff --git a/google/cloud/pubsublite_v1/services/admin_service/transports/grpc.py b/google/cloud/pubsublite_v1/services/admin_service/transports/grpc.py index 6b9641a6..5fb9d4e5 100644 --- a/google/cloud/pubsublite_v1/services/admin_service/transports/grpc.py +++ b/google/cloud/pubsublite_v1/services/admin_service/transports/grpc.py @@ -17,6 +17,7 @@ from typing import Callable, Dict, Optional, Sequence, Tuple, Union from google.api_core import grpc_helpers # type: ignore +from google.api_core import operations_v1 # type: ignore from google.api_core import gapic_v1 # type: ignore import google.auth # type: ignore from google.auth import credentials as ga_credentials # type: ignore @@ -26,6 +27,7 @@ from google.cloud.pubsublite_v1.types import admin from google.cloud.pubsublite_v1.types import common +from google.longrunning import operations_pb2 # type: ignore from google.protobuf import empty_pb2 # type: ignore from .base import AdminServiceTransport, DEFAULT_CLIENT_INFO @@ -111,6 +113,7 @@ def __init__( self._grpc_channel = None self._ssl_channel_credentials = ssl_channel_credentials self._stubs: Dict[str, Callable] = {} + self._operations_client = None if api_mtls_endpoint: warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning) @@ -226,6 +229,20 @@ def grpc_channel(self) -> grpc.Channel: """ return self._grpc_channel + @property + def operations_client(self) -> operations_v1.OperationsClient: + """Create the client designed to process long-running operations. + + This property caches on the instance; repeated calls return the same + client. + """ + # Sanity check: Only create a new client if we do not already have one. + if self._operations_client is None: + self._operations_client = operations_v1.OperationsClient(self.grpc_channel) + + # Return the client from cache. + return self._operations_client + @property def create_topic(self) -> Callable[[admin.CreateTopicRequest], common.Topic]: r"""Return a callable for the create topic method over gRPC. @@ -535,6 +552,58 @@ def delete_subscription( ) return self._stubs["delete_subscription"] + @property + def seek_subscription( + self, + ) -> Callable[[admin.SeekSubscriptionRequest], operations_pb2.Operation]: + r"""Return a callable for the seek subscription method over gRPC. + + Performs an out-of-band seek for a subscription to a + specified target, which may be timestamps or named + positions within the message backlog. Seek translates + these targets to cursors for each partition and + orchestrates subscribers to start consuming messages + from these seek cursors. + + If an operation is returned, the seek has been + registered and subscribers will eventually receive + messages from the seek cursors (i.e. eventual + consistency), as long as they are using a minimum + supported client library version and not a system that + tracks cursors independently of Pub/Sub Lite (e.g. + Apache Beam, Dataflow, Spark). The seek operation will + fail for unsupported clients. + + If clients would like to know when subscribers react to + the seek (or not), they can poll the operation. The seek + operation will succeed and complete once subscribers are + ready to receive messages from the seek cursors for all + partitions of the topic. This means that the seek + operation will not complete until all subscribers come + online. + + If the previous seek operation has not yet completed, it + will be aborted and the new invocation of seek will + supersede it. + + Returns: + Callable[[~.SeekSubscriptionRequest], + ~.Operation]: + A function that, when called, will call the underlying RPC + on the server. + """ + # Generate a "stub function" on-the-fly which will actually make + # the request. + # gRPC handles serialization and deserialization, so we just need + # to pass in the functions for each. + if "seek_subscription" not in self._stubs: + self._stubs["seek_subscription"] = self.grpc_channel.unary_unary( + "/google.cloud.pubsublite.v1.AdminService/SeekSubscription", + request_serializer=admin.SeekSubscriptionRequest.serialize, + response_deserializer=operations_pb2.Operation.FromString, + ) + return self._stubs["seek_subscription"] + @property def create_reservation( self, diff --git a/google/cloud/pubsublite_v1/services/admin_service/transports/grpc_asyncio.py b/google/cloud/pubsublite_v1/services/admin_service/transports/grpc_asyncio.py index 31d5aea4..2f2d04f1 100644 --- a/google/cloud/pubsublite_v1/services/admin_service/transports/grpc_asyncio.py +++ b/google/cloud/pubsublite_v1/services/admin_service/transports/grpc_asyncio.py @@ -18,6 +18,7 @@ from google.api_core import gapic_v1 # type: ignore from google.api_core import grpc_helpers_async # type: ignore +from google.api_core import operations_v1 # type: ignore from google.auth import credentials as ga_credentials # type: ignore from google.auth.transport.grpc import SslCredentials # type: ignore import packaging.version @@ -27,6 +28,7 @@ from google.cloud.pubsublite_v1.types import admin from google.cloud.pubsublite_v1.types import common +from google.longrunning import operations_pb2 # type: ignore from google.protobuf import empty_pb2 # type: ignore from .base import AdminServiceTransport, DEFAULT_CLIENT_INFO from .grpc import AdminServiceGrpcTransport @@ -158,6 +160,7 @@ def __init__( self._grpc_channel = None self._ssl_channel_credentials = ssl_channel_credentials self._stubs: Dict[str, Callable] = {} + self._operations_client = None if api_mtls_endpoint: warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning) @@ -229,6 +232,22 @@ def grpc_channel(self) -> aio.Channel: # Return the channel from cache. return self._grpc_channel + @property + def operations_client(self) -> operations_v1.OperationsAsyncClient: + """Create the client designed to process long-running operations. + + This property caches on the instance; repeated calls return the same + client. + """ + # Sanity check: Only create a new client if we do not already have one. + if self._operations_client is None: + self._operations_client = operations_v1.OperationsAsyncClient( + self.grpc_channel + ) + + # Return the client from cache. + return self._operations_client + @property def create_topic( self, @@ -547,6 +566,58 @@ def delete_subscription( ) return self._stubs["delete_subscription"] + @property + def seek_subscription( + self, + ) -> Callable[[admin.SeekSubscriptionRequest], Awaitable[operations_pb2.Operation]]: + r"""Return a callable for the seek subscription method over gRPC. + + Performs an out-of-band seek for a subscription to a + specified target, which may be timestamps or named + positions within the message backlog. Seek translates + these targets to cursors for each partition and + orchestrates subscribers to start consuming messages + from these seek cursors. + + If an operation is returned, the seek has been + registered and subscribers will eventually receive + messages from the seek cursors (i.e. eventual + consistency), as long as they are using a minimum + supported client library version and not a system that + tracks cursors independently of Pub/Sub Lite (e.g. + Apache Beam, Dataflow, Spark). The seek operation will + fail for unsupported clients. + + If clients would like to know when subscribers react to + the seek (or not), they can poll the operation. The seek + operation will succeed and complete once subscribers are + ready to receive messages from the seek cursors for all + partitions of the topic. This means that the seek + operation will not complete until all subscribers come + online. + + If the previous seek operation has not yet completed, it + will be aborted and the new invocation of seek will + supersede it. + + Returns: + Callable[[~.SeekSubscriptionRequest], + Awaitable[~.Operation]]: + A function that, when called, will call the underlying RPC + on the server. + """ + # Generate a "stub function" on-the-fly which will actually make + # the request. + # gRPC handles serialization and deserialization, so we just need + # to pass in the functions for each. + if "seek_subscription" not in self._stubs: + self._stubs["seek_subscription"] = self.grpc_channel.unary_unary( + "/google.cloud.pubsublite.v1.AdminService/SeekSubscription", + request_serializer=admin.SeekSubscriptionRequest.serialize, + response_deserializer=operations_pb2.Operation.FromString, + ) + return self._stubs["seek_subscription"] + @property def create_reservation( self, diff --git a/google/cloud/pubsublite_v1/types/__init__.py b/google/cloud/pubsublite_v1/types/__init__.py index 56e6353e..3ea25f36 100644 --- a/google/cloud/pubsublite_v1/types/__init__.py +++ b/google/cloud/pubsublite_v1/types/__init__.py @@ -34,6 +34,9 @@ ListTopicsResponse, ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse, + OperationMetadata, + SeekSubscriptionRequest, + SeekSubscriptionResponse, TopicPartitions, UpdateReservationRequest, UpdateSubscriptionRequest, @@ -114,6 +117,9 @@ "ListTopicsResponse", "ListTopicSubscriptionsRequest", "ListTopicSubscriptionsResponse", + "OperationMetadata", + "SeekSubscriptionRequest", + "SeekSubscriptionResponse", "TopicPartitions", "UpdateReservationRequest", "UpdateSubscriptionRequest", diff --git a/google/cloud/pubsublite_v1/types/admin.py b/google/cloud/pubsublite_v1/types/admin.py index 35edd29e..140b0b9c 100644 --- a/google/cloud/pubsublite_v1/types/admin.py +++ b/google/cloud/pubsublite_v1/types/admin.py @@ -17,6 +17,7 @@ from google.cloud.pubsublite_v1.types import common from google.protobuf import field_mask_pb2 # type: ignore +from google.protobuf import timestamp_pb2 # type: ignore __protobuf__ = proto.module( @@ -38,6 +39,9 @@ "ListSubscriptionsResponse", "UpdateSubscriptionRequest", "DeleteSubscriptionRequest", + "SeekSubscriptionRequest", + "SeekSubscriptionResponse", + "OperationMetadata", "CreateReservationRequest", "GetReservationRequest", "ListReservationsRequest", @@ -341,6 +345,63 @@ class DeleteSubscriptionRequest(proto.Message): name = proto.Field(proto.STRING, number=1,) +class SeekSubscriptionRequest(proto.Message): + r"""Request for SeekSubscription. + Attributes: + name (str): + Required. The name of the subscription to + seek. + named_target (google.cloud.pubsublite_v1.types.SeekSubscriptionRequest.NamedTarget): + Seek to a named position with respect to the + message backlog. + time_target (google.cloud.pubsublite_v1.types.TimeTarget): + Seek to the first message whose publish or + event time is greater than or equal to the + specified query time. If no such message can be + located, will seek to the end of the message + backlog. + """ + + class NamedTarget(proto.Enum): + r"""A named position with respect to the message backlog.""" + NAMED_TARGET_UNSPECIFIED = 0 + TAIL = 1 + HEAD = 2 + + name = proto.Field(proto.STRING, number=1,) + named_target = proto.Field(proto.ENUM, number=2, oneof="target", enum=NamedTarget,) + time_target = proto.Field( + proto.MESSAGE, number=3, oneof="target", message=common.TimeTarget, + ) + + +class SeekSubscriptionResponse(proto.Message): + r"""Response for SeekSubscription long running operation. """ + + +class OperationMetadata(proto.Message): + r"""Metadata for long running operations. + Attributes: + create_time (google.protobuf.timestamp_pb2.Timestamp): + The time the operation was created. + end_time (google.protobuf.timestamp_pb2.Timestamp): + The time the operation finished running. Not + set if the operation has not completed. + target (str): + Resource path for the target of the operation. For example, + targets of seeks are subscription resources, structured + like: + projects/{project_number}/locations/{location}/subscriptions/{subscription_id} + verb (str): + Name of the verb executed by the operation. + """ + + create_time = proto.Field(proto.MESSAGE, number=1, message=timestamp_pb2.Timestamp,) + end_time = proto.Field(proto.MESSAGE, number=2, message=timestamp_pb2.Timestamp,) + target = proto.Field(proto.STRING, number=3,) + verb = proto.Field(proto.STRING, number=4,) + + class CreateReservationRequest(proto.Message): r"""Request for CreateReservation. Attributes: diff --git a/tests/unit/gapic/pubsublite_v1/test_admin_service.py b/tests/unit/gapic/pubsublite_v1/test_admin_service.py index 26c42325..63596d3b 100644 --- a/tests/unit/gapic/pubsublite_v1/test_admin_service.py +++ b/tests/unit/gapic/pubsublite_v1/test_admin_service.py @@ -26,9 +26,12 @@ from google.api_core import client_options from google.api_core import exceptions as core_exceptions +from google.api_core import future from google.api_core import gapic_v1 from google.api_core import grpc_helpers from google.api_core import grpc_helpers_async +from google.api_core import operation_async # type: ignore +from google.api_core import operations_v1 from google.auth import credentials as ga_credentials from google.auth.exceptions import MutualTLSChannelError from google.cloud.pubsublite_v1.services.admin_service import AdminServiceAsyncClient @@ -40,9 +43,11 @@ ) from google.cloud.pubsublite_v1.types import admin from google.cloud.pubsublite_v1.types import common +from google.longrunning import operations_pb2 from google.oauth2 import service_account from google.protobuf import duration_pb2 # type: ignore from google.protobuf import field_mask_pb2 # type: ignore +from google.protobuf import timestamp_pb2 # type: ignore import google.auth @@ -3307,6 +3312,146 @@ async def test_delete_subscription_flattened_error_async(): ) +def test_seek_subscription( + transport: str = "grpc", request_type=admin.SeekSubscriptionRequest +): + client = AdminServiceClient( + credentials=ga_credentials.AnonymousCredentials(), transport=transport, + ) + + # Everything is optional in proto3 as far as the runtime is concerned, + # and we are mocking out the actual API, so just send an empty request. + request = request_type() + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.seek_subscription), "__call__" + ) as call: + # Designate an appropriate return value for the call. + call.return_value = operations_pb2.Operation(name="operations/spam") + response = client.seek_subscription(request) + + # Establish that the underlying gRPC stub method was called. + assert len(call.mock_calls) == 1 + _, args, _ = call.mock_calls[0] + assert args[0] == admin.SeekSubscriptionRequest() + + # Establish that the response is the type that we expect. + assert isinstance(response, future.Future) + + +def test_seek_subscription_from_dict(): + test_seek_subscription(request_type=dict) + + +def test_seek_subscription_empty_call(): + # This test is a coverage failsafe to make sure that totally empty calls, + # i.e. request == None and no flattened fields passed, work. + client = AdminServiceClient( + credentials=ga_credentials.AnonymousCredentials(), transport="grpc", + ) + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.seek_subscription), "__call__" + ) as call: + client.seek_subscription() + call.assert_called() + _, args, _ = call.mock_calls[0] + assert args[0] == admin.SeekSubscriptionRequest() + + +@pytest.mark.asyncio +async def test_seek_subscription_async( + transport: str = "grpc_asyncio", request_type=admin.SeekSubscriptionRequest +): + client = AdminServiceAsyncClient( + credentials=ga_credentials.AnonymousCredentials(), transport=transport, + ) + + # Everything is optional in proto3 as far as the runtime is concerned, + # and we are mocking out the actual API, so just send an empty request. + request = request_type() + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.seek_subscription), "__call__" + ) as call: + # Designate an appropriate return value for the call. + call.return_value = grpc_helpers_async.FakeUnaryUnaryCall( + operations_pb2.Operation(name="operations/spam") + ) + response = await client.seek_subscription(request) + + # Establish that the underlying gRPC stub method was called. + assert len(call.mock_calls) + _, args, _ = call.mock_calls[0] + assert args[0] == admin.SeekSubscriptionRequest() + + # Establish that the response is the type that we expect. + assert isinstance(response, future.Future) + + +@pytest.mark.asyncio +async def test_seek_subscription_async_from_dict(): + await test_seek_subscription_async(request_type=dict) + + +def test_seek_subscription_field_headers(): + client = AdminServiceClient(credentials=ga_credentials.AnonymousCredentials(),) + + # Any value that is part of the HTTP/1.1 URI should be sent as + # a field header. Set these to a non-empty value. + request = admin.SeekSubscriptionRequest() + + request.name = "name/value" + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.seek_subscription), "__call__" + ) as call: + call.return_value = operations_pb2.Operation(name="operations/op") + client.seek_subscription(request) + + # Establish that the underlying gRPC stub method was called. + assert len(call.mock_calls) == 1 + _, args, _ = call.mock_calls[0] + assert args[0] == request + + # Establish that the field header was sent. + _, _, kw = call.mock_calls[0] + assert ("x-goog-request-params", "name=name/value",) in kw["metadata"] + + +@pytest.mark.asyncio +async def test_seek_subscription_field_headers_async(): + client = AdminServiceAsyncClient(credentials=ga_credentials.AnonymousCredentials(),) + + # Any value that is part of the HTTP/1.1 URI should be sent as + # a field header. Set these to a non-empty value. + request = admin.SeekSubscriptionRequest() + + request.name = "name/value" + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.seek_subscription), "__call__" + ) as call: + call.return_value = grpc_helpers_async.FakeUnaryUnaryCall( + operations_pb2.Operation(name="operations/op") + ) + await client.seek_subscription(request) + + # Establish that the underlying gRPC stub method was called. + assert len(call.mock_calls) + _, args, _ = call.mock_calls[0] + assert args[0] == request + + # Establish that the field header was sent. + _, _, kw = call.mock_calls[0] + assert ("x-goog-request-params", "name=name/value",) in kw["metadata"] + + def test_create_reservation( transport: str = "grpc", request_type=admin.CreateReservationRequest ): @@ -4960,6 +5105,7 @@ def test_admin_service_base_transport(): "list_subscriptions", "update_subscription", "delete_subscription", + "seek_subscription", "create_reservation", "get_reservation", "list_reservations", @@ -4971,6 +5117,11 @@ def test_admin_service_base_transport(): with pytest.raises(NotImplementedError): getattr(transport, method)(request=object()) + # Additionally, the LRO client (a property) should + # also raise NotImplementedError + with pytest.raises(NotImplementedError): + transport.operations_client + @requires_google_auth_gte_1_25_0 def test_admin_service_base_transport_with_credentials_file(): @@ -5303,6 +5454,32 @@ def test_admin_service_transport_channel_mtls_with_adc(transport_class): assert transport.grpc_channel == mock_grpc_channel +def test_admin_service_grpc_lro_client(): + client = AdminServiceClient( + credentials=ga_credentials.AnonymousCredentials(), transport="grpc", + ) + transport = client.transport + + # Ensure that we have a api-core operations client. + assert isinstance(transport.operations_client, operations_v1.OperationsClient,) + + # Ensure that subsequent calls to the property send the exact same object. + assert transport.operations_client is transport.operations_client + + +def test_admin_service_grpc_lro_async_client(): + client = AdminServiceAsyncClient( + credentials=ga_credentials.AnonymousCredentials(), transport="grpc_asyncio", + ) + transport = client.transport + + # Ensure that we have a api-core operations client. + assert isinstance(transport.operations_client, operations_v1.OperationsAsyncClient,) + + # Ensure that subsequent calls to the property send the exact same object. + assert transport.operations_client is transport.operations_client + + def test_reservation_path(): project = "squid" location = "clam"