From 68de15eade12b40d7cd60180bcdffc6c70f123c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Abecasis?= Date: Sat, 14 Dec 2024 02:09:49 +0000 Subject: [PATCH 1/2] chore: single definition of pending commit effect Whether immediately committing offsets, or queueing the request in `State.pendingCommits`, the necessary effect is built at once. This avoids the need to repeat similar logic in disparate call sites, and makes visible in a single place how the request will be processed and logged. --- .../kafka/internal/KafkaConsumerActor.scala | 40 +++++++------------ .../scala/fs2/kafka/internal/LogEntry.scala | 10 ++--- 2 files changed, 18 insertions(+), 32 deletions(-) diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala index 6a1f4fe02..41ac8dd94 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala @@ -91,17 +91,16 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V]( .handleErrorWith(e => F.delay(callback(Left(e)))) private[this] def commit(request: Request.Commit[F]): F[Unit] = - ref - .modify { state => - if (state.rebalancing) { - val newState = state.withPendingCommit(request) - (newState, Some(StoredPendingCommit(request, newState))) - } else (state, None) - } - .flatMap { - case Some(log) => logging.log(log) - case None => commitAsync(request.offsets, request.callback) - } + ref.flatModify { state => + val commitF = commitAsync(request.offsets, request.callback) + if (state.rebalancing) { + val newState = state.withPendingCommit( + commitF >> logging.log(CommittedPendingCommit(request)) + ) + (newState, logging.log(StoredPendingCommit(request, newState))) + } else + (state, commitF) + } private[this] def manualCommitSync(request: Request.ManualCommitSync[F]): F[Unit] = { val commit = @@ -388,10 +387,7 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V]( ( newState, Some( - HandlePollResult.PendingCommits( - commits = state.pendingCommits, - log = CommittedPendingCommits(state.pendingCommits, newState) - ) + HandlePollResult.PendingCommits(commits = state.pendingCommits) ) ) } else (state, None) @@ -448,15 +444,9 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V]( private[this] object HandlePollResult { - case class PendingCommits( - commits: Chain[Request.Commit[F]], - log: CommittedPendingCommits[F] - ) { + case class PendingCommits(commits: Chain[F[Unit]]) { - def commit: F[Unit] = - commits.traverse { commitRequest => - commitAsync(commitRequest.offsets, commitRequest.callback) - } >> logging.log(log) + def commit: F[Unit] = commits.sequence_ } @@ -506,7 +496,7 @@ private[kafka] object KafkaConsumerActor { final case class State[F[_], K, V]( fetches: Map[TopicPartition, Map[StreamId, FetchRequest[F, K, V]]], records: Map[TopicPartition, NonEmptyVector[CommittableConsumerRecord[F, K, V]]], - pendingCommits: Chain[Request.Commit[F]], + pendingCommits: Chain[F[Unit]], onRebalances: Chain[OnRebalance[F]], rebalancing: Boolean, subscribed: Boolean, @@ -562,7 +552,7 @@ private[kafka] object KafkaConsumerActor { def withoutRecords(partitions: Set[TopicPartition]): State[F, K, V] = copy(records = records.filterKeysStrict(!partitions.contains(_))) - def withPendingCommit(pendingCommit: Request.Commit[F]): State[F, K, V] = + def withPendingCommit(pendingCommit: F[Unit]): State[F, K, V] = copy(pendingCommits = pendingCommits.append(pendingCommit)) 
def withoutPendingCommits: State[F, K, V] = diff --git a/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala b/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala index 782945a1d..adc26ed39 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala @@ -10,7 +10,7 @@ import java.util.regex.Pattern import scala.collection.immutable.SortedSet -import cats.data.{Chain, NonEmptyList, NonEmptySet, NonEmptyVector} +import cats.data.{NonEmptyList, NonEmptySet, NonEmptyVector} import cats.syntax.all.* import fs2.kafka.instances.* import fs2.kafka.internal.syntax.* @@ -211,15 +211,11 @@ private[kafka] object LogEntry { } - final case class CommittedPendingCommits[F[_]]( - pendingCommits: Chain[Request.Commit[F]], - state: State[F, ?, ?] - ) extends LogEntry { + final case class CommittedPendingCommit[F[_]](pendingCommit: Request.Commit[F]) extends LogEntry { override def level: LogLevel = Debug - override def message: String = - s"Committed pending commits [$pendingCommits]. Current state [$state]." + override def message: String = s"Committed pending commit [$pendingCommit]." } From 067d60ec2d18c7d33c9681a9f805d521a5c8f615 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Abecasis?= Date: Sat, 14 Dec 2024 20:02:16 +0000 Subject: [PATCH 2/2] chore: ensure pending commits order Commits would be added to the chain of pending commits based only on the rebalancing state. Given that the rebalancing state is updated (via `ConsumerRebalanceListener.onPartitionsAssigned`) separately from when pending commits are processed (after `poll`), it could happen that commits emitted later would be processed before earlier pending ones. This updates the condition for queueing commits to take into account the prior existence of pending commits. In addition, the condition for processing pending commits in `poll` is updated to disregard whether a rebalance operation was ongoing at the start of the poll. Instead, the existence of pending commits along with a non-`rebalancing` state is a sufficient trigger. This ensures that rebalance operations that might conclude within a single consumer poll do not leave behind any pending commits. At the moment, these possibilities are theoretical, as commit operations are serialized via `KafkaConsumerActor`'s request queue and don't happen concurrently with polls. That said, the cost of the fixes is trivial, and being explicit about the conditions may prevent future bugs if the surrounding context changes.
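For illustration only, not part of the patch: a minimal, self-contained Scala sketch of the interleaving this guards against, using simplified stand-in types and names (State, handleCommit) rather than the real KafkaConsumerActor internals. Commit "A" is queued during a rebalance; the rebalance then ends before the next poll drains pending commits, so a later commit "B" must also be queued behind "A" or it would be committed first.

    object PendingCommitOrderSketch {

      // Simplified stand-in for KafkaConsumerActor's state: only the two
      // fields that drive the queueing decision.
      final case class State(rebalancing: Boolean, pendingCommits: Vector[String]) {
        def withPendingCommit(commit: String): State =
          copy(pendingCommits = pendingCommits :+ commit)
      }

      // Returns the updated state plus the commits that may run immediately.
      // Queueing whenever earlier commits are still pending is what preserves order.
      def handleCommit(state: State, commit: String): (State, Vector[String]) =
        if (state.rebalancing || state.pendingCommits.nonEmpty)
          (state.withPendingCommit(commit), Vector.empty)
        else
          (state, Vector(commit))

      def main(args: Array[String]): Unit = {
        val initial = State(rebalancing = true, pendingCommits = Vector.empty)

        val (afterA, ranA) = handleCommit(initial, "A")                          // "A" queued: rebalancing
        val (afterB, ranB) = handleCommit(afterA.copy(rebalancing = false), "B") // "B" queued behind "A"

        println((ranA, ranB, afterB.pendingCommits)) // nothing ran immediately; A and B are queued in order
      }
    }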
--- .../fs2/kafka/internal/KafkaConsumerActor.scala | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala index 41ac8dd94..c67ef462f 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala @@ -93,7 +93,7 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V]( private[this] def commit(request: Request.Commit[F]): F[Unit] = ref.flatModify { state => val commitF = commitAsync(request.offsets, request.callback) - if (state.rebalancing) { + if (state.rebalancing || state.pendingCommits.nonEmpty) { val newState = state.withPendingCommit( commitF >> logging.log(CommittedPendingCommit(request)) ) @@ -301,7 +301,7 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V]( } .flatMap(records) - def handlePoll(newRecords: ConsumerRecords, initialRebalancing: Boolean): F[Unit] = { + def handlePoll(newRecords: ConsumerRecords): F[Unit] = { def handleBatch( state: State[F, K, V], pendingCommits: Option[HandlePollResult.PendingCommits] @@ -380,9 +380,7 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V]( } def handlePendingCommits(state: State[F, K, V]) = { - val currentRebalancing = state.rebalancing - - if (initialRebalancing && !currentRebalancing && state.pendingCommits.nonEmpty) { + if (!state.rebalancing && state.pendingCommits.nonEmpty) { val newState = state.withoutPendingCommits ( newState, @@ -414,10 +412,9 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V]( ref .get .flatMap { state => - if (state.subscribed && state.streaming) { - val initialRebalancing = state.rebalancing - pollConsumer(state).flatMap(handlePoll(_, initialRebalancing)) - } else F.unit + if (state.subscribed && state.streaming) + pollConsumer(state).flatMap(handlePoll(_)) + else F.unit } }
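Illustration only, not part of the patch series: a runnable sketch of the Ref.flatModify pattern that the first commit introduces, assuming cats-effect 3 with Ref#flatModify available (as used by the patch itself) and simplified stand-in types. The commit effect (commitF) is built once; depending on the state it either runs immediately or is appended to pendingCommits, and a later drain step runs the queued effects in order, mirroring commit and handlePendingCommits in the diffs above.

    import cats.effect.{IO, IOApp, Ref}
    import cats.syntax.all.*

    object FlatModifySketch extends IOApp.Simple {

      // Simplified stand-in for KafkaConsumerActor.State.
      final case class State(rebalancing: Boolean, pendingCommits: Vector[IO[Unit]]) {
        def withPendingCommit(commit: IO[Unit]): State =
          copy(pendingCommits = pendingCommits :+ commit)
      }

      // The effect is built once; flatModify decides atomically whether to run
      // it now or queue it, returning the effect to execute after the update.
      def commit(ref: Ref[IO, State], offsets: String): IO[Unit] =
        ref.flatModify { state =>
          val commitF = IO.println(s"committing $offsets") // stands in for commitAsync
          if (state.rebalancing || state.pendingCommits.nonEmpty)
            (state.withPendingCommit(commitF), IO.println(s"queued $offsets"))
          else
            (state, commitF)
        }

      // Drains pending commits in order once no rebalance is in progress,
      // analogous to handlePendingCommits after a poll.
      def drainPending(ref: Ref[IO, State]): IO[Unit] =
        ref.flatModify { state =>
          if (!state.rebalancing && state.pendingCommits.nonEmpty)
            (state.copy(pendingCommits = Vector.empty), state.pendingCommits.sequence_)
          else
            (state, IO.unit)
        }

      def run: IO[Unit] =
        for {
          ref <- Ref[IO].of(State(rebalancing = true, pendingCommits = Vector.empty))
          _   <- commit(ref, "A")                        // queued: a rebalance is in progress
          _   <- ref.update(_.copy(rebalancing = false)) // rebalance finishes
          _   <- commit(ref, "B")                        // queued: "A" is still pending
          _   <- drainPending(ref)                       // prints "committing A" then "committing B"
        } yield ()
    }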