Skip to content

Commit

Permalink
test(scheduler): add test ensuring other actors can run in between re…
Browse files Browse the repository at this point in the history
…tries

(cherry picked from commit d2999fd)
  • Loading branch information
npepinpe authored and github-actions[bot] committed Oct 31, 2022
1 parent 82a27a4 commit b2722ee
Showing 1 changed file with 58 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,24 @@
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.testing.ControlledActorSchedulerExtension;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

final class RetryStrategyTest {

/** Ensure we use a single thread to better control the scheduling in the tests. */
@RegisterExtension
private final ControlledActorSchedulerExtension schedulerRule =
new ControlledActorSchedulerExtension();
new ControlledActorSchedulerExtension(
builder -> builder.setIoBoundActorThreadCount(0).setCpuBoundActorThreadCount(1));

private ActorFuture<Boolean> resultFuture;

Expand Down Expand Up @@ -66,6 +71,11 @@ void shouldStopWhenAbortConditionReturnsTrue(final TestCase<?> test) {
assertThat(resultFuture).succeedsWithin(Duration.ZERO).isEqualTo(false);
}

/**
* Only the {@link RecoverableRetryStrategy} and {@link AbortableRetryStrategy} stop retrying when
* an unrecoverable exception occurs; the others will always retry. We may want to extract this to
* specific class tests?
*/
@ParameterizedTest
@ValueSource(strings = {"recoverable", "abortable"})
void shouldAbortOnOtherException(final TestCase<?> test) {
Expand All @@ -90,6 +100,11 @@ void shouldAbortOnOtherException(final TestCase<?> test) {
.withCause(failure);
}

/**
* The {@link BackOffRetryStrategy} is excluded here because its usage of timers necessarily allow
* interleaving calls. If we decide to fix it, then we should refactor the strategy and add it as
* a test case here.
*/
@ParameterizedTest
@ValueSource(strings = {"endless", "recoverable", "abortable"})
void shouldNotInterleaveRetry(final TestCase<?> test) {
Expand Down Expand Up @@ -128,13 +143,52 @@ void shouldNotInterleaveRetry(final TestCase<?> test) {
assertThat(secondResult).hasValue(retryCounts);
}

/**
* The {@link BackOffRetryStrategy} is excluded here as it is already yielding implicitly by using
* timers for scheduling.
*/
@ParameterizedTest
@ValueSource(strings = {"endless", "recoverable", "abortable", "backoff"})
void shouldYieldThreadOnRetry() {}
@ValueSource(strings = {"endless", "recoverable", "abortable"})
void shouldYieldThreadOnRetry(final TestCase<?> test) {
// given - all actors share the same thread, force interleaving of their execution to ensure the
// retry strategy yields the thread in between retries
final var barrier = new LinkedTransferQueue<Boolean>();
final var future = new CompletableFuture<Void>();
final var secondActor = new ControllableActor();
schedulerRule.submitActor(test.actor);
schedulerRule.submitActor(secondActor);

// when
test.strategy.runWithRetry(
() -> {
// capture the result before to ensure we're looping
final boolean shouldRetry = !future.isDone();
barrier.offer(true);
return shouldRetry;
});
// toggle the retry strategy to stop retrying, letting workUntilDone finish
secondActor.run(
() -> {
// wait until the test actor ran at least once, guaranteeing it's currently looping
// and retrying
barrier.poll();
future.complete(null);
});
// wrap workUntilDone in a timeout condition, as otherwise the test hangs forever there if the
// actors are not yielding
Awaitility.await("workUntilDone should be finite if each actor yields the thread")
.atMost(Duration.ofSeconds(30))
.untilAsserted(schedulerRule::workUntilDone);

// then
assertThat(future)
.as("future is completed iff second actor can run")
.succeedsWithin(Duration.ofSeconds(2));
}

private record TestCase<T extends RetryStrategy>(ControllableActor actor, T strategy) {

// actually used by junit 5, see
// used to generate test cases in conjunction with @ValueSource
// https://junit.org/junit5/docs/current/user-guide/#writing-tests-parameterized-tests-argument-conversion-implicit
@SuppressWarnings("unused")
static TestCase<?> of(final String type) {
Expand Down

0 comments on commit b2722ee

Please sign in to comment.