Refactor of retryWhen to switch to a Spec/Builder model (#1979)
This commit is a large refactor of the `retryWhen` operator in order
to add several features.

Fixes #1978
Fixes #1905
Fixes #2063
Fixes #2052
Fixes #2064

 * Expose more state to `retryWhen` companion (#1978)

This introduces a `retryWhen` variant based on a `Retry` functional
interface. This "function" deals not with a Flux of `Throwable` but of
`RetrySignal`. This allows the retry function to check whether there was
some success (onNext) since the last retry attempt, in which case the
current error can be treated as if it were the first error ever.

This is especially useful for cases where exponential backoff delays
should be reset, for long lived sequences that only see intermittent
bursts of errors (transient errors).

We take that opportunity to offer a builder for such a function that
could take transient errors into account.

 * the `Retry` builders

Inspired by the `Retry` builder in addons, we introduce two classes:
`RetrySpec` and `RetryBackoffSpec`. We name them Spec and not Builder
because they don't require calling a `build()` method. Rather, each
configuration step produces A) a new instance (copy on write) that B)
is by itself already a `Retry`.

The `Retry` + `xxxSpec` approach allows us to offer 2 standard
strategies that both support transient error handling, while letting
users write their own strategy (either as a standalone `Retry` concrete
implementation, or as a builder/spec that builds one).
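
As an illustration of the copy-on-write idea only, here is a minimal
sketch in plain Java, without Reactor. `RetryPolicy` and
`SimpleRetrySpec` are hypothetical names standing in for `Retry` and
`RetrySpec`; the real classes operate on a `Flux<RetrySignal>` and are
richer than this.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for the `Retry` functional interface (not Reactor's API).
interface RetryPolicy {
    // attempt = number of retries already performed (0 for the first error)
    boolean shouldRetry(long attempt, Throwable failure);
}

// Hypothetical spec: immutable, and already a usable RetryPolicy at every step.
final class SimpleRetrySpec implements RetryPolicy {
    final long maxAttempts;
    final Predicate<Throwable> errorFilter;

    private SimpleRetrySpec(long maxAttempts, Predicate<Throwable> errorFilter) {
        this.maxAttempts = maxAttempts;
        this.errorFilter = errorFilter;
    }

    // Entry point: retry any error, up to maxAttempts times.
    static SimpleRetrySpec max(long maxAttempts) {
        return new SimpleRetrySpec(maxAttempts, t -> true);
    }

    // Copy on write: returns a new instance and leaves `this` untouched.
    SimpleRetrySpec filter(Predicate<Throwable> newFilter) {
        return new SimpleRetrySpec(this.maxAttempts, newFilter);
    }

    @Override
    public boolean shouldRetry(long attempt, Throwable failure) {
        return attempt < maxAttempts && errorFilter.test(failure);
    }
}
```

Because no `build()` call is involved, a partially configured spec such
as `SimpleRetrySpec.max(3)` can be shared and further specialized
without affecting other users of the same instance.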

Both specs offer `transientErrors(boolean)`, which when true
relies on the extra state exposed by the `RetrySignal`. For the simple
case, this means that the remaining number of retries is reset in case
of onNext. For the exponential case, this means retry delay is reset to
minimum after an onNext (#1978).
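
The reset behaviour can be modelled in plain Java as follows. This is a
sketch under assumptions, not the Reactor implementation:
`RetryCounters` is a hypothetical class, and the delay formula
(minimum backoff times 2^index, capped at a maximum, no jitter) is an
assumed simplification of an exponential backoff.

```java
import java.time.Duration;

// Hypothetical model, not Reactor code: tracks the two retry indexes.
final class RetryCounters {
    private long totalRetries;   // errors seen since subscription
    private long retriesInARow;  // errors seen since the last onNext

    // A successful element ends the current burst of errors.
    void onNext() {
        retriesInARow = 0;
    }

    // Records an error and returns the index used to decide and to compute delays.
    long onError(boolean transientErrors) {
        long index = transientErrors ? retriesInARow : totalRetries;
        totalRetries++;
        retriesInARow++;
        return index;
    }

    // Assumed formula without jitter: min * 2^index, capped at max.
    static Duration backoff(Duration min, Duration max, long index) {
        long factor = 1L << Math.min(index, 30L); // bound the shift to avoid overflow
        Duration delay = min.multipliedBy(factor);
        return delay.compareTo(max) > 0 ? max : delay;
    }
}
```

With `transientErrors` enabled, an error that follows an `onNext` gets
index 0 again, so both the attempt budget and the backoff delay start
over for each burst.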

Additionally, the introduction of the specs allows us to add more
features and support some features on more combinations, see below.

 * `filter` exceptions (#1905)

 Previously we could only filter exceptions to be retried on the simple
 long-based `retry` methods. With the specs we can `filter` in both
 immediate and exponential backoff retry strategies.

 * Add pre/post attempt hooks (#2063)

The specs let the user configure two types of pre/post hooks.
Note that if the retry attempt is denied (e.g. we've reached the maximum
number of attempts), these hooks are NOT executed.

Synchronous hooks (`doBeforeRetry` and `doAfterRetry`) are side effects
that should not block for too long and are executed right before and
right after the retry trigger is sent by the companion publisher.

Asynchronous hooks (`doBeforeRetryAsync` and `doAfterRetryAsync`) are
composed into the companion publisher which generates the triggers, and
they both delay the emission of said trigger in non-blocking and
asynchronous fashion. Having pre and post hooks allows a user to better
manage the order in which these asynchronous side effects should be
performed.
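
The ordering of the synchronous hooks around the trigger, and the fact
that they are skipped when the attempt is denied, can be pictured with
this small plain-Java sketch. `HookOrderDemo` and the event names are
hypothetical and only model the sequence described above; the real hooks
receive a `RetrySignal` and run inside the companion publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical model of one retry decision, not Reactor code.
final class HookOrderDemo {

    // Returns the ordered list of events produced for the given attempt.
    static List<String> decide(long attempt, long maxAttempts) {
        List<String> events = new ArrayList<>();
        LongConsumer before = a -> events.add("before#" + a);
        LongConsumer after = a -> events.add("after#" + a);

        if (attempt >= maxAttempts) {
            // Attempt denied: the hooks are NOT executed.
            events.add("exhausted");
            return events;
        }
        before.accept(attempt);           // right before the trigger
        events.add("trigger#" + attempt); // the retry trigger itself
        after.accept(attempt);            // right after the trigger
        return events;
    }
}
```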

 * Retry exhausted meaningful exception (#2052)

The `Retry` function implemented by both specs throws a `RuntimeException`
with a meaningful message when the configured maximum number of attempts
is reached. That exception can be pinpointed by calling the utility
`Exceptions.isRetryExhausted` method.

For further customization, users can replace that default with their
own custom exception via `onRetryExhaustedThrow`. The BiFunction lets
the user access the Spec, which has public final fields that can be
used to produce a meaningful message.
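
A plain-Java sketch of that pattern: a non-public marker exception, a
factory, a detection helper, and a replaceable generator. `RetryErrors`,
its members and the message format are hypothetical; in the commit the
equivalents are `Exceptions.retryExhausted`, `Exceptions.isRetryExhausted`
and the `BiFunction` passed to `onRetryExhaustedThrow`, which receives
the Spec and the `RetrySignal` rather than a `Long`.

```java
import java.util.function.BiFunction;

// Hypothetical model of the "retry exhausted" exception handling.
final class RetryErrors {

    // Marker type kept non-public: callers detect it through isRetryExhausted.
    static final class RetryExhausted extends IllegalStateException {
        RetryExhausted(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static RuntimeException retryExhausted(String message, Throwable cause) {
        return new RetryExhausted(message, cause);
    }

    // null always yields false, since instanceof is false for null.
    static boolean isRetryExhausted(Throwable t) {
        return t instanceof RetryExhausted;
    }

    // Default generator: wraps the last failure (message format is made up).
    static final BiFunction<Long, Throwable, Throwable> DEFAULT =
            (max, failure) -> retryExhausted("Retries exhausted: " + max + "/" + max, failure);

    // Custom generator: propagate the last failure as-is.
    static final BiFunction<Long, Throwable, Throwable> RETHROW_LAST =
            (max, failure) -> failure;
}
```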

 * Ensure retry hooks completion is taken into account (#2064)

The old `retryBackoff` would internally use a `flatMap`, which can
cause issues. The Spec functions use `concatMap`.

 /!\ CAVEAT

This commit deprecates all of the retryBackoff methods as well as the
original `retryWhen` (based on Throwable companion publisher) in order
to introduce the new `RetrySignal` based signature.

Using the explicit `Retry` type lifts any ambiguity when using the Spec,
but using a lambda instead will raise some ambiguity at call sites of
`retryWhen`.

We deem that acceptable given that the migration is quite easy
(turn `e -> whatever(e)` to `(Retry) rs -> whatever(rs.failure())`).
Furthermore, `retryWhen` is an advanced operator, and we expect most
uses to be combined with the retry builder in reactor-extra, which lifts
the ambiguity itself.
simonbasle committed Mar 18, 2020
1 parent ce58185 commit 9e68bca
Showing 24 changed files with 3,163 additions and 235 deletions.
5 changes: 3 additions & 2 deletions docs/asciidoc/apdx-operatorChoice.adoc
@@ -227,9 +227,10 @@ I want to deal with:
** by falling back:
*** to a value: `onErrorReturn`
*** to a `Publisher` or `Mono`, possibly different ones depending on the error: `Flux#onErrorResume` and `Mono#onErrorResume`
** by retrying: `retry`
** by retrying...
*** ...with a simple policy (max number of attempts): `retry()`, `retry(long)`
*** ...triggered by a companion control Flux: `retryWhen`
*** ... using a standard backoff strategy (exponential backoff with jitter): `retryBackoff`
*** ...using a standard backoff strategy (exponential backoff with jitter): `retryWhen(Retry.backoff(...))`

* I want to deal with backpressure "errors" (request max from upstream and apply the strategy when downstream does not produce enough request)...
** by throwing a special `IllegalStateException`: `Flux#onBackpressureError`
3 changes: 3 additions & 0 deletions docs/asciidoc/apdx-reactorExtra.adoc
@@ -73,6 +73,9 @@ Since 3.2.0, one of the most advanced retry strategies offered by these utilitie
also part of the `reactor-core` main artifact directly. Exponential backoff is
available as the `Flux#retryBackoff` operator.

Since 3.3.4, the `Retry` builder is offered directly in core and has a few more possible
customizations, being based on a `RetrySignal` that encapsulates additional state than the
error.

[[extra-schedulers]]
== Schedulers
62 changes: 58 additions & 4 deletions docs/asciidoc/coreFeatures.adoc
@@ -879,8 +879,8 @@ There is a more advanced version of `retry` (called `retryWhen`) that uses a "`c
created by the operator but decorated by the user, in order to customize the retry
condition.

The companion `Flux` is a `Flux<Throwable>` that gets passed to a `Function`, the sole
parameter of `retryWhen`. As the user, you define that function and make it return a new
The companion `Flux` is a `Flux<RetrySignal>` that gets passed to a `Retry` strategy/function,
supplied as the sole parameter of `retryWhen`. As the user, you define that function and make it return a new
`Publisher<?>`. Retry cycles go as follows:

. Each time an error happens (giving potential for a retry), the error is emitted into the
@@ -902,11 +902,13 @@ companion would effectively swallow an error. Consider the following way of emul
Flux<String> flux = Flux
.<String>error(new IllegalArgumentException()) // <1>
.doOnError(System.out::println) // <2>
.retryWhen(companion -> companion.take(3)); // <3>
.retryWhen(() -> // <3>
companion -> companion.take(3)); // <4>
----
<1> This continuously produces errors, calling for retry attempts.
<2> `doOnError` before the retry lets us log and see all failures.
<3> Here, we consider the first three errors as retry-able (`take(3)`) and then give up.
<3> The `Retry` function is passed as a `Supplier`
<4> Here, we consider the first three errors as retry-able (`take(3)`) and then give up.
====

In effect, the preceding example results in an empty `Flux`, but it completes successfully. Since
@@ -916,9 +918,61 @@ In effect, the preceding example results in an empty `Flux`, but it completes su
Getting to the same behavior involves a few additional tricks:
include::snippetRetryWhenRetry.adoc[]

TIP: One can use the builders exposed in `Retry` to achieve the same in a more fluent manner, as
well as more finely tuned retry strategies: `errorFlux.retryWhen(Retry.max(3));`.

TIP: You can use similar code to implement an "`exponential backoff and retry`" pattern,
as shown in the <<faq.exponentialBackoff,FAQ>>.

The core-provided `Retry` builders, `RetrySpec` and `RetryBackoffSpec`, both allow advanced customizations like:

- setting the `filter(Predicate)` for the exceptions that can trigger a retry
- modifying such a previously set filter through `modifyErrorFilter(Function)`
- triggering a side effect like logging around the retry trigger (ie for backoff before and after the delay), provided the retry is validated (`doBeforeRetry()` and `doAfterRetry()` are additive)
- triggering an asynchronous `Mono<Void>` around the retry trigger, which allows to add asynchronous behavior on top of the base delay but thus further delay the trigger (`doBeforeRetryAsync` and `doAfterRetryAsync` are additive)
- customizing the exception in case the maximum number of attempts has been reached, through `onRetryExhaustedThrow(BiFunction)`.
By default, `Exceptions.retryExhausted(...)` is used, which can be distinguished with `Exceptions.isRetryExhausted(Throwable)`
- activating the handling of _transient errors_ (see below)

Transient error handling in the `Retry` specs makes use of `RetrySignal#totalRetriesInARow()`: to check whether to retry or not and to compute the retry delays, the index used is an alternative one that is reset to 0 each time an `onNext` is emitted.
This has the consequence that if a re-subscribed source generates some data before failing again, previous failures don't count toward the maximum number of retry attempts.
In the case of exponential backoff strategy, this also means that the next attempt will be back to using the minimum `Duration` backoff instead of a longer one.
This can be especially useful for long-lived sources that see sporadic bursts of errors (or _transient_ errors), where each burst should be retried with its own backoff.

====
[source,java]
----
AtomicInteger errorCount = new AtomicInteger(); // <1>
AtomicInteger transientHelper = new AtomicInteger();
Flux<Integer> transientFlux = Flux.<Integer>generate(sink -> {
int i = transientHelper.getAndIncrement();
if (i == 10) { // <2>
sink.next(i);
sink.complete();
}
else if (i % 3 == 0) { // <3>
sink.next(i);
}
else {
sink.error(new IllegalStateException("Transient error at " + i)); // <4>
}
})
.doOnError(e -> errorCount.incrementAndGet());
transientFlux.retryWhen(Retry.max(2).transientErrors(true)) // <5>
.blockLast();
assertThat(errorCount).hasValue(6); // <6>
----
<1> We will count the number of errors in the retried sequence.
<2> We `generate` a source that has bursts of errors. It will successfully complete when the counter reaches 10.
<3> If the `transientHelper` atomic is at a multiple of `3`, we emit `onNext` and thus end the current burst.
<4> In other cases we emit an `onError`. That's 2 out of 3 times, so bursts of 2 `onError` interrupted by 1 `onNext`.
<5> We use `retryWhen` on that source, configured for at most 2 retry attempts, but in `transientErrors` mode.
<6> At the end, the sequence reaches `onNext(10)` and completes, after `6` errors have been registered in `errorCount`.
====

Without the `transientErrors(true)`, the configured maximum attempt of `2` would be reached by the second burst and the sequence would fail after having emitted `onNext(3)`.

=== Handling Exceptions in Operators or Functions

In general, all operators can themselves contain code that potentially trigger an
36 changes: 19 additions & 17 deletions docs/asciidoc/faq.adoc
@@ -176,32 +176,34 @@ an unstable state and is not likely to immediately recover from it. So blindly
retrying immediately is likely to produce yet another error and add to the
instability.

Since `3.2.0.RELEASE`, Reactor comes with such a retry baked in: `Flux.retryBackoff`.
Since `3.3.4.RELEASE`, Reactor comes with a builder for such a retry baked in: `Retry.backoff`.

The following example shows how to implement an exponential backoff with `retryWhen`.
The following example showcases a simple use of the builder, with hooks logging message right before
and after the retry attempt delays.
It delays retries and increases the delay between each attempt (pseudocode:
delay = attempt number * 100 milliseconds):

====
[source,java]
----
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
.retryWhen(companion -> companion
.doOnNext(s -> System.out.println(s + " at " + LocalTime.now())) // <1>
.zipWith(Flux.range(1, 4), (error, index) -> { // <2>
if (index < 4) return index;
else throw Exceptions.propagate(error);
})
.flatMap(index -> Mono.delay(Duration.ofMillis(index * 100))) // <3>
.doOnNext(s -> System.out.println("retried at " + LocalTime.now())) // <4>
);
Flux.<String>error(new IllegalStateException("boom"))
.doOnError(e -> { // <1>
errorCount.incrementAndGet();
System.out.println(e + " at " + LocalTime.now());
})
.retryWhen(Retry
.backoff(3, Duration.ofMillis(100)).jitter(0d) // <2>
.doAfterRetry(rs -> System.out.println("retried at " + LocalTime.now())) // <3>
.onRetryExhaustedThrow((spec, rs) -> rs.failure()) // <4>
);
----
<1> We log the time of errors.
<2> We use the `retryWhen` + `zipWith` trick to propagate the error after three
retries.
<3> Through `flatMap`, we cause a delay that depends on the attempt's index.
<4> We also log the time at which the retry happens.
<1> We will log the time of errors emitted by the source and count them.
<2> We configure an exponential backoff retry with at most 3 attempts and no jitter.
<3> We also log the time at which the retry happens.
<4> By default an `Exceptions.retryExhausted` exception would be thrown, with the last `failure()` as a cause.
Here we customize that to directly emit the cause as `onError`.
====

When subscribed to, this fails and terminates after printing out the following:
24 changes: 12 additions & 12 deletions docs/asciidoc/snippetRetryWhenRetry.adoc
@@ -1,20 +1,20 @@
====
[source,java]
----
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
.retryWhen(companion -> companion
.zipWith(Flux.range(1, 4), // <1>
(error, index) -> { // <2>
if (index < 4) return index; // <3>
else throw Exceptions.propagate(error); // <4>
})
);
Flux.<String>error(new IllegalArgumentException())
.doOnError(e -> errorCount.incrementAndGet())
.retryWhen(() -> companion -> // <1>
companion.map(rs -> { // <2>
if (rs.totalRetries() < 3) return rs.totalRetries(); // <3>
else throw Exceptions.propagate(rs.failure()); // <4>
})
);
----
<1> Trick one: use `zip` and a `range` of "number of acceptable retries + 1".
<2> The `zip` function lets you count the retries while keeping track of the original
error.
<3> To allow for three retries, indexes before 4 return a value to emit.
<1> `retryWhen` expects a `Supplier<Retry>`
<2> The companion emits `RetrySignal` objects, which bear number of retries so far and last failure
<3> To allow for three retries, we consider indexes < 3 and return a value to emit (here we simply return the index).
<4> In order to terminate the sequence in error, we throw the original exception after
these three retries.
====
48 changes: 48 additions & 0 deletions reactor-core/src/main/java/reactor/core/Exceptions.java
@@ -16,15 +16,19 @@

package reactor.core;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;

import reactor.core.publisher.Flux;
import reactor.util.annotation.Nullable;
import reactor.util.retry.Retry;

/**
* Global Reactor Core Exception handling and utils to operate on.
@@ -275,6 +279,19 @@ public static RejectedExecutionException failWithRejected(String message) {
return new ReactorRejectedExecutionException(message);
}

/**
* Return a new {@link RuntimeException} that represents too many failures on retry.
* This nature can be detected via {@link #isRetryExhausted(Throwable)}.
* The cause of the last retry attempt is passed and stored as this exception's {@link Throwable#getCause() cause}.
*
* @param message the message
* @param cause the cause of the last retry attempt that failed (or null if irrelevant)
* @return a new {@link RuntimeException} representing retry exhaustion due to too many attempts
*/
public static RuntimeException retryExhausted(String message, @Nullable Throwable cause) {
return cause == null ? new RetryExhaustedException(message) : new RetryExhaustedException(message, cause);
}

/**
* Check if the given exception represents an {@link #failWithOverflow() overflow}.
* @param t the {@link Throwable} error to check
@@ -324,6 +341,18 @@ public static boolean isMultiple(@Nullable Throwable t) {
return t instanceof CompositeException;
}

/**
* Check a {@link Throwable} to see if it indicates too many retry attempts have failed.
* Such an exception can be created via {@link #retryExhausted(long, Throwable)} or
* {@link #retryExhausted(Duration)}.
*
* @param t the {@link Throwable} to check, {@literal null} always yields {@literal false}
* @return true if the Throwable is an instance representing retry exhaustion, false otherwise
*/
public static boolean isRetryExhausted(@Nullable Throwable t) {
return t instanceof RetryExhaustedException;
}

/**
* Check a {@link Throwable} to see if it is a traceback, as created by the checkpoint operator or debug utilities.
*
@@ -667,6 +696,25 @@ static final class OverflowException extends IllegalStateException {
}
}

/**
* A specialized {@link IllegalStateException} to signify a {@link Flux#retryWhen(Retry) retry}
* has failed (eg. after N attempts, or a timeout).
*
* @see #retryExhausted(long, Throwable)
* @see #retryExhausted(Duration)
* @see #isRetryExhausted(Throwable)
*/
static final class RetryExhaustedException extends IllegalStateException {

RetryExhaustedException(String message) {
super(message);
}

RetryExhaustedException(String message, Throwable cause) {
super(message, cause);
}
}

static class ReactorRejectedExecutionException extends RejectedExecutionException {

ReactorRejectedExecutionException(String message, Throwable cause) {