Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry RebalanceInProgressException in CommitRecovery.Default #1312

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions modules/core/src/main/scala/fs2/kafka/CommitRecovery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import cats.syntax.functor.*
import cats.Functor

import org.apache.kafka.clients.consumer.{OffsetAndMetadata, RetriableCommitFailedException}
import org.apache.kafka.common.errors.RebalanceInProgressException
import org.apache.kafka.common.TopicPartition

/**
Expand Down Expand Up @@ -51,18 +52,19 @@ object CommitRecovery {

/**
* The default [[CommitRecovery]] used in [[ConsumerSettings]] unless a different one has been
* specified. The default recovery strategy only retries `RetriableCommitFailedException`s. These
* exceptions are retried with a jittered exponential backoff, where the time in milliseconds
* before retrying is calculated using:
* specified. The default recovery strategy only retries `RetriableCommitFailedException`s and
* `RebalanceInProgressException`s. These exceptions are retried with a jittered exponential
* backoff, where the time in milliseconds before retrying is calculated using:
*
* {{{
* Random.nextDouble() * Math.min(10000, 10 * Math.pow(2, n))
* }}}
*
* where `n` is the retry attempt (first attempt is `n = 1`). This is done for up to 10 attempts,
* after which we change to retry using a fixed time of 10 seconds, for up to another 5 attempts.
* If at that point we are still faced with `RetriableCommitFailedException`, we give up and
* raise a [[CommitRecoveryException]] with the last such error experienced.<br><br>
* If at that point we are still faced with `RetriableCommitFailedException` or
* `RebalanceInProgressException`, we give up and raise a [[CommitRecoveryException]] with the
* last such error experienced.<br><br>
*
* The sum of time spent waiting between retries will always be less than 70 220 milliseconds, or
* ~70 seconds. Note that this does not include the time for attempting to commit offsets. Offset
Expand All @@ -87,7 +89,7 @@ object CommitRecovery {
jitter: Jitter[F]
): Throwable => F[Unit] = {
def retry(attempt: Int): Throwable => F[Unit] = {
case retriable: RetriableCommitFailedException =>
case retriable @ (_: RetriableCommitFailedException | _: RebalanceInProgressException) =>
val commitWithRecovery = commit.handleErrorWith(retry(attempt + 1))
if (attempt <= 10) backoff(attempt).flatMap(F.sleep) >> commitWithRecovery
else if (attempt <= 15) F.sleep(10.seconds) >> commitWithRecovery
Expand Down
75 changes: 40 additions & 35 deletions modules/core/src/test/scala/fs2/kafka/CommitRecoverySpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,50 +15,55 @@ import cats.effect.unsafe.implicits.global
import cats.syntax.functor.*

import org.apache.kafka.clients.consumer.{OffsetAndMetadata, RetriableCommitFailedException}
import org.apache.kafka.common.errors.RebalanceInProgressException
import org.apache.kafka.common.TopicPartition

final class CommitRecoverySpec extends BaseAsyncSpec {

describe("CommitRecovery#Default") {
it("should retry with jittered exponential backoff and fixed rate") {
val (result: Either[Throwable, Unit], sleeps: Chain[FiniteDuration]) =
Ref
.of[IO, Chain[FiniteDuration]](Chain.empty)
.flatMap { ref =>
implicit val temporal: Temporal[IO] = storeSleepsTemporal(ref)
val commit: IO[Unit] = IO.raiseError(new RetriableCommitFailedException("retriable"))
val offsets = Map(new TopicPartition("topic", 0) -> new OffsetAndMetadata(1))
val recovery = CommitRecovery.Default.recoverCommitWith(offsets, commit)
val attempted = commit.handleErrorWith(recovery).attempt
attempted.flatMap(ref.get.tupleLeft)
}
.unsafeRunSync()

assert {
result
.left
.toOption
.map(_.toString)
.contains {
"fs2.kafka.CommitRecoveryException: offset commit is still failing after 15 attempts for offsets: topic-0 -> 1; last exception was: org.apache.kafka.clients.consumer.RetriableCommitFailedException: retriable"
}
}

assert(sleeps.size == 15L)
List(new RetriableCommitFailedException("retriable"), new RebalanceInProgressException())
.foreach { ex =>
it(s"should retry $ex with jittered exponential backoff and fixed rate") {
val (result: Either[Throwable, Unit], sleeps: Chain[FiniteDuration]) =
Ref
.of[IO, Chain[FiniteDuration]](Chain.empty)
.flatMap { ref =>
implicit val temporal: Temporal[IO] = storeSleepsTemporal(ref)
val commit: IO[Unit] = IO.raiseError(ex)
val offsets = Map(new TopicPartition("topic", 0) -> new OffsetAndMetadata(1))
val recovery = CommitRecovery.Default.recoverCommitWith(offsets, commit)
val attempted = commit.handleErrorWith(recovery).attempt
attempted.flatMap(ref.get.tupleLeft)
}
.unsafeRunSync()

assert {
result
.left
.toOption
.map(_.toString)
.contains {
s"fs2.kafka.CommitRecoveryException: offset commit is still failing after 15 attempts for offsets: topic-0 -> 1; last exception was: $ex"
}
}

assert {
sleeps
.toList
.take(10)
.zipWithIndex
.forall { case (sleep, attempt) =>
val max = 10 * Math.pow(2, attempt.toDouble + 1)
0 <= sleep.toMillis && sleep.toMillis < max
assert(sleeps.size == 15L)

assert {
sleeps
.toList
.take(10)
.zipWithIndex
.forall { case (sleep, attempt) =>
val max = 10 * Math.pow(2, attempt.toDouble + 1)
0 <= sleep.toMillis && sleep.toMillis < max
}
}
}

assert(sleeps.toList.drop(10).forall(_.toMillis == 10000L))
}
assert(sleeps.toList.drop(10).forall(_.toMillis == 10000L))
}
}

it("should not recover non-retriable exceptions") {
val commit: IO[Unit] = IO.raiseError(new RuntimeException("commit"))
Expand Down