From bdebd6331f9c0d3d1a8ceaf274f07d2ed75bfe92 Mon Sep 17 00:00:00 2001 From: Anna Cocuzzo <63511057+acocuzzo@users.noreply.github.com> Date: Thu, 7 Sep 2023 14:04:43 -0400 Subject: [PATCH] feat: Add grpc Compression argument to channels and methods (#451) * (feat): Add grpc Compression argument * Add compression arg to channel creation * fix linter errors * fix linter errors * refactor with new lib * reformat * fix tests * add compression after refactor: * fix lint * fix unit tests * fix unit tests * fix operation * remove unused import * remove compression for grpc_gcp.secure_channel call * fix method.py comment * Update grpc_helpers.py Remove experimental disclaimer * Update grpc_helpers_async.py Remove experimental disclaimer * Update google/api_core/operations_v1/operations_client.py Co-authored-by: Anthonios Partheniou * Update google/api_core/operations_v1/operations_client.py Co-authored-by: Anthonios Partheniou * Update google/api_core/operations_v1/operations_client.py Co-authored-by: Anthonios Partheniou * Update google/api_core/operations_v1/operations_async_client.py Co-authored-by: Anthonios Partheniou * Update google/api_core/operations_v1/operations_async_client.py Co-authored-by: Anthonios Partheniou * Update google/api_core/operations_v1/operations_async_client.py Co-authored-by: Anthonios Partheniou * Update google/api_core/operations_v1/operations_async_client.py Co-authored-by: Anthonios Partheniou * Update google/api_core/operations_v1/operations_client.py Co-authored-by: Anthonios Partheniou --------- Co-authored-by: Anthonios Partheniou --- google/api_core/gapic_v1/method.py | 50 +++++++++++---- google/api_core/gapic_v1/method_async.py | 17 +++-- google/api_core/grpc_helpers.py | 28 +++++++-- google/api_core/grpc_helpers_async.py | 7 ++- google/api_core/operation.py | 18 ++++-- google/api_core/operation_async.py | 8 ++- .../abstract_operations_client.py | 9 +++ .../operations_v1/operations_async_client.py | 43 +++++++++++-- .../operations_v1/operations_client.py | 47 ++++++++++++-- .../api_core/operations_v1/transports/base.py | 5 ++ .../api_core/operations_v1/transports/rest.py | 5 ++ tests/asyncio/gapic/test_method_async.py | 58 ++++++++++++++--- .../test_operations_async_client.py | 22 +++++-- tests/asyncio/test_grpc_helpers_async.py | 62 ++++++++++++++----- tests/unit/gapic/test_method.py | 27 +++++--- tests/unit/test_grpc_helpers.py | 60 +++++++++++++----- 16 files changed, 377 insertions(+), 89 deletions(-) diff --git a/google/api_core/gapic_v1/method.py b/google/api_core/gapic_v1/method.py index 0c1624a3..e6df1332 100644 --- a/google/api_core/gapic_v1/method.py +++ b/google/api_core/gapic_v1/method.py @@ -15,7 +15,7 @@ """Helpers for wrapping low-level gRPC methods with common functionality. This is used by gapic clients to provide common error mapping, retry, timeout, -pagination, and long-running operations to gRPC methods. +compression, pagination, and long-running operations to gRPC methods. """ import enum @@ -38,7 +38,7 @@ class _MethodDefault(enum.Enum): DEFAULT = _MethodDefault._DEFAULT_VALUE -"""Sentinel value indicating that a retry or timeout argument was unspecified, +"""Sentinel value indicating that a retry, timeout, or compression argument was unspecified, so the default should be used.""" @@ -72,20 +72,33 @@ class _GapicCallable(object): after its start, not to be confused with deadline). If ``None``, this callable will not specify a timeout argument to the low-level RPC method. + compression (grpc.Compression): The default compression for the callable. + If ``None``, this callable will not specify a compression argument + to the low-level RPC method. metadata (Sequence[Tuple[str, str]]): Additional metadata that is provided to the RPC method on every invocation. This is merged with any metadata specified during invocation. If ``None``, no additional metadata will be passed to the RPC method. """ - def __init__(self, target, retry, timeout, metadata=None): + def __init__( + self, + target, + retry, + timeout, + compression, + metadata=None, + ): self._target = target self._retry = retry self._timeout = timeout + self._compression = compression self._metadata = metadata - def __call__(self, *args, timeout=DEFAULT, retry=DEFAULT, **kwargs): - """Invoke the low-level RPC with retry, timeout, and metadata.""" + def __call__( + self, *args, timeout=DEFAULT, retry=DEFAULT, compression=DEFAULT, **kwargs + ): + """Invoke the low-level RPC with retry, timeout, compression, and metadata.""" if retry is DEFAULT: retry = self._retry @@ -93,6 +106,9 @@ def __call__(self, *args, timeout=DEFAULT, retry=DEFAULT, **kwargs): if timeout is DEFAULT: timeout = self._timeout + if compression is DEFAULT: + compression = self._compression + if isinstance(timeout, (int, float)): timeout = TimeToDeadlineTimeout(timeout=timeout) @@ -109,6 +125,8 @@ def __call__(self, *args, timeout=DEFAULT, retry=DEFAULT, **kwargs): metadata = list(metadata) metadata.extend(self._metadata) kwargs["metadata"] = metadata + if self._compression is not None: + kwargs["compression"] = compression return wrapped_func(*args, **kwargs) @@ -117,12 +135,13 @@ def wrap_method( func, default_retry=None, default_timeout=None, + default_compression=None, client_info=client_info.DEFAULT_CLIENT_INFO, ): """Wrap an RPC method with common behavior. - This applies common error wrapping, retry, and timeout behavior a function. - The wrapped function will take optional ``retry`` and ``timeout`` + This applies common error wrapping, retry, timeout, and compression behavior to a function. + The wrapped function will take optional ``retry``, ``timeout``, and ``compression`` arguments. For example:: @@ -130,6 +149,7 @@ def wrap_method( import google.api_core.gapic_v1.method from google.api_core import retry from google.api_core import timeout + from grpc import Compression # The original RPC method. def get_topic(name, timeout=None): @@ -138,6 +158,7 @@ def get_topic(name, timeout=None): default_retry = retry.Retry(deadline=60) default_timeout = timeout.Timeout(deadline=60) + default_compression = Compression.NoCompression wrapped_get_topic = google.api_core.gapic_v1.method.wrap_method( get_topic, default_retry) @@ -186,6 +207,9 @@ def get_topic(name, timeout=None): default_timeout (Optional[google.api_core.Timeout]): The default timeout strategy. Can also be specified as an int or float. If ``None``, the method will not have timeout specified by default. + default_compression (Optional[grpc.Compression]): The default + grpc.Compression. If ``None``, the method will not have + compression specified by default. client_info (Optional[google.api_core.gapic_v1.client_info.ClientInfo]): Client information used to create a user-agent string that's @@ -194,12 +218,12 @@ def get_topic(name, timeout=None): metadata will be provided to the RPC method. Returns: - Callable: A new callable that takes optional ``retry`` and ``timeout`` - arguments and applies the common error mapping, retry, timeout, + Callable: A new callable that takes optional ``retry``, ``timeout``, + and ``compression`` + arguments and applies the common error mapping, retry, timeout, compression, and metadata behavior to the low-level RPC method. """ func = grpc_helpers.wrap_errors(func) - if client_info is not None: user_agent_metadata = [client_info.to_grpc_metadata()] else: @@ -207,6 +231,10 @@ def get_topic(name, timeout=None): return functools.wraps(func)( _GapicCallable( - func, default_retry, default_timeout, metadata=user_agent_metadata + func, + default_retry, + default_timeout, + default_compression, + metadata=user_agent_metadata, ) ) diff --git a/google/api_core/gapic_v1/method_async.py b/google/api_core/gapic_v1/method_async.py index 84c99aa2..24880756 100644 --- a/google/api_core/gapic_v1/method_async.py +++ b/google/api_core/gapic_v1/method_async.py @@ -14,7 +14,7 @@ """AsyncIO helpers for wrapping gRPC methods with common functionality. This is used by gapic clients to provide common error mapping, retry, timeout, -pagination, and long-running operations to gRPC methods. +compression, pagination, and long-running operations to gRPC methods. """ import functools @@ -30,19 +30,26 @@ def wrap_method( func, default_retry=None, default_timeout=None, + default_compression=None, client_info=client_info.DEFAULT_CLIENT_INFO, ): """Wrap an async RPC method with common behavior. Returns: - Callable: A new callable that takes optional ``retry`` and ``timeout`` - arguments and applies the common error mapping, retry, timeout, - and metadata behavior to the low-level RPC method. + Callable: A new callable that takes optional ``retry``, ``timeout``, + and ``compression`` arguments and applies the common error mapping, + retry, timeout, metadata, and compression behavior to the low-level RPC method. """ func = grpc_helpers_async.wrap_errors(func) metadata = [client_info.to_grpc_metadata()] if client_info is not None else None return functools.wraps(func)( - _GapicCallable(func, default_retry, default_timeout, metadata=metadata) + _GapicCallable( + func, + default_retry, + default_timeout, + default_compression, + metadata=metadata, + ) ) diff --git a/google/api_core/grpc_helpers.py b/google/api_core/grpc_helpers.py index 102dc0a0..f52e180a 100644 --- a/google/api_core/grpc_helpers.py +++ b/google/api_core/grpc_helpers.py @@ -16,6 +16,7 @@ import collections import functools +import logging import warnings import grpc @@ -51,6 +52,8 @@ # The list of gRPC Callable interfaces that return iterators. _STREAM_WRAP_CLASSES = (grpc.UnaryStreamMultiCallable, grpc.StreamStreamMultiCallable) +_LOGGER = logging.getLogger(__name__) + def _patch_callable_name(callable_): """Fix-up gRPC callable attributes. @@ -276,7 +279,8 @@ def create_channel( quota_project_id=None, default_scopes=None, default_host=None, - **kwargs + compression=None, + **kwargs, ): """Create a secure channel with credentials. @@ -297,6 +301,8 @@ def create_channel( default_scopes (Sequence[str]): Default scopes passed by a Google client library. Use 'scopes' for user-defined scopes. default_host (str): The default endpoint. e.g., "pubsub.googleapis.com". + compression (grpc.Compression): An optional value indicating the + compression method to be used over the lifetime of the channel. kwargs: Additional key-word args passed to :func:`grpc_gcp.secure_channel` or :func:`grpc.secure_channel`. Note: `grpc_gcp` is only supported in environments with protobuf < 4.0.0. @@ -319,12 +325,18 @@ def create_channel( ) if HAS_GRPC_GCP: # pragma: NO COVER + if compression is not None and compression != grpc.Compression.NoCompression: + _LOGGER.debug( + "Compression argument is being ignored for grpc_gcp.secure_channel creation." + ) return grpc_gcp.secure_channel(target, composite_credentials, **kwargs) - return grpc.secure_channel(target, composite_credentials, **kwargs) + return grpc.secure_channel( + target, composite_credentials, compression=compression, **kwargs + ) _MethodCall = collections.namedtuple( - "_MethodCall", ("request", "timeout", "metadata", "credentials") + "_MethodCall", ("request", "timeout", "metadata", "credentials", "compression") ) _ChannelRequest = collections.namedtuple("_ChannelRequest", ("method", "request")) @@ -351,11 +363,15 @@ def __init__(self, method, channel): """List[protobuf.Message]: All requests sent to this callable.""" self.calls = [] """List[Tuple]: All invocations of this callable. Each tuple is the - request, timeout, metadata, and credentials.""" + request, timeout, metadata, compression, and credentials.""" - def __call__(self, request, timeout=None, metadata=None, credentials=None): + def __call__( + self, request, timeout=None, metadata=None, credentials=None, compression=None + ): self._channel.requests.append(_ChannelRequest(self._method, request)) - self.calls.append(_MethodCall(request, timeout, metadata, credentials)) + self.calls.append( + _MethodCall(request, timeout, metadata, credentials, compression) + ) self.requests.append(request) response = self.response diff --git a/google/api_core/grpc_helpers_async.py b/google/api_core/grpc_helpers_async.py index 5a5bf2a6..d1f69d98 100644 --- a/google/api_core/grpc_helpers_async.py +++ b/google/api_core/grpc_helpers_async.py @@ -212,6 +212,7 @@ def create_channel( quota_project_id=None, default_scopes=None, default_host=None, + compression=None, **kwargs ): """Create an AsyncIO secure channel with credentials. @@ -233,6 +234,8 @@ def create_channel( default_scopes (Sequence[str]): Default scopes passed by a Google client library. Use 'scopes' for user-defined scopes. default_host (str): The default endpoint. e.g., "pubsub.googleapis.com". + compression (grpc.Compression): An optional value indicating the + compression method to be used over the lifetime of the channel. kwargs: Additional key-word args passed to :func:`aio.secure_channel`. Returns: @@ -252,7 +255,9 @@ def create_channel( default_host=default_host, ) - return aio.secure_channel(target, composite_credentials, **kwargs) + return aio.secure_channel( + target, composite_credentials, compression=compression, **kwargs + ) class FakeUnaryUnaryCall(_WrappedUnaryUnaryCall): diff --git a/google/api_core/operation.py b/google/api_core/operation.py index 90cbdc99..4b9c9a58 100644 --- a/google/api_core/operation.py +++ b/google/api_core/operation.py @@ -315,10 +315,16 @@ def from_grpc(operation, operations_stub, result_type, grpc_metadata=None, **kwa operation. """ refresh = functools.partial( - _refresh_grpc, operations_stub, operation.name, metadata=grpc_metadata + _refresh_grpc, + operations_stub, + operation.name, + metadata=grpc_metadata, ) cancel = functools.partial( - _cancel_grpc, operations_stub, operation.name, metadata=grpc_metadata + _cancel_grpc, + operations_stub, + operation.name, + metadata=grpc_metadata, ) return Operation(operation, refresh, cancel, result_type, **kwargs) @@ -347,9 +353,13 @@ def from_gapic(operation, operations_client, result_type, grpc_metadata=None, ** operation. """ refresh = functools.partial( - operations_client.get_operation, operation.name, metadata=grpc_metadata + operations_client.get_operation, + operation.name, + metadata=grpc_metadata, ) cancel = functools.partial( - operations_client.cancel_operation, operation.name, metadata=grpc_metadata + operations_client.cancel_operation, + operation.name, + metadata=grpc_metadata, ) return Operation(operation, refresh, cancel, result_type, **kwargs) diff --git a/google/api_core/operation_async.py b/google/api_core/operation_async.py index 6bae8654..2fd341d9 100644 --- a/google/api_core/operation_async.py +++ b/google/api_core/operation_async.py @@ -213,9 +213,13 @@ def from_gapic(operation, operations_client, result_type, grpc_metadata=None, ** operation. """ refresh = functools.partial( - operations_client.get_operation, operation.name, metadata=grpc_metadata + operations_client.get_operation, + operation.name, + metadata=grpc_metadata, ) cancel = functools.partial( - operations_client.cancel_operation, operation.name, metadata=grpc_metadata + operations_client.cancel_operation, + operation.name, + metadata=grpc_metadata, ) return AsyncOperation(operation, refresh, cancel, result_type, **kwargs) diff --git a/google/api_core/operations_v1/abstract_operations_client.py b/google/api_core/operations_v1/abstract_operations_client.py index 78a61746..714c2aae 100644 --- a/google/api_core/operations_v1/abstract_operations_client.py +++ b/google/api_core/operations_v1/abstract_operations_client.py @@ -33,6 +33,7 @@ from google.auth.transport import mtls # type: ignore from google.longrunning import operations_pb2 from google.oauth2 import service_account # type: ignore +import grpc OptionalRetry = Union[retries.Retry, object] @@ -368,6 +369,7 @@ def list_operations( page_token: Optional[str] = None, retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, + compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListOperationsPager: r"""Lists operations that match the specified filter in the request. @@ -429,6 +431,7 @@ def list_operations( request, retry=retry, timeout=timeout, + compression=compression, metadata=metadata, ) @@ -450,6 +453,7 @@ def get_operation( *, retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, + compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> operations_pb2.Operation: r"""Gets the latest state of a long-running operation. @@ -490,6 +494,7 @@ def get_operation( request, retry=retry, timeout=timeout, + compression=compression, metadata=metadata, ) @@ -502,6 +507,7 @@ def delete_operation( *, retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, + compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> None: r"""Deletes a long-running operation. This method indicates that the @@ -541,6 +547,7 @@ def delete_operation( request, retry=retry, timeout=timeout, + compression=compression, metadata=metadata, ) @@ -550,6 +557,7 @@ def cancel_operation( *, retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, + compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> None: r"""Starts asynchronous cancellation on a long-running operation. @@ -598,5 +606,6 @@ def cancel_operation( request, retry=retry, timeout=timeout, + compression=compression, metadata=metadata, ) diff --git a/google/api_core/operations_v1/operations_async_client.py b/google/api_core/operations_v1/operations_async_client.py index 81c4513c..72c68c70 100644 --- a/google/api_core/operations_v1/operations_async_client.py +++ b/google/api_core/operations_v1/operations_async_client.py @@ -29,6 +29,7 @@ from google.api_core import retry as retries from google.api_core import timeout as timeouts from google.longrunning import operations_pb2 +from grpc import Compression class OperationsAsyncClient: @@ -59,28 +60,34 @@ def __init__(self, channel, client_config=None): ) default_timeout = timeouts.TimeToDeadlineTimeout(timeout=600.0) + default_compression = Compression.NoCompression + self._get_operation = gapic_v1.method_async.wrap_method( self.operations_stub.GetOperation, default_retry=default_retry, default_timeout=default_timeout, + default_compression=default_compression, ) self._list_operations = gapic_v1.method_async.wrap_method( self.operations_stub.ListOperations, default_retry=default_retry, default_timeout=default_timeout, + default_compression=default_compression, ) self._cancel_operation = gapic_v1.method_async.wrap_method( self.operations_stub.CancelOperation, default_retry=default_retry, default_timeout=default_timeout, + default_compression=default_compression, ) self._delete_operation = gapic_v1.method_async.wrap_method( self.operations_stub.DeleteOperation, default_retry=default_retry, default_timeout=default_timeout, + default_compression=default_compression, ) async def get_operation( @@ -88,6 +95,7 @@ async def get_operation( name, retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT, + compression=gapic_v1.method_async.DEFAULT, metadata=None, ): """Gets the latest state of a long-running operation. @@ -114,6 +122,8 @@ async def get_operation( unspecified, the the default timeout in the client configuration is used. If ``None``, then the RPC method will not time out. + compression (grpc.Compression): An element of grpc.compression + e.g. grpc.compression.Gzip. metadata (Optional[List[Tuple[str, str]]]): Additional gRPC metadata. @@ -133,7 +143,11 @@ async def get_operation( metadata.append(gapic_v1.routing_header.to_grpc_metadata({"name": name})) return await self._get_operation( - request, retry=retry, timeout=timeout, metadata=metadata + request, + retry=retry, + timeout=timeout, + compression=compression, + metadata=metadata, ) async def list_operations( @@ -142,6 +156,7 @@ async def list_operations( filter_, retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT, + compression=gapic_v1.method_async.DEFAULT, metadata=None, ): """ @@ -178,6 +193,8 @@ async def list_operations( unspecified, the the default timeout in the client configuration is used. If ``None``, then the RPC method will not time out. + compression (grpc.Compression): An element of grpc.compression + e.g. grpc.compression.Gzip. metadata (Optional[List[Tuple[str, str]]]): Additional gRPC metadata. @@ -202,7 +219,11 @@ async def list_operations( # Create the method used to fetch pages method = functools.partial( - self._list_operations, retry=retry, timeout=timeout, metadata=metadata + self._list_operations, + retry=retry, + timeout=timeout, + compression=compression, + metadata=metadata, ) iterator = page_iterator_async.AsyncGRPCIterator( @@ -221,6 +242,7 @@ async def cancel_operation( name, retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT, + compression=gapic_v1.method_async.DEFAULT, metadata=None, ): """Starts asynchronous cancellation on a long-running operation. @@ -261,6 +283,8 @@ async def cancel_operation( google.api_core.exceptions.GoogleAPICallError: If an error occurred while invoking the RPC, the appropriate ``GoogleAPICallError`` subclass will be raised. + compression (grpc.Compression): An element of grpc.compression + e.g. grpc.compression.Gzip. metadata (Optional[List[Tuple[str, str]]]): Additional gRPC metadata. """ @@ -272,7 +296,11 @@ async def cancel_operation( metadata.append(gapic_v1.routing_header.to_grpc_metadata({"name": name})) await self._cancel_operation( - request, retry=retry, timeout=timeout, metadata=metadata + request, + retry=retry, + timeout=timeout, + compression=compression, + metadata=metadata, ) async def delete_operation( @@ -280,6 +308,7 @@ async def delete_operation( name, retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT, + compression=gapic_v1.method_async.DEFAULT, metadata=None, ): """Deletes a long-running operation. @@ -306,6 +335,8 @@ async def delete_operation( unspecified, the the default timeout in the client configuration is used. If ``None``, then the RPC method will not time out. + compression (grpc.Compression): An element of grpc.compression + e.g. grpc.compression.Gzip. metadata (Optional[List[Tuple[str, str]]]): Additional gRPC metadata. @@ -325,5 +356,9 @@ async def delete_operation( metadata.append(gapic_v1.routing_header.to_grpc_metadata({"name": name})) await self._delete_operation( - request, retry=retry, timeout=timeout, metadata=metadata + request, + retry=retry, + timeout=timeout, + compression=compression, + metadata=metadata, ) diff --git a/google/api_core/operations_v1/operations_client.py b/google/api_core/operations_v1/operations_client.py index 3ddd3c47..d1d3fd55 100644 --- a/google/api_core/operations_v1/operations_client.py +++ b/google/api_core/operations_v1/operations_client.py @@ -43,6 +43,7 @@ from google.api_core import retry as retries from google.api_core import timeout as timeouts from google.longrunning import operations_pb2 +from grpc import Compression class OperationsClient(object): @@ -72,28 +73,34 @@ def __init__(self, channel, client_config=None): ) default_timeout = timeouts.TimeToDeadlineTimeout(timeout=600.0) + default_compression = Compression.NoCompression + self._get_operation = gapic_v1.method.wrap_method( self.operations_stub.GetOperation, default_retry=default_retry, default_timeout=default_timeout, + default_compression=default_compression, ) self._list_operations = gapic_v1.method.wrap_method( self.operations_stub.ListOperations, default_retry=default_retry, default_timeout=default_timeout, + default_compression=default_compression, ) self._cancel_operation = gapic_v1.method.wrap_method( self.operations_stub.CancelOperation, default_retry=default_retry, default_timeout=default_timeout, + default_compression=default_compression, ) self._delete_operation = gapic_v1.method.wrap_method( self.operations_stub.DeleteOperation, default_retry=default_retry, default_timeout=default_timeout, + default_compression=default_compression, ) # Service calls @@ -102,6 +109,7 @@ def get_operation( name, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, + compression=gapic_v1.method.DEFAULT, metadata=None, ): """Gets the latest state of a long-running operation. @@ -128,6 +136,8 @@ def get_operation( unspecified, the the default timeout in the client configuration is used. If ``None``, then the RPC method will not time out. + compression (grpc.Compression): An element of grpc.compression + e.g. grpc.compression.Gzip. metadata (Optional[List[Tuple[str, str]]]): Additional gRPC metadata. @@ -147,7 +157,11 @@ def get_operation( metadata.append(gapic_v1.routing_header.to_grpc_metadata({"name": name})) return self._get_operation( - request, retry=retry, timeout=timeout, metadata=metadata + request, + retry=retry, + timeout=timeout, + compression=compression, + metadata=metadata, ) def list_operations( @@ -156,6 +170,7 @@ def list_operations( filter_, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, + compression=gapic_v1.method.DEFAULT, metadata=None, ): """ @@ -192,6 +207,8 @@ def list_operations( unspecified, the the default timeout in the client configuration is used. If ``None``, then the RPC method will not time out. + compression (grpc.Compression): An element of grpc.compression + e.g. grpc.compression.Gzip. metadata (Optional[List[Tuple[str, str]]]): Additional gRPC metadata. @@ -216,7 +233,11 @@ def list_operations( # Create the method used to fetch pages method = functools.partial( - self._list_operations, retry=retry, timeout=timeout, metadata=metadata + self._list_operations, + retry=retry, + timeout=timeout, + compression=compression, + metadata=metadata, ) iterator = page_iterator.GRPCIterator( @@ -235,6 +256,7 @@ def cancel_operation( name, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, + compression=gapic_v1.method.DEFAULT, metadata=None, ): """Starts asynchronous cancellation on a long-running operation. @@ -267,6 +289,8 @@ def cancel_operation( unspecified, the the default timeout in the client configuration is used. If ``None``, then the RPC method will not time out. + compression (grpc.Compression): An element of grpc.compression + e.g. grpc.compression.Gzip. metadata (Optional[List[Tuple[str, str]]]): Additional gRPC metadata. @@ -285,13 +309,20 @@ def cancel_operation( metadata = metadata or [] metadata.append(gapic_v1.routing_header.to_grpc_metadata({"name": name})) - self._cancel_operation(request, retry=retry, timeout=timeout, metadata=metadata) + self._cancel_operation( + request, + retry=retry, + timeout=timeout, + compression=compression, + metadata=metadata, + ) def delete_operation( self, name, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, + compression=gapic_v1.method.DEFAULT, metadata=None, ): """Deletes a long-running operation. @@ -318,6 +349,8 @@ def delete_operation( unspecified, the the default timeout in the client configuration is used. If ``None``, then the RPC method will not time out. + compression (grpc.Compression): An element of grpc.compression + e.g. grpc.compression.Gzip. metadata (Optional[List[Tuple[str, str]]]): Additional gRPC metadata. @@ -336,4 +369,10 @@ def delete_operation( metadata = metadata or [] metadata.append(gapic_v1.routing_header.to_grpc_metadata({"name": name})) - self._delete_operation(request, retry=retry, timeout=timeout, metadata=metadata) + self._delete_operation( + request, + retry=retry, + timeout=timeout, + compression=compression, + metadata=metadata, + ) diff --git a/google/api_core/operations_v1/transports/base.py b/google/api_core/operations_v1/transports/base.py index e19bc3e8..98cf7896 100644 --- a/google/api_core/operations_v1/transports/base.py +++ b/google/api_core/operations_v1/transports/base.py @@ -26,6 +26,7 @@ from google.longrunning import operations_pb2 from google.oauth2 import service_account # type: ignore from google.protobuf import empty_pb2 # type: ignore +from grpc import Compression DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( @@ -129,6 +130,7 @@ def _prep_wrapped_messages(self, client_info): deadline=10.0, ), default_timeout=10.0, + default_compression=Compression.NoCompression, client_info=client_info, ), self.get_operation: gapic_v1.method.wrap_method( @@ -143,6 +145,7 @@ def _prep_wrapped_messages(self, client_info): deadline=10.0, ), default_timeout=10.0, + default_compression=Compression.NoCompression, client_info=client_info, ), self.delete_operation: gapic_v1.method.wrap_method( @@ -157,6 +160,7 @@ def _prep_wrapped_messages(self, client_info): deadline=10.0, ), default_timeout=10.0, + default_compression=Compression.NoCompression, client_info=client_info, ), self.cancel_operation: gapic_v1.method.wrap_method( @@ -171,6 +175,7 @@ def _prep_wrapped_messages(self, client_info): deadline=10.0, ), default_timeout=10.0, + default_compression=Compression.NoCompression, client_info=client_info, ), } diff --git a/google/api_core/operations_v1/transports/rest.py b/google/api_core/operations_v1/transports/rest.py index bb9000f4..49f99d21 100644 --- a/google/api_core/operations_v1/transports/rest.py +++ b/google/api_core/operations_v1/transports/rest.py @@ -29,6 +29,7 @@ from google.longrunning import operations_pb2 # type: ignore from google.protobuf import empty_pb2 # type: ignore from google.protobuf import json_format # type: ignore +import grpc from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO, OperationsTransport OptionalRetry = Union[retries.Retry, object] @@ -149,6 +150,7 @@ def _list_operations( *, retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, + compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> operations_pb2.ListOperationsResponse: r"""Call the list operations method over HTTP. @@ -228,6 +230,7 @@ def _get_operation( *, retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, + compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> operations_pb2.Operation: r"""Call the get operation method over HTTP. @@ -308,6 +311,7 @@ def _delete_operation( *, retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, + compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> empty_pb2.Empty: r"""Call the delete operation method over HTTP. @@ -378,6 +382,7 @@ def _cancel_operation( *, retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, + compression: Optional[grpc.Compression] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> empty_pb2.Empty: r"""Call the cancel operation method over HTTP. diff --git a/tests/asyncio/gapic/test_method_async.py b/tests/asyncio/gapic/test_method_async.py index 02d883f6..ee206979 100644 --- a/tests/asyncio/gapic/test_method_async.py +++ b/tests/asyncio/gapic/test_method_async.py @@ -18,7 +18,7 @@ import pytest try: - from grpc import aio + from grpc import aio, Compression except ImportError: pytest.skip("No GRPC", allow_module_level=True) @@ -92,6 +92,35 @@ async def test_wrap_method_with_custom_client_info(): assert client_info.to_grpc_metadata() in metadata +@pytest.mark.asyncio +async def test_wrap_method_with_no_compression(): + fake_call = grpc_helpers_async.FakeUnaryUnaryCall() + method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, return_value=fake_call) + + wrapped_method = gapic_v1.method_async.wrap_method(method) + + await wrapped_method(1, 2, meep="moop", compression=None) + + method.assert_called_once_with(1, 2, meep="moop", metadata=mock.ANY) + + +@pytest.mark.asyncio +async def test_wrap_method_with_custom_compression(): + compression = Compression.Gzip + fake_call = grpc_helpers_async.FakeUnaryUnaryCall() + method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, return_value=fake_call) + + wrapped_method = gapic_v1.method_async.wrap_method( + method, default_compression=compression + ) + + await wrapped_method(1, 2, meep="moop", compression=Compression.Deflate) + + method.assert_called_once_with( + 1, 2, meep="moop", metadata=mock.ANY, compression=Compression.Deflate + ) + + @pytest.mark.asyncio async def test_invoke_wrapped_method_with_metadata(): fake_call = grpc_helpers_async.FakeUnaryUnaryCall() @@ -126,7 +155,7 @@ async def test_invoke_wrapped_method_with_metadata_as_none(): @mock.patch("asyncio.sleep") @pytest.mark.asyncio -async def test_wrap_method_with_default_retry_and_timeout(unused_sleep): +async def test_wrap_method_with_default_retry_timeout_and_compression(unused_sleep): fake_call = grpc_helpers_async.FakeUnaryUnaryCall(42) method = mock.Mock( spec=aio.UnaryUnaryMultiCallable, @@ -135,15 +164,18 @@ async def test_wrap_method_with_default_retry_and_timeout(unused_sleep): default_retry = retry_async.AsyncRetry() default_timeout = timeout.ConstantTimeout(60) + default_compression = Compression.Gzip wrapped_method = gapic_v1.method_async.wrap_method( - method, default_retry, default_timeout + method, default_retry, default_timeout, default_compression ) result = await wrapped_method() assert result == 42 assert method.call_count == 2 - method.assert_called_with(timeout=60, metadata=mock.ANY) + method.assert_called_with( + timeout=60, compression=default_compression, metadata=mock.ANY + ) @mock.patch("asyncio.sleep") @@ -157,23 +189,27 @@ async def test_wrap_method_with_default_retry_and_timeout_using_sentinel(unused_ default_retry = retry_async.AsyncRetry() default_timeout = timeout.ConstantTimeout(60) + default_compression = Compression.Gzip wrapped_method = gapic_v1.method_async.wrap_method( - method, default_retry, default_timeout + method, default_retry, default_timeout, default_compression ) result = await wrapped_method( retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT, + compression=gapic_v1.method_async.DEFAULT, ) assert result == 42 assert method.call_count == 2 - method.assert_called_with(timeout=60, metadata=mock.ANY) + method.assert_called_with( + timeout=60, compression=Compression.Gzip, metadata=mock.ANY + ) @mock.patch("asyncio.sleep") @pytest.mark.asyncio -async def test_wrap_method_with_overriding_retry_and_timeout(unused_sleep): +async def test_wrap_method_with_overriding_retry_timeout_and_compression(unused_sleep): fake_call = grpc_helpers_async.FakeUnaryUnaryCall(42) method = mock.Mock( spec=aio.UnaryUnaryMultiCallable, @@ -182,8 +218,9 @@ async def test_wrap_method_with_overriding_retry_and_timeout(unused_sleep): default_retry = retry_async.AsyncRetry() default_timeout = timeout.ConstantTimeout(60) + default_compression = Compression.Gzip wrapped_method = gapic_v1.method_async.wrap_method( - method, default_retry, default_timeout + method, default_retry, default_timeout, default_compression ) result = await wrapped_method( @@ -191,11 +228,14 @@ async def test_wrap_method_with_overriding_retry_and_timeout(unused_sleep): retry_async.if_exception_type(exceptions.NotFound) ), timeout=timeout.ConstantTimeout(22), + compression=Compression.Deflate, ) assert result == 42 assert method.call_count == 2 - method.assert_called_with(timeout=22, metadata=mock.ANY) + method.assert_called_with( + timeout=22, compression=Compression.Deflate, metadata=mock.ANY + ) @pytest.mark.asyncio diff --git a/tests/asyncio/operations_v1/test_operations_async_client.py b/tests/asyncio/operations_v1/test_operations_async_client.py index 34236da7..19ac9b56 100644 --- a/tests/asyncio/operations_v1/test_operations_async_client.py +++ b/tests/asyncio/operations_v1/test_operations_async_client.py @@ -16,7 +16,7 @@ import pytest try: - from grpc import aio + from grpc import aio, Compression except ImportError: # pragma: NO COVER pytest.skip("No GRPC", allow_module_level=True) @@ -42,10 +42,13 @@ async def test_get_operation(): ) client = operations_v1.OperationsAsyncClient(mocked_channel) - response = await client.get_operation("name", metadata=[("header", "foo")]) + response = await client.get_operation( + "name", metadata=[("header", "foo")], compression=Compression.Gzip + ) assert method.call_count == 1 assert tuple(method.call_args_list[0])[0][0].name == "name" assert ("header", "foo") in tuple(method.call_args_list[0])[1]["metadata"] + assert tuple(method.call_args_list[0])[1]["compression"] == Compression.Gzip assert ("x-goog-request-params", "name=name") in tuple(method.call_args_list[0])[1][ "metadata" ] @@ -63,7 +66,9 @@ async def test_list_operations(): mocked_channel, method, fake_call = _mock_grpc_objects(list_response) client = operations_v1.OperationsAsyncClient(mocked_channel) - pager = await client.list_operations("name", "filter", metadata=[("header", "foo")]) + pager = await client.list_operations( + "name", "filter", metadata=[("header", "foo")], compression=Compression.Gzip + ) assert isinstance(pager, page_iterator_async.AsyncIterator) responses = [] @@ -74,6 +79,7 @@ async def test_list_operations(): assert method.call_count == 1 assert ("header", "foo") in tuple(method.call_args_list[0])[1]["metadata"] + assert tuple(method.call_args_list[0])[1]["compression"] == Compression.Gzip assert ("x-goog-request-params", "name=name") in tuple(method.call_args_list[0])[1][ "metadata" ] @@ -88,11 +94,14 @@ async def test_delete_operation(): mocked_channel, method, fake_call = _mock_grpc_objects(empty_pb2.Empty()) client = operations_v1.OperationsAsyncClient(mocked_channel) - await client.delete_operation("name", metadata=[("header", "foo")]) + await client.delete_operation( + "name", metadata=[("header", "foo")], compression=Compression.Gzip + ) assert method.call_count == 1 assert tuple(method.call_args_list[0])[0][0].name == "name" assert ("header", "foo") in tuple(method.call_args_list[0])[1]["metadata"] + assert tuple(method.call_args_list[0])[1]["compression"] == Compression.Gzip assert ("x-goog-request-params", "name=name") in tuple(method.call_args_list[0])[1][ "metadata" ] @@ -103,11 +112,14 @@ async def test_cancel_operation(): mocked_channel, method, fake_call = _mock_grpc_objects(empty_pb2.Empty()) client = operations_v1.OperationsAsyncClient(mocked_channel) - await client.cancel_operation("name", metadata=[("header", "foo")]) + await client.cancel_operation( + "name", metadata=[("header", "foo")], compression=Compression.Gzip + ) assert method.call_count == 1 assert tuple(method.call_args_list[0])[0][0].name == "name" assert ("header", "foo") in tuple(method.call_args_list[0])[1]["metadata"] + assert tuple(method.call_args_list[0])[1]["compression"] == Compression.Gzip assert ("x-goog-request-params", "name=name") in tuple(method.call_args_list[0])[1][ "metadata" ] diff --git a/tests/asyncio/test_grpc_helpers_async.py b/tests/asyncio/test_grpc_helpers_async.py index 33ce8212..95242f6b 100644 --- a/tests/asyncio/test_grpc_helpers_async.py +++ b/tests/asyncio/test_grpc_helpers_async.py @@ -292,7 +292,9 @@ def test_create_channel_implicit(grpc_secure_channel, default, composite_creds_c assert channel is grpc_secure_channel.return_value default.assert_called_once_with(scopes=None, default_scopes=None) - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @mock.patch("google.auth.transport.grpc.AuthMetadataPlugin", autospec=True) @@ -323,7 +325,9 @@ def test_create_channel_implicit_with_default_host( auth_metadata_plugin.assert_called_once_with( mock.sentinel.credentials, mock.sentinel.Request, default_host=default_host ) - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @mock.patch("grpc.composite_channel_credentials") @@ -344,7 +348,9 @@ def test_create_channel_implicit_with_ssl_creds( default.assert_called_once_with(scopes=None, default_scopes=None) composite_creds_call.assert_called_once_with(ssl_creds, mock.ANY) composite_creds = composite_creds_call.return_value - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @mock.patch("grpc.composite_channel_credentials") @@ -365,7 +371,9 @@ def test_create_channel_implicit_with_scopes( assert channel is grpc_secure_channel.return_value default.assert_called_once_with(scopes=["one", "two"], default_scopes=None) - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @mock.patch("grpc.composite_channel_credentials") @@ -382,13 +390,15 @@ def test_create_channel_implicit_with_default_scopes( composite_creds = composite_creds_call.return_value channel = grpc_helpers_async.create_channel( - target, default_scopes=["three", "four"] + target, default_scopes=["three", "four"], compression=grpc.Compression.Gzip ) assert channel is grpc_secure_channel.return_value default.assert_called_once_with(scopes=None, default_scopes=["three", "four"]) - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=grpc.Compression.Gzip + ) def test_create_channel_explicit_with_duplicate_credentials(): @@ -412,14 +422,16 @@ def test_create_channel_explicit(grpc_secure_channel, auth_creds, composite_cred composite_creds = composite_creds_call.return_value channel = grpc_helpers_async.create_channel( - target, credentials=mock.sentinel.credentials + target, credentials=mock.sentinel.credentials, compression=grpc.Compression.Gzip ) auth_creds.assert_called_once_with( mock.sentinel.credentials, scopes=None, default_scopes=None ) assert channel is grpc_secure_channel.return_value - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=grpc.Compression.Gzip + ) @mock.patch("grpc.composite_channel_credentials") @@ -433,12 +445,17 @@ def test_create_channel_explicit_scoped(grpc_secure_channel, composite_creds_cal credentials.requires_scopes = True channel = grpc_helpers_async.create_channel( - target, credentials=credentials, scopes=scopes + target, + credentials=credentials, + scopes=scopes, + compression=grpc.Compression.Gzip, ) credentials.with_scopes.assert_called_once_with(scopes, default_scopes=None) assert channel is grpc_secure_channel.return_value - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=grpc.Compression.Gzip + ) @mock.patch("grpc.composite_channel_credentials") @@ -454,14 +471,19 @@ def test_create_channel_explicit_default_scopes( credentials.requires_scopes = True channel = grpc_helpers_async.create_channel( - target, credentials=credentials, default_scopes=default_scopes + target, + credentials=credentials, + default_scopes=default_scopes, + compression=grpc.Compression.Gzip, ) credentials.with_scopes.assert_called_once_with( scopes=None, default_scopes=default_scopes ) assert channel is grpc_secure_channel.return_value - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=grpc.Compression.Gzip + ) @mock.patch("grpc.composite_channel_credentials") @@ -482,7 +504,9 @@ def test_create_channel_explicit_with_quota_project( credentials.with_quota_project.assert_called_once_with("project-foo") assert channel is grpc_secure_channel.return_value - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @mock.patch("grpc.composite_channel_credentials") @@ -508,7 +532,9 @@ def test_create_channel_with_credentials_file( credentials_file, scopes=None, default_scopes=None ) assert channel is grpc_secure_channel.return_value - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @mock.patch("grpc.composite_channel_credentials") @@ -535,7 +561,9 @@ def test_create_channel_with_credentials_file_and_scopes( credentials_file, scopes=scopes, default_scopes=None ) assert channel is grpc_secure_channel.return_value - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @mock.patch("grpc.composite_channel_credentials") @@ -562,7 +590,9 @@ def test_create_channel_with_credentials_file_and_default_scopes( credentials_file, scopes=None, default_scopes=default_scopes ) assert channel is grpc_secure_channel.return_value - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @mock.patch("grpc.aio.secure_channel") diff --git a/tests/unit/gapic/test_method.py b/tests/unit/gapic/test_method.py index 03c2260d..0623a5bc 100644 --- a/tests/unit/gapic/test_method.py +++ b/tests/unit/gapic/test_method.py @@ -121,21 +121,24 @@ def test_invoke_wrapped_method_with_metadata_as_none(): @mock.patch("time.sleep") -def test_wrap_method_with_default_retry_and_timeout(unused_sleep): +def test_wrap_method_with_default_retry_and_timeout_and_compression(unused_sleep): method = mock.Mock( spec=["__call__"], side_effect=[exceptions.InternalServerError(None), 42] ) default_retry = retry.Retry() default_timeout = timeout.ConstantTimeout(60) + default_compression = grpc.Compression.Gzip wrapped_method = google.api_core.gapic_v1.method.wrap_method( - method, default_retry, default_timeout + method, default_retry, default_timeout, default_compression ) result = wrapped_method() assert result == 42 assert method.call_count == 2 - method.assert_called_with(timeout=60, metadata=mock.ANY) + method.assert_called_with( + timeout=60, compression=default_compression, metadata=mock.ANY + ) @mock.patch("time.sleep") @@ -145,37 +148,45 @@ def test_wrap_method_with_default_retry_and_timeout_using_sentinel(unused_sleep) ) default_retry = retry.Retry() default_timeout = timeout.ConstantTimeout(60) + default_compression = grpc.Compression.Gzip wrapped_method = google.api_core.gapic_v1.method.wrap_method( - method, default_retry, default_timeout + method, default_retry, default_timeout, default_compression ) result = wrapped_method( retry=google.api_core.gapic_v1.method.DEFAULT, timeout=google.api_core.gapic_v1.method.DEFAULT, + compression=google.api_core.gapic_v1.method.DEFAULT, ) assert result == 42 assert method.call_count == 2 - method.assert_called_with(timeout=60, metadata=mock.ANY) + method.assert_called_with( + timeout=60, compression=default_compression, metadata=mock.ANY + ) @mock.patch("time.sleep") -def test_wrap_method_with_overriding_retry_and_timeout(unused_sleep): +def test_wrap_method_with_overriding_retry_timeout_compression(unused_sleep): method = mock.Mock(spec=["__call__"], side_effect=[exceptions.NotFound(None), 42]) default_retry = retry.Retry() default_timeout = timeout.ConstantTimeout(60) + default_compression = grpc.Compression.Gzip wrapped_method = google.api_core.gapic_v1.method.wrap_method( - method, default_retry, default_timeout + method, default_retry, default_timeout, default_compression ) result = wrapped_method( retry=retry.Retry(retry.if_exception_type(exceptions.NotFound)), timeout=timeout.ConstantTimeout(22), + compression=grpc.Compression.Deflate, ) assert result == 42 assert method.call_count == 2 - method.assert_called_with(timeout=22, metadata=mock.ANY) + method.assert_called_with( + timeout=22, compression=grpc.Compression.Deflate, metadata=mock.ANY + ) def test_wrap_method_with_overriding_timeout_as_a_number(): diff --git a/tests/unit/test_grpc_helpers.py b/tests/unit/test_grpc_helpers.py index a99dc280..4eccbcaa 100644 --- a/tests/unit/test_grpc_helpers.py +++ b/tests/unit/test_grpc_helpers.py @@ -359,7 +359,7 @@ def test_create_channel_implicit(grpc_secure_channel, default, composite_creds_c target = "example.com:443" composite_creds = composite_creds_call.return_value - channel = grpc_helpers.create_channel(target) + channel = grpc_helpers.create_channel(target, compression=grpc.Compression.Gzip) assert channel is grpc_secure_channel.return_value @@ -368,7 +368,9 @@ def test_create_channel_implicit(grpc_secure_channel, default, composite_creds_c if grpc_helpers.HAS_GRPC_GCP: # pragma: NO COVER grpc_secure_channel.assert_called_once_with(target, composite_creds, None) else: - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=grpc.Compression.Gzip + ) @mock.patch("google.auth.transport.grpc.AuthMetadataPlugin", autospec=True) @@ -403,7 +405,9 @@ def test_create_channel_implicit_with_default_host( if grpc_helpers.HAS_GRPC_GCP: # pragma: NO COVER grpc_secure_channel.assert_called_once_with(target, composite_creds, None) else: - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @mock.patch("grpc.composite_channel_credentials") @@ -430,7 +434,9 @@ def test_create_channel_implicit_with_ssl_creds( if grpc_helpers.HAS_GRPC_GCP: # pragma: NO COVER grpc_secure_channel.assert_called_once_with(target, composite_creds, None) else: - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @mock.patch("grpc.composite_channel_credentials") @@ -455,7 +461,9 @@ def test_create_channel_implicit_with_scopes( if grpc_helpers.HAS_GRPC_GCP: # pragma: NO COVER grpc_secure_channel.assert_called_once_with(target, composite_creds, None) else: - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @mock.patch("grpc.composite_channel_credentials") @@ -480,7 +488,9 @@ def test_create_channel_implicit_with_default_scopes( if grpc_helpers.HAS_GRPC_GCP: # pragma: NO COVER grpc_secure_channel.assert_called_once_with(target, composite_creds, None) else: - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) def test_create_channel_explicit_with_duplicate_credentials(): @@ -512,7 +522,9 @@ def test_create_channel_explicit(grpc_secure_channel, auth_creds, composite_cred if grpc_helpers.HAS_GRPC_GCP: # pragma: NO COVER grpc_secure_channel.assert_called_once_with(target, composite_creds, None) else: - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @mock.patch("grpc.composite_channel_credentials") @@ -536,7 +548,9 @@ def test_create_channel_explicit_scoped(grpc_secure_channel, composite_creds_cal if grpc_helpers.HAS_GRPC_GCP: # pragma: NO COVER grpc_secure_channel.assert_called_once_with(target, composite_creds, None) else: - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @mock.patch("grpc.composite_channel_credentials") @@ -564,7 +578,9 @@ def test_create_channel_explicit_default_scopes( if grpc_helpers.HAS_GRPC_GCP: # pragma: NO COVER grpc_secure_channel.assert_called_once_with(target, composite_creds, None) else: - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @mock.patch("grpc.composite_channel_credentials") @@ -590,7 +606,9 @@ def test_create_channel_explicit_with_quota_project( if grpc_helpers.HAS_GRPC_GCP: # pragma: NO COVER grpc_secure_channel.assert_called_once_with(target, composite_creds, None) else: - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @mock.patch("grpc.composite_channel_credentials") @@ -619,7 +637,9 @@ def test_create_channel_with_credentials_file( if grpc_helpers.HAS_GRPC_GCP: # pragma: NO COVER grpc_secure_channel.assert_called_once_with(target, composite_creds, None) else: - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @mock.patch("grpc.composite_channel_credentials") @@ -651,7 +671,9 @@ def test_create_channel_with_credentials_file_and_scopes( if grpc_helpers.HAS_GRPC_GCP: # pragma: NO COVER grpc_secure_channel.assert_called_once_with(target, composite_creds, None) else: - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @mock.patch("grpc.composite_channel_credentials") @@ -683,7 +705,9 @@ def test_create_channel_with_credentials_file_and_default_scopes( if grpc_helpers.HAS_GRPC_GCP: # pragma: NO COVER grpc_secure_channel.assert_called_once_with(target, composite_creds, None) else: - grpc_secure_channel.assert_called_once_with(target, composite_creds) + grpc_secure_channel.assert_called_once_with( + target, composite_creds, compression=None + ) @pytest.mark.skipif( @@ -813,6 +837,7 @@ def test_call_info(self): stub = operations_pb2.OperationsStub(channel) expected_request = operations_pb2.GetOperationRequest(name="meep") expected_response = operations_pb2.Operation(name="moop") + expected_compression = grpc.Compression.NoCompression expected_metadata = [("red", "blue"), ("two", "shoe")] expected_credentials = mock.sentinel.credentials channel.GetOperation.response = expected_response @@ -820,6 +845,7 @@ def test_call_info(self): response = stub.GetOperation( expected_request, timeout=42, + compression=expected_compression, metadata=expected_metadata, credentials=expected_credentials, ) @@ -827,7 +853,13 @@ def test_call_info(self): assert response == expected_response assert channel.requests == [("GetOperation", expected_request)] assert channel.GetOperation.calls == [ - (expected_request, 42, expected_metadata, expected_credentials) + ( + expected_request, + 42, + expected_metadata, + expected_credentials, + expected_compression, + ) ] def test_unary_unary(self):