Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry and Timeout policies for grpc and http #679

Merged
merged 29 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6c3e04b
Brings back the link we removed because it wasn’t online yet
elena-kolevska Mar 6, 2024
ecdab26
Retry decorator
elena-kolevska Feb 27, 2024
9923db1
Use a retry function instead of a decorator
elena-kolevska Feb 28, 2024
402fd41
adds retries for async
elena-kolevska Feb 28, 2024
9b6a762
Fixes types
elena-kolevska Feb 28, 2024
9e3e0d6
Timeout policy
elena-kolevska Feb 29, 2024
856c5f9
Cleanup
elena-kolevska Feb 29, 2024
fade34b
retry for http wip.
elena-kolevska Mar 1, 2024
a64bcaf
tests for http retries
elena-kolevska Mar 2, 2024
7fb45f9
New ruff version has some formatting changes
elena-kolevska Mar 2, 2024
ba398d5
Revert "New ruff version has some formatting changes"
elena-kolevska Mar 2, 2024
fc8d2f5
Fixes setting values with a default of None
elena-kolevska Mar 2, 2024
aacdd6f
Moves timeout
elena-kolevska Mar 3, 2024
34be94f
Adds a deprecation warning for the wait() method (#682)
elena-kolevska Mar 6, 2024
23c27b0
Pin ruff version
berndverst Mar 6, 2024
51d75ff
Pin ruff version in tox.ini
berndverst Mar 6, 2024
66d1736
Lint fix
elena-kolevska Apr 24, 2024
afd32c2
Merge branch 'main' into retry-timeout
elena-kolevska Apr 24, 2024
fbf883d
Merge branch 'main' into retry-timeout
berndverst May 1, 2024
f68a08b
Reorganises tests. Adds retry for actors.
elena-kolevska May 1, 2024
d034a0d
Linters
elena-kolevska May 1, 2024
32ecf28
Merge remote-tracking branch 'upstream/main' into retry-timeout
elena-kolevska May 1, 2024
ebc0e7e
Merge branch 'main' of github.com:elena-kolevska/python-sdk into retr…
elena-kolevska May 1, 2024
f98f36c
Merge branch 'retry-timeout' of github.com:elena-kolevska/python-sdk …
elena-kolevska May 1, 2024
b0ed644
Merge branch 'main' into retry-timeout
elena-kolevska Jun 3, 2024
c22b2ee
Retry all calls
elena-kolevska Jun 4, 2024
37d8509
Small fix and docs update
elena-kolevska Jun 4, 2024
8c1891a
Adds a note to docs, for clarity
elena-kolevska Jun 17, 2024
63548df
Merge branch 'main' into retry-timeout
berndverst Jun 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dapr/actor/client/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from dapr.actor.id import ActorId
from dapr.actor.runtime._type_utils import get_dispatchable_attrs_from_interface
from dapr.clients import DaprActorClientBase, DaprActorHttpClient
from dapr.clients.retry import RetryPolicy
from dapr.serializers import Serializer, DefaultJSONSerializer
from dapr.conf import settings

Expand Down Expand Up @@ -50,9 +51,12 @@
self,
message_serializer=DefaultJSONSerializer(),
http_timeout_seconds: int = settings.DAPR_HTTP_TIMEOUT_SECONDS,
retry_policy: Optional[RetryPolicy] = None,
):
# TODO: support serializer for state store later
self._dapr_client = DaprActorHttpClient(message_serializer, timeout=http_timeout_seconds)
self._dapr_client = DaprActorHttpClient(

Check warning on line 57 in dapr/actor/client/proxy.py

View check run for this annotation

Codecov / codecov/patch

dapr/actor/client/proxy.py#L57

Added line #L57 was not covered by tests
message_serializer, timeout=http_timeout_seconds, retry_policy=retry_policy
)
self._message_serializer = message_serializer

def create(
Expand Down
43 changes: 28 additions & 15 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.clients.health import DaprHealth
from dapr.clients.retry import RetryPolicy
from dapr.conf.helpers import GrpcEndpoint
from dapr.conf import settings
from dapr.proto import api_v1, api_service_v1, common_v1
from dapr.proto.runtime.v1.dapr_pb2 import UnsubscribeConfigurationResponse
from dapr.version import __version__

from dapr.aio.clients.grpc._asynchelpers import DaprClientInterceptorAsync
from dapr.aio.clients.grpc.interceptors import (
DaprClientInterceptorAsync,
DaprClientTimeoutInterceptorAsync,
)
from dapr.clients.grpc._helpers import (
MetadataTuple,
to_bytes,
Expand Down Expand Up @@ -118,6 +122,7 @@
]
] = None,
max_grpc_message_length: Optional[int] = None,
retry_policy: Optional[RetryPolicy] = None,
):
"""Connects to Dapr Runtime and initialize gRPC client stub.

Expand All @@ -131,6 +136,7 @@
message length in bytes.
"""
DaprHealth.wait_until_ready()
self.retry_policy = retry_policy or RetryPolicy()

useragent = f'dapr-sdk-python/{__version__}'
if not max_grpc_message_length:
Expand All @@ -154,26 +160,32 @@
except ValueError as error:
raise DaprInternalError(f'{error}') from error

if self._uri.tls:
self._channel = grpc.aio.secure_channel(
self._uri.endpoint, credentials=self.get_credentials(), options=options
) # type: ignore
# Prepare interceptors
if interceptors is None:
interceptors = [DaprClientTimeoutInterceptorAsync()]
else:
self._channel = grpc.aio.insecure_channel(self._uri.endpoint, options) # type: ignore
interceptors.append(DaprClientTimeoutInterceptorAsync())

Check warning on line 167 in dapr/aio/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/client.py#L167

Added line #L167 was not covered by tests

if settings.DAPR_API_TOKEN:
api_token_interceptor = DaprClientInterceptorAsync(
[
('dapr-api-token', settings.DAPR_API_TOKEN),
]
)
self._channel = grpc.aio.insecure_channel( # type: ignore
address, options=options, interceptors=(api_token_interceptor,)
)
if interceptors:
self._channel = grpc.aio.insecure_channel( # type: ignore
address, options=options, *interceptors
)
interceptors.append(api_token_interceptor)

# Create gRPC channel
if self._uri.tls:
self._channel = grpc.aio.secure_channel(
self._uri.endpoint,
credentials=self.get_credentials(),
options=options,
interceptors=interceptors,
) # type: ignore
else:
self._channel = grpc.aio.insecure_channel(
self._uri.endpoint, options, interceptors=interceptors
) # type: ignore

self._stub = api_service_v1.DaprStub(self._channel)

Expand Down Expand Up @@ -713,8 +725,9 @@

req = api_v1.SaveStateRequest(store_name=store_name, states=[state])
try:
call = self._stub.SaveState(req, metadata=metadata)
await call
result, call = await self.retry_policy.run_rpc_async(
self._stub.SaveState, req, metadata=metadata
)
return DaprResponse(headers=await call.initial_metadata())
except AioRpcError as e:
raise DaprInternalError(e.details()) from e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

from grpc.aio import UnaryUnaryClientInterceptor, ClientCallDetails # type: ignore

from dapr.conf import settings


class _ClientCallDetailsAsync(
namedtuple(
Expand All @@ -33,6 +35,22 @@ class _ClientCallDetailsAsync(
pass


class DaprClientTimeoutInterceptorAsync(UnaryUnaryClientInterceptor):
def intercept_unary_unary(self, continuation, client_call_details, request):
# If a specific timeout is not set, create a new ClientCallDetails with the default timeout
if client_call_details.timeout is None:
new_client_call_details = _ClientCallDetailsAsync(
client_call_details.method,
settings.DAPR_API_TIMEOUT_SECONDS,
client_call_details.metadata,
client_call_details.credentials,
client_call_details.wait_for_ready,
)
return continuation(new_client_call_details, request)

return continuation(client_call_details, request)


class DaprClientInterceptorAsync(UnaryUnaryClientInterceptor):
"""The class implements a UnaryUnaryClientInterceptor from grpc to add an interceptor to add
additional headers to all calls as needed.
Expand Down
4 changes: 3 additions & 1 deletion dapr/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from dapr.clients.grpc.client import DaprGrpcClient, MetadataTuple, InvokeMethodResponse
from dapr.clients.http.dapr_actor_http_client import DaprActorHttpClient
from dapr.clients.http.dapr_invocation_http_client import DaprInvocationHttpClient
from dapr.clients.retry import RetryPolicy
from dapr.conf import settings
from google.protobuf.message import Message as GrpcMessage

Expand Down Expand Up @@ -64,6 +65,7 @@ def __init__(
] = None,
http_timeout_seconds: Optional[int] = None,
max_grpc_message_length: Optional[int] = None,
retry_policy: Optional[RetryPolicy] = None,
):
"""Connects to Dapr Runtime via gRPC and HTTP.

Expand All @@ -78,7 +80,7 @@ def __init__(
max_grpc_message_length (int, optional): The maximum grpc send and receive
message length in bytes.
"""
super().__init__(address, interceptors, max_grpc_message_length)
super().__init__(address, interceptors, max_grpc_message_length, retry_policy)
self.invocation_client = None

invocation_protocol = settings.DAPR_API_METHOD_INVOCATION_PROTOCOL.upper()
Expand Down
91 changes: 0 additions & 91 deletions dapr/clients/grpc/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
from collections import namedtuple
from typing import Dict, List, Union, Tuple, Optional
from enum import Enum
from google.protobuf.any_pb2 import Any as GrpcAny
from google.protobuf.message import Message as GrpcMessage
from grpc import UnaryUnaryClientInterceptor, ClientCallDetails # type: ignore

MetadataDict = Dict[str, List[Union[bytes, str]]]
MetadataTuple = Tuple[Tuple[str, Union[bytes, str]], ...]
Expand Down Expand Up @@ -78,96 +76,7 @@ def to_str(data: Union[str, bytes]) -> str:
raise f'invalid data type {type(data)}'


class _ClientCallDetails(
namedtuple(
'_ClientCallDetails',
['method', 'timeout', 'metadata', 'credentials', 'wait_for_ready', 'compression'],
),
ClientCallDetails,
):
"""This is an implementation of the ClientCallDetails interface needed for interceptors.
This class takes six named values and inherits the ClientCallDetails from grpc package.
This class encloses the values that describe a RPC to be invoked.
"""

pass


class DaprClientInterceptor(UnaryUnaryClientInterceptor):
"""The class implements a UnaryUnaryClientInterceptor from grpc to add an interceptor to add
additional headers to all calls as needed.

Examples:

interceptor = HeaderInterceptor([('header', 'value', )])
intercepted_channel = grpc.intercept_channel(grpc_channel, interceptor)

With multiple header values:

interceptor = HeaderInterceptor([('header1', 'value1', ), ('header2', 'value2', )])
intercepted_channel = grpc.intercept_channel(grpc_channel, interceptor)
"""

def __init__(self, metadata: List[Tuple[str, str]]):
"""Initializes the metadata field for the class.

Args:
metadata list[tuple[str, str]]: list of tuple of (key, value) strings
representing header values
"""

self._metadata = metadata

def _intercept_call(self, client_call_details: ClientCallDetails) -> ClientCallDetails:
"""Internal intercept_call implementation which adds metadata to grpc metadata in the RPC
call details.

Args:
client_call_details :class: `ClientCallDetails`: object that describes a RPC
to be invoked

Returns:
:class: `ClientCallDetails` modified call details
"""

metadata = []
if client_call_details.metadata is not None:
metadata = list(client_call_details.metadata)
metadata.extend(self._metadata)

new_call_details = _ClientCallDetails(
client_call_details.method,
client_call_details.timeout,
metadata,
client_call_details.credentials,
client_call_details.wait_for_ready,
client_call_details.compression,
)
return new_call_details

def intercept_unary_unary(self, continuation, client_call_details, request):
"""This method intercepts a unary-unary gRPC call. This is the implementation of the
abstract method defined in UnaryUnaryClientInterceptor defined in grpc. This is invoked
automatically by grpc based on the order in which interceptors are added to the channel.

Args:
continuation: a callable to be invoked to continue with the RPC or next interceptor
client_call_details: a ClientCallDetails object describing the outgoing RPC
request: the request value for the RPC

Returns:
A response object after invoking the continuation callable
"""
# Pre-process or intercept call
new_call_details = self._intercept_call(client_call_details)
# Call continuation
response = continuation(new_call_details, request)
return response


# Data validation helpers


def validateNotNone(**kwargs: Optional[str]):
for field_name, value in kwargs.items():
if value is None:
Expand Down
Loading