From 4f02b9c6d5f38d1e1ff33dbe419dd36c5bbea9fb Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Sun, 13 Mar 2022 15:43:22 +0000 Subject: [PATCH 1/6] Remove deserialization from KafkaConsumerActor --- .../main/scala/fs2/kafka/KafkaConsumer.scala | 56 ++++++-- .../kafka/internal/KafkaConsumerActor.scala | 125 ++++++++++-------- .../scala/fs2/kafka/internal/LogEntry.scala | 36 ++--- .../scala/fs2/kafka/KafkaConsumerSpec.scala | 15 ++- 4 files changed, 143 insertions(+), 89 deletions(-) diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala index 5f1895d85..4ba522f12 100644 --- a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala +++ b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala @@ -91,10 +91,10 @@ object KafkaConsumer { } }(_.cancel) - private def startConsumerActor[F[_], K, V]( - requests: QueueSource[F, Request[F, K, V]], + private def startConsumerActor[F[_]]( + requests: QueueSource[F, Request[F]], polls: QueueSource[F, Request.Poll[F]], - actor: KafkaConsumerActor[F, K, V] + actor: KafkaConsumerActor[F] )( implicit F: Async[F] ): Resource[F, FakeFiber[F]] = @@ -115,8 +115,10 @@ object KafkaConsumer { } private def createKafkaConsumer[F[_], K, V]( - requests: QueueSink[F, Request[F, K, V]], + requests: QueueSink[F, Request[F]], settings: ConsumerSettings[F, K, V], + keyDes: Deserializer[F, K], + valueDes: Deserializer[F, V], actor: FakeFiber[F], polls: FakeFiber[F], streamIdRef: Ref[F, StreamId], @@ -134,7 +136,7 @@ object KafkaConsumer { Queue.bounded(settings.maxPrefetchBatches - 1) type PartitionRequest = - (Chunk[CommittableConsumerRecord[F, K, V]], FetchCompletedReason) + (Chunk[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]], FetchCompletedReason) type PartitionsMap = Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]] type PartitionsMapQueue = Queue[F, Option[PartitionsMap]] @@ -173,7 +175,37 @@ object KafkaConsumer { stopReqs.complete(()).void case Right((chunk, reason)) => - val enqueueChunk = chunks.offer(Some(chunk)).unlessA(chunk.isEmpty) + val c = chunk.traverse[F, CommittableConsumerRecord[F, K, V]] { ccr => + val cr: F[ConsumerRecord[K, V]] = ccr.record.bitraverse( + key => + keyDes + .deserialize(ccr.offset.topicPartition.topic, ccr.record.headers, key), + value => + valueDes + .deserialize(ccr.offset.topicPartition.topic, ccr.record.headers, value) + ) + + cr.map( + cr => + CommittableConsumerRecord[F, K, V]( + cr, + CommittableOffset( + ccr.offset.topicPartition, + new OffsetAndMetadata( + ccr.offset.offsetAndMetadata.offset(), + ccr.offset.offsetAndMetadata.leaderEpoch(), + settings.recordMetadata(cr) + ), + ccr.offset.consumerGroupId, + ccr.offset.commitOffsets + ) + ) + ) + } + + val enqueueChunk = c.flatMap { chunk => + chunks.offer(Some(chunk)).unlessA(chunk.isEmpty) + } val completeRevoked = stopReqs.complete(()).void.whenA(reason.topicPartitionRevoked) @@ -370,7 +402,7 @@ object KafkaConsumer { } private[this] def request[A]( - request: (Either[Throwable, A] => F[Unit]) => Request[F, K, V] + request: (Either[Throwable, A] => F[Unit]) => Request[F] ): F[A] = Deferred[F, Either[Throwable, A]].flatMap { deferred => requests.offer(request(deferred.complete(_).void)) >> @@ -597,9 +629,9 @@ object KafkaConsumer { id <- Resource.eval(F.delay(new Object().hashCode)) jitter <- Resource.eval(Jitter.default[F]) logging <- Resource.eval(Logging.default[F](id)) - requests <- Resource.eval(Queue.unbounded[F, Request[F, K, V]]) + requests <- 
Resource.eval(Queue.unbounded[F, Request[F]]) polls <- Resource.eval(Queue.bounded[F, Request.Poll[F]](1)) - ref <- Resource.eval(Ref.of[F, State[F, K, V]](State.empty)) + ref <- Resource.eval(Ref.of[F, State[F]](State.empty)) streamId <- Resource.eval(Ref.of[F, StreamId](0)) dispatcher <- Dispatcher[F] stopConsumingDeferred <- Resource.eval(Deferred[F, Unit]) @@ -609,10 +641,8 @@ object KafkaConsumer { implicit val logging0: Logging[F] = logging implicit val dispatcher0: Dispatcher[F] = dispatcher - new KafkaConsumerActor( + new KafkaConsumerActor[F]( settings = settings, - keyDeserializer = keyDeserializer, - valueDeserializer = valueDeserializer, ref = ref, requests = requests, withConsumer = withConsumer @@ -623,6 +653,8 @@ object KafkaConsumer { } yield createKafkaConsumer( requests, settings, + keyDeserializer, + valueDeserializer, actor, polls, streamId, diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala index 2dd3770cf..758607ab1 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala @@ -46,12 +46,10 @@ import scala.collection.immutable.SortedSet * backpressure, as long as `Fetch` requests are only issued when there * is more demand. */ -private[kafka] final class KafkaConsumerActor[F[_], K, V]( - settings: ConsumerSettings[F, K, V], - keyDeserializer: Deserializer[F, K], - valueDeserializer: Deserializer[F, V], - ref: Ref[F, State[F, K, V]], - requests: Queue[F, Request[F, K, V]], +private[kafka] final class KafkaConsumerActor[F[_]]( + settings: ConsumerSettings[F, _, _], + ref: Ref[F, State[F]], + requests: Queue[F, Request[F]], withConsumer: WithConsumer[F] )( implicit F: Async[F], @@ -62,7 +60,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( import logging._ private[this] type ConsumerRecords = - Map[TopicPartition, NonEmptyVector[CommittableConsumerRecord[F, K, V]]] + Map[TopicPartition, NonEmptyVector[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]]] private[this] val consumerGroupId: Option[String] = settings.properties.get(ConsumerConfig.GROUP_ID_CONFIG) @@ -164,7 +162,9 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( private[this] def fetch( partition: TopicPartition, streamId: StreamId, - callback: ((Chunk[CommittableConsumerRecord[F, K, V]], FetchCompletedReason)) => F[Unit] + callback: ( + (Chunk[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]], FetchCompletedReason) + ) => F[Unit] ): F[Unit] = { val assigned = withConsumer.blocking { _.assignment.contains(partition) } @@ -259,7 +259,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( } private[this] def revoked(revoked: SortedSet[TopicPartition]): F[Unit] = { - def withState[A] = StateT.apply[Id, State[F, K, V], A](_) + def withState[A] = StateT.apply[Id, State[F], A](_) def completeWithRecords(withRecords: Set[TopicPartition]) = withState { st => if (withRecords.nonEmpty) { @@ -379,17 +379,16 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( } private[this] def committableConsumerRecord( - record: ConsumerRecord[K, V], + record: ConsumerRecord[Array[Byte], Array[Byte]], partition: TopicPartition - ): CommittableConsumerRecord[F, K, V] = + ): CommittableConsumerRecord[F, Array[Byte], Array[Byte]] = CommittableConsumerRecord( record = record, offset = CommittableOffset( topicPartition = partition, consumerGroupId = consumerGroupId, 
offsetAndMetadata = new OffsetAndMetadata( - record.offset + 1L, - settings.recordMetadata(record) + record.offset + 1L ), commit = offsetCommit ) @@ -402,7 +401,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( .fromVectorUnsafe(batch.records(partition).toVector) .traverse { record => ConsumerRecord - .fromJava(record, keyDeserializer, valueDeserializer) + .fromJava(record, Deserializer.identity, Deserializer.identity) .map(committableConsumerRecord(_, partition)) } .map((partition, _)) @@ -413,7 +412,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( settings.pollTimeout.asJava private[this] val poll: F[Unit] = { - def pollConsumer(state: State[F, K, V]): F[ConsumerRecords] = + def pollConsumer(state: State[F]): F[ConsumerRecords] = withConsumer .blocking { consumer => val assigned = consumer.assignment.toSet @@ -435,7 +434,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( def handlePoll(newRecords: ConsumerRecords, initialRebalancing: Boolean): F[Unit] = { def handleBatch( - state: State[F, K, V], + state: State[F], pendingCommits: Option[HandlePollResult.PendingCommits] ) = if (state.fetches.isEmpty) { @@ -509,7 +508,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( } } - def handlePendingCommits(state: State[F, K, V]) = { + def handlePendingCommits(state: State[F]) = { val currentRebalancing = state.rebalancing if (initialRebalancing && !currentRebalancing && state.pendingCommits.nonEmpty) { @@ -553,7 +552,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( } } - def handle(request: Request[F, K, V]): F[Unit] = + def handle(request: Request[F]): F[Unit] = request match { case Request.Assignment(callback, onRebalance) => assignment(callback, onRebalance) case Request.Poll() => poll @@ -614,13 +613,19 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( } private[kafka] object KafkaConsumerActor { - final case class FetchRequest[F[_], K, V]( - callback: ((Chunk[CommittableConsumerRecord[F, K, V]], FetchCompletedReason)) => F[Unit] + final case class FetchRequest[F[_]]( + callback: ( + (Chunk[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]], FetchCompletedReason) + ) => F[Unit] ) { - def completeRevoked(chunk: Chunk[CommittableConsumerRecord[F, K, V]]): F[Unit] = + def completeRevoked( + chunk: Chunk[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]] + ): F[Unit] = callback((chunk, FetchCompletedReason.TopicPartitionRevoked)) - def completeRecords(chunk: Chunk[CommittableConsumerRecord[F, K, V]]): F[Unit] = + def completeRecords( + chunk: Chunk[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]] + ): F[Unit] = callback((chunk, FetchCompletedReason.FetchedRecords)) override def toString: String = @@ -629,16 +634,18 @@ private[kafka] object KafkaConsumerActor { type StreamId = Int - final case class State[F[_], K, V]( - fetches: Map[TopicPartition, Map[StreamId, FetchRequest[F, K, V]]], - records: Map[TopicPartition, NonEmptyVector[CommittableConsumerRecord[F, K, V]]], + final case class State[F[_]]( + fetches: Map[TopicPartition, Map[StreamId, FetchRequest[F]]], + records: Map[TopicPartition, NonEmptyVector[ + CommittableConsumerRecord[F, Array[Byte], Array[Byte]] + ]], pendingCommits: Chain[Request.Commit[F]], onRebalances: Chain[OnRebalance[F]], rebalancing: Boolean, subscribed: Boolean, streaming: Boolean ) { - def withOnRebalance(onRebalance: OnRebalance[F]): State[F, K, V] = + def withOnRebalance(onRebalance: OnRebalance[F]): State[F] = copy(onRebalances = onRebalances append onRebalance) /** 
@@ -647,18 +654,20 @@ private[kafka] object KafkaConsumerActor { def withFetch( partition: TopicPartition, streamId: StreamId, - callback: ((Chunk[CommittableConsumerRecord[F, K, V]], FetchCompletedReason)) => F[Unit] - ): (State[F, K, V], List[FetchRequest[F, K, V]]) = { + callback: ( + (Chunk[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]], FetchCompletedReason) + ) => F[Unit] + ): (State[F], List[FetchRequest[F]]) = { val newFetchRequest = FetchRequest(callback) - val oldPartitionFetches: Map[StreamId, FetchRequest[F, K, V]] = + val oldPartitionFetches: Map[StreamId, FetchRequest[F]] = fetches.getOrElse(partition, Map.empty) - val newFetches: Map[TopicPartition, Map[StreamId, FetchRequest[F, K, V]]] = + val newFetches: Map[TopicPartition, Map[StreamId, FetchRequest[F]]] = fetches.updated(partition, oldPartitionFetches.updated(streamId, newFetchRequest)) - val fetchesToRevoke: List[FetchRequest[F, K, V]] = + val fetchesToRevoke: List[FetchRequest[F]] = oldPartitionFetches.get(streamId).toList ( @@ -667,41 +676,43 @@ private[kafka] object KafkaConsumerActor { ) } - def withoutFetches(partitions: Set[TopicPartition]): State[F, K, V] = + def withoutFetches(partitions: Set[TopicPartition]): State[F] = copy( fetches = fetches.filterKeysStrict(!partitions.contains(_)) ) def withRecords( - records: Map[TopicPartition, NonEmptyVector[CommittableConsumerRecord[F, K, V]]] - ): State[F, K, V] = + records: Map[TopicPartition, NonEmptyVector[ + CommittableConsumerRecord[F, Array[Byte], Array[Byte]] + ]] + ): State[F] = copy(records = this.records combine records) - def withoutFetchesAndRecords(partitions: Set[TopicPartition]): State[F, K, V] = + def withoutFetchesAndRecords(partitions: Set[TopicPartition]): State[F] = copy( fetches = fetches.filterKeysStrict(!partitions.contains(_)), records = records.filterKeysStrict(!partitions.contains(_)) ) - def withoutRecords(partitions: Set[TopicPartition]): State[F, K, V] = + def withoutRecords(partitions: Set[TopicPartition]): State[F] = copy(records = records.filterKeysStrict(!partitions.contains(_))) - def withPendingCommit(pendingCommit: Request.Commit[F]): State[F, K, V] = + def withPendingCommit(pendingCommit: Request.Commit[F]): State[F] = copy(pendingCommits = pendingCommits append pendingCommit) - def withoutPendingCommits: State[F, K, V] = + def withoutPendingCommits: State[F] = if (pendingCommits.isEmpty) this else copy(pendingCommits = Chain.empty) - def withRebalancing(rebalancing: Boolean): State[F, K, V] = + def withRebalancing(rebalancing: Boolean): State[F] = if (this.rebalancing == rebalancing) this else copy(rebalancing = rebalancing) - def asSubscribed: State[F, K, V] = + def asSubscribed: State[F] = if (subscribed) this else copy(subscribed = true) - def asUnsubscribed: State[F, K, V] = + def asUnsubscribed: State[F] = if (!subscribed) this else copy(subscribed = false) - def asStreaming: State[F, K, V] = + def asStreaming: State[F] = if (streaming) this else copy(streaming = true) override def toString: String = { @@ -720,7 +731,7 @@ private[kafka] object KafkaConsumerActor { } object State { - def empty[F[_], K, V]: State[F, K, V] = + def empty[F[_]]: State[F] = State( fetches = Map.empty, records = Map.empty, @@ -753,15 +764,15 @@ private[kafka] object KafkaConsumerActor { "OnRebalance$" + System.identityHashCode(this) } - sealed abstract class Request[F[_], -K, -V] + sealed abstract class Request[F[_]] object Request { final case class Assignment[F[_]]( callback: Either[Throwable, SortedSet[TopicPartition]] => F[Unit], 
onRebalance: Option[OnRebalance[F]] - ) extends Request[F, Any, Any] + ) extends Request[F] - final case class Poll[F[_]]() extends Request[F, Any, Any] + final case class Poll[F[_]]() extends Request[F] private[this] val pollInstance: Poll[Nothing] = Poll[Nothing]() @@ -772,41 +783,43 @@ private[kafka] object KafkaConsumerActor { final case class SubscribeTopics[F[_]]( topics: NonEmptyList[String], callback: Either[Throwable, Unit] => F[Unit] - ) extends Request[F, Any, Any] + ) extends Request[F] final case class Assign[F[_]]( topicPartitions: NonEmptySet[TopicPartition], callback: Either[Throwable, Unit] => F[Unit] - ) extends Request[F, Any, Any] + ) extends Request[F] final case class SubscribePattern[F[_]]( pattern: Pattern, callback: Either[Throwable, Unit] => F[Unit] - ) extends Request[F, Any, Any] + ) extends Request[F] final case class Unsubscribe[F[_]]( callback: Either[Throwable, Unit] => F[Unit] - ) extends Request[F, Any, Any] + ) extends Request[F] - final case class Fetch[F[_], -K, -V]( + final case class Fetch[F[_]]( partition: TopicPartition, streamId: StreamId, - callback: ((Chunk[CommittableConsumerRecord[F, K, V]], FetchCompletedReason)) => F[Unit] - ) extends Request[F, K, V] + callback: ( + (Chunk[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]], FetchCompletedReason) + ) => F[Unit] + ) extends Request[F] final case class Commit[F[_]]( offsets: Map[TopicPartition, OffsetAndMetadata], callback: Either[Throwable, Unit] => Unit - ) extends Request[F, Any, Any] + ) extends Request[F] final case class ManualCommitAsync[F[_]]( offsets: Map[TopicPartition, OffsetAndMetadata], callback: Either[Throwable, Unit] => F[Unit] - ) extends Request[F, Any, Any] + ) extends Request[F] final case class ManualCommitSync[F[_]]( offsets: Map[TopicPartition, OffsetAndMetadata], callback: Either[Throwable, Unit] => F[Unit] - ) extends Request[F, Any, Any] + ) extends Request[F] } } diff --git a/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala b/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala index c0b41726d..9303d6e23 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala @@ -27,7 +27,7 @@ private[kafka] sealed abstract class LogEntry { private[kafka] object LogEntry { final case class SubscribedTopics[F[_]]( topics: NonEmptyList[String], - state: State[F, _, _] + state: State[F] ) extends LogEntry { override def level: LogLevel = Debug override def message: String = @@ -36,7 +36,7 @@ private[kafka] object LogEntry { final case class ManuallyAssignedPartitions[F[_]]( partitions: NonEmptySet[TopicPartition], - state: State[F, _, _] + state: State[F] ) extends LogEntry { override def level: LogLevel = Debug override def message: String = @@ -45,7 +45,7 @@ private[kafka] object LogEntry { final case class SubscribedPattern[F[_]]( pattern: Pattern, - state: State[F, _, _] + state: State[F] ) extends LogEntry { override def level: LogLevel = Debug override def message: String = @@ -53,17 +53,19 @@ private[kafka] object LogEntry { } final case class Unsubscribed[F[_]]( - state: State[F, _, _] + state: State[F] ) extends LogEntry { override def level: LogLevel = Debug override def message: String = s"Consumer unsubscribed from all partitions. Current state [$state]." 
} - final case class StoredFetch[F[_], K, V]( + final case class StoredFetch[F[_]]( partition: TopicPartition, - callback: ((Chunk[CommittableConsumerRecord[F, K, V]], FetchCompletedReason)) => F[Unit], - state: State[F, K, V] + callback: ( + (Chunk[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]], FetchCompletedReason) + ) => F[Unit], + state: State[F] ) extends LogEntry { override def level: LogLevel = Debug override def message: String = @@ -72,7 +74,7 @@ private[kafka] object LogEntry { final case class StoredOnRebalance[F[_]]( onRebalance: OnRebalance[F], - state: State[F, _, _] + state: State[F] ) extends LogEntry { override def level: LogLevel = Debug override def message: String = @@ -81,7 +83,7 @@ private[kafka] object LogEntry { final case class AssignedPartitions[F[_]]( partitions: SortedSet[TopicPartition], - state: State[F, _, _] + state: State[F] ) extends LogEntry { override def level: LogLevel = Debug override def message: String = @@ -90,7 +92,7 @@ private[kafka] object LogEntry { final case class RevokedPartitions[F[_]]( partitions: SortedSet[TopicPartition], - state: State[F, _, _] + state: State[F] ) extends LogEntry { override def level: LogLevel = Debug override def message: String = @@ -99,7 +101,7 @@ private[kafka] object LogEntry { final case class CompletedFetchesWithRecords[F[_]]( records: Records[F], - state: State[F, _, _] + state: State[F] ) extends LogEntry { override def level: LogLevel = Debug override def message: String = @@ -108,7 +110,7 @@ private[kafka] object LogEntry { final case class RevokedFetchesWithRecords[F[_]]( records: Records[F], - state: State[F, _, _] + state: State[F] ) extends LogEntry { override def level: LogLevel = Debug override def message: String = @@ -117,7 +119,7 @@ private[kafka] object LogEntry { final case class RevokedFetchesWithoutRecords[F[_]]( partitions: Set[TopicPartition], - state: State[F, _, _] + state: State[F] ) extends LogEntry { override def level: LogLevel = Debug override def message: String = @@ -126,7 +128,7 @@ private[kafka] object LogEntry { final case class RemovedRevokedRecords[F[_]]( records: Records[F], - state: State[F, _, _] + state: State[F] ) extends LogEntry { override def level: LogLevel = Debug override def message: String = @@ -135,7 +137,7 @@ private[kafka] object LogEntry { final case class StoredRecords[F[_]]( records: Records[F], - state: State[F, _, _] + state: State[F] ) extends LogEntry { override def level: LogLevel = Debug override def message: String = @@ -153,7 +155,7 @@ private[kafka] object LogEntry { final case class StoredPendingCommit[F[_]]( commit: Request.Commit[F], - state: State[F, _, _] + state: State[F] ) extends LogEntry { override def level: LogLevel = Debug override def message: String = @@ -162,7 +164,7 @@ private[kafka] object LogEntry { final case class CommittedPendingCommits[F[_]]( pendingCommits: Chain[Request.Commit[F]], - state: State[F, _, _] + state: State[F] ) extends LogEntry { override def level: LogLevel = Debug override def message: String = diff --git a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala index d1cbda793..a3c0ff1f7 100644 --- a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala @@ -9,7 +9,11 @@ import cats.effect.unsafe.implicits.global import fs2.Stream import fs2.concurrent.SignallingRef import fs2.kafka.internal.converters.collection._ -import 
org.apache.kafka.clients.consumer.{ConsumerConfig, CooperativeStickyAssignor, NoOffsetForPartitionException} +import org.apache.kafka.clients.consumer.{ + ConsumerConfig, + CooperativeStickyAssignor, + NoOffsetForPartitionException +} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.TimeoutException import org.scalatest.Assertion @@ -559,7 +563,9 @@ final class KafkaConsumerSpec extends BaseKafkaSpec { .stream( consumerSettings[IO] .withProperties( - ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[CooperativeStickyAssignor].getName + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[ + CooperativeStickyAssignor + ].getName ) ) .subscribeTo(topic) @@ -801,9 +807,10 @@ final class KafkaConsumerSpec extends BaseKafkaSpec { .stream( consumerSettings[IO] .withProperties( - ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[CooperativeStickyAssignor].getName + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[ + CooperativeStickyAssignor + ].getName ) - ) .subscribeTo(topic) .evalMap { consumer => From 8a59d8eb1113fe675c53cd515c14cdc54c21d0d9 Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Sun, 13 Mar 2022 16:30:00 +0000 Subject: [PATCH 2/6] Simplification --- .../main/scala/fs2/kafka/KafkaConsumer.scala | 82 +++++++++------ .../kafka/internal/KafkaConsumerActor.scala | 99 ++++++++----------- .../scala/fs2/kafka/internal/LogEntry.scala | 15 ++- 3 files changed, 105 insertions(+), 91 deletions(-) diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala index 4ba522f12..538cf6ac0 100644 --- a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala +++ b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala @@ -28,6 +28,7 @@ import scala.annotation.nowarn import scala.collection.immutable.SortedSet import scala.concurrent.duration.FiniteDuration import scala.util.matching.Regex +import org.apache.kafka.clients.consumer.ConsumerConfig /** * [[KafkaConsumer]] represents a consumer of Kafka records, with the @@ -136,7 +137,11 @@ object KafkaConsumer { Queue.bounded(settings.maxPrefetchBatches - 1) type PartitionRequest = - (Chunk[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]], FetchCompletedReason) + ( + Chunk[KafkaByteConsumerRecord], + Map[TopicPartition, OffsetAndMetadata] => F[Unit], + FetchCompletedReason + ) type PartitionsMap = Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]] type PartitionsMapQueue = Queue[F, Option[PartitionsMap]] @@ -163,6 +168,24 @@ object KafkaConsumer { .void stopReqs <- Deferred[F, Unit] } yield Stream.eval { + def committableConsumerRecord( + record: ConsumerRecord[K, V], + offsetCommit: Map[TopicPartition, OffsetAndMetadata] => F[Unit], + partition: TopicPartition + ): CommittableConsumerRecord[F, K, V] = + CommittableConsumerRecord( + record = record, + offset = CommittableOffset( + topicPartition = partition, + consumerGroupId = settings.properties.get(ConsumerConfig.GROUP_ID_CONFIG), + offsetAndMetadata = new OffsetAndMetadata( + record.offset + 1L, + settings.recordMetadata(record) + ), + commit = offsetCommit + ) + ) + def fetchPartition(deferred: Deferred[F, PartitionRequest]): F[Unit] = { val request = Request.Fetch( partition, @@ -174,33 +197,36 @@ object KafkaConsumer { case Left(()) => stopReqs.complete(()).void - case Right((chunk, reason)) => - val c = chunk.traverse[F, CommittableConsumerRecord[F, K, V]] { ccr => - val cr: F[ConsumerRecord[K, V]] = 
ccr.record.bitraverse( - key => - keyDes - .deserialize(ccr.offset.topicPartition.topic, ccr.record.headers, key), - value => - valueDes - .deserialize(ccr.offset.topicPartition.topic, ccr.record.headers, value) - ) - - cr.map( - cr => - CommittableConsumerRecord[F, K, V]( - cr, - CommittableOffset( - ccr.offset.topicPartition, - new OffsetAndMetadata( - ccr.offset.offsetAndMetadata.offset(), - ccr.offset.offsetAndMetadata.leaderEpoch(), - settings.recordMetadata(cr) - ), - ccr.offset.consumerGroupId, - ccr.offset.commitOffsets - ) - ) - ) + case Right((chunk, offsetCommit, reason)) => + val c = chunk.traverse[F, CommittableConsumerRecord[F, K, V]] { rec => + ConsumerRecord + .fromJava[F, K, V](rec, keyDes, valueDes) + .map(committableConsumerRecord(_, offsetCommit, partition)) + // val cr: F[ConsumerRecord[K, V]] = ccr.record.bitraverse( + // key => + // keyDes + // .deserialize(ccr.offset.topicPartition.topic, ccr.record.headers, key), + // value => + // valueDes + // .deserialize(ccr.offset.topicPartition.topic, ccr.record.headers, value) + // ) + + // cr.map( + // cr => + // CommittableConsumerRecord[F, K, V]( + // cr, + // CommittableOffset( + // ccr.offset.topicPartition, + // new OffsetAndMetadata( + // ccr.offset.offsetAndMetadata.offset(), + // ccr.offset.offsetAndMetadata.leaderEpoch(), + // settings.recordMetadata(cr) + // ), + // ccr.offset.consumerGroupId, + // ccr.offset.commitOffsets + // ) + // ) + // ) } val enqueueChunk = c.flatMap { chunk => diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala index 758607ab1..62ba76cb6 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala @@ -22,11 +22,7 @@ import java.time.Duration import java.util import java.util.regex.Pattern -import org.apache.kafka.clients.consumer.{ - ConsumerConfig, - ConsumerRebalanceListener, - OffsetAndMetadata -} +import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, OffsetAndMetadata} import org.apache.kafka.common.TopicPartition import scala.collection.immutable.SortedSet @@ -60,10 +56,7 @@ private[kafka] final class KafkaConsumerActor[F[_]]( import logging._ private[this] type ConsumerRecords = - Map[TopicPartition, NonEmptyVector[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]]] - - private[this] val consumerGroupId: Option[String] = - settings.properties.get(ConsumerConfig.GROUP_ID_CONFIG) + Map[TopicPartition, NonEmptyVector[KafkaByteConsumerRecord]] private[this] val consumerRebalanceListener: ConsumerRebalanceListener = new ConsumerRebalanceListener { @@ -163,7 +156,11 @@ private[kafka] final class KafkaConsumerActor[F[_]]( partition: TopicPartition, streamId: StreamId, callback: ( - (Chunk[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]], FetchCompletedReason) + ( + Chunk[KafkaByteConsumerRecord], + Map[TopicPartition, OffsetAndMetadata] => F[Unit], + FetchCompletedReason + ) ) => F[Unit] ): F[Unit] = { val assigned = @@ -180,13 +177,13 @@ private[kafka] final class KafkaConsumerActor[F[_]]( case (newState, oldFetches) => log(StoredFetch(partition, callback, newState)) >> oldFetches.traverse_ { fetch => - fetch.completeRevoked(Chunk.empty) >> + fetch.completeRevoked(Chunk.empty, offsetCommit) >> log(RevokedPreviousFetch(partition, streamId)) } } def completeRevoked = - callback((Chunk.empty, FetchCompletedReason.TopicPartitionRevoked)) + callback((Chunk.empty, 
offsetCommit, FetchCompletedReason.TopicPartitionRevoked)) assigned.ifM(storeFetch, completeRevoked) } @@ -268,7 +265,7 @@ private[kafka] final class KafkaConsumerActor[F[_]]( val action = st.fetches.filterKeysStrictList(withRecords).traverse { case (partition, partitionFetches) => val records = Chunk.vector(st.records(partition).toVector) - partitionFetches.values.toList.traverse(_.completeRevoked(records)) + partitionFetches.values.toList.traverse(_.completeRevoked(records, offsetCommit)) } >> logging.log( RevokedFetchesWithRecords(st.records.filterKeysStrict(withRecords), newState) ) @@ -283,7 +280,7 @@ private[kafka] final class KafkaConsumerActor[F[_]]( val action = st.fetches .filterKeysStrictValuesList(withoutRecords) - .traverse(_.values.toList.traverse(_.completeRevoked(Chunk.empty))) >> + .traverse(_.values.toList.traverse(_.completeRevoked(Chunk.empty, offsetCommit))) >> logging.log(RevokedFetchesWithoutRecords(withoutRecords, newState)) (newState, action) @@ -378,35 +375,11 @@ private[kafka] final class KafkaConsumerActor[F[_]]( } } - private[this] def committableConsumerRecord( - record: ConsumerRecord[Array[Byte], Array[Byte]], - partition: TopicPartition - ): CommittableConsumerRecord[F, Array[Byte], Array[Byte]] = - CommittableConsumerRecord( - record = record, - offset = CommittableOffset( - topicPartition = partition, - consumerGroupId = consumerGroupId, - offsetAndMetadata = new OffsetAndMetadata( - record.offset + 1L - ), - commit = offsetCommit - ) - ) - - private[this] def records(batch: KafkaByteConsumerRecords): F[ConsumerRecords] = - batch.partitions.toVector - .traverse { partition => - NonEmptyVector - .fromVectorUnsafe(batch.records(partition).toVector) - .traverse { record => - ConsumerRecord - .fromJava(record, Deserializer.identity, Deserializer.identity) - .map(committableConsumerRecord(_, partition)) - } - .map((partition, _)) - } - .map(_.toMap) + private[this] def records(batch: KafkaByteConsumerRecords): ConsumerRecords = + batch.partitions.toVector.map { partition => + partition -> NonEmptyVector + .fromVectorUnsafe(batch.records(partition).toVector) + }.toMap private[this] val pollTimeout: Duration = settings.pollTimeout.asJava @@ -430,7 +403,7 @@ private[kafka] final class KafkaConsumerActor[F[_]]( consumer.poll(pollTimeout) } - .flatMap(records) + .map(records) def handlePoll(newRecords: ConsumerRecords, initialRebalancing: Boolean): F[Unit] = { def handleBatch( @@ -457,7 +430,7 @@ private[kafka] final class KafkaConsumerActor[F[_]]( state.fetches.filterKeysStrictList(canBeCompleted).traverse_ { case (partition, fetches) => val records = Chunk.vector(allRecords(partition).toVector) - fetches.values.toList.traverse_(_.completeRecords(records)) + fetches.values.toList.traverse_(_.completeRecords(records, offsetCommit)) } (canBeCompleted.nonEmpty, canBeStored.nonEmpty) match { @@ -615,18 +588,24 @@ private[kafka] final class KafkaConsumerActor[F[_]]( private[kafka] object KafkaConsumerActor { final case class FetchRequest[F[_]]( callback: ( - (Chunk[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]], FetchCompletedReason) + ( + Chunk[KafkaByteConsumerRecord], + Map[TopicPartition, OffsetAndMetadata] => F[Unit], + FetchCompletedReason + ) ) => F[Unit] ) { def completeRevoked( - chunk: Chunk[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]] + chunk: Chunk[KafkaByteConsumerRecord], + offsetCommit: Map[TopicPartition, OffsetAndMetadata] => F[Unit] ): F[Unit] = - callback((chunk, FetchCompletedReason.TopicPartitionRevoked)) + callback((chunk, 
offsetCommit, FetchCompletedReason.TopicPartitionRevoked)) def completeRecords( - chunk: Chunk[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]] + chunk: Chunk[KafkaByteConsumerRecord], + offsetCommit: Map[TopicPartition, OffsetAndMetadata] => F[Unit] ): F[Unit] = - callback((chunk, FetchCompletedReason.FetchedRecords)) + callback((chunk, offsetCommit, FetchCompletedReason.FetchedRecords)) override def toString: String = "FetchRequest$" + System.identityHashCode(this) @@ -636,9 +615,7 @@ private[kafka] object KafkaConsumerActor { final case class State[F[_]]( fetches: Map[TopicPartition, Map[StreamId, FetchRequest[F]]], - records: Map[TopicPartition, NonEmptyVector[ - CommittableConsumerRecord[F, Array[Byte], Array[Byte]] - ]], + records: Map[TopicPartition, NonEmptyVector[KafkaByteConsumerRecord]], pendingCommits: Chain[Request.Commit[F]], onRebalances: Chain[OnRebalance[F]], rebalancing: Boolean, @@ -655,7 +632,11 @@ private[kafka] object KafkaConsumerActor { partition: TopicPartition, streamId: StreamId, callback: ( - (Chunk[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]], FetchCompletedReason) + ( + Chunk[KafkaByteConsumerRecord], + Map[TopicPartition, OffsetAndMetadata] => F[Unit], + FetchCompletedReason + ) ) => F[Unit] ): (State[F], List[FetchRequest[F]]) = { val newFetchRequest = @@ -682,9 +663,7 @@ private[kafka] object KafkaConsumerActor { ) def withRecords( - records: Map[TopicPartition, NonEmptyVector[ - CommittableConsumerRecord[F, Array[Byte], Array[Byte]] - ]] + records: Map[TopicPartition, NonEmptyVector[KafkaByteConsumerRecord]] ): State[F] = copy(records = this.records combine records) @@ -803,7 +782,11 @@ private[kafka] object KafkaConsumerActor { partition: TopicPartition, streamId: StreamId, callback: ( - (Chunk[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]], FetchCompletedReason) + ( + Chunk[KafkaByteConsumerRecord], + Map[TopicPartition, OffsetAndMetadata] => F[Unit], + FetchCompletedReason + ) ) => F[Unit] ) extends Request[F] diff --git a/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala b/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala index 9303d6e23..4ac085152 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala @@ -9,7 +9,7 @@ package fs2.kafka.internal import cats.data.{Chain, NonEmptyList, NonEmptySet, NonEmptyVector} import cats.syntax.all._ import fs2.Chunk -import fs2.kafka.CommittableConsumerRecord +import fs2.kafka._ import fs2.kafka.instances._ import fs2.kafka.internal.KafkaConsumerActor._ import fs2.kafka.internal.LogLevel._ @@ -17,6 +17,7 @@ import fs2.kafka.internal.syntax._ import java.util.regex.Pattern import org.apache.kafka.common.TopicPartition import scala.collection.immutable.SortedSet +import org.apache.kafka.clients.consumer.OffsetAndMetadata private[kafka] sealed abstract class LogEntry { def level: LogLevel @@ -63,7 +64,11 @@ private[kafka] object LogEntry { final case class StoredFetch[F[_]]( partition: TopicPartition, callback: ( - (Chunk[CommittableConsumerRecord[F, Array[Byte], Array[Byte]]], FetchCompletedReason) + ( + Chunk[KafkaByteConsumerRecord], + Map[TopicPartition, OffsetAndMetadata] => F[Unit], + FetchCompletedReason + ) ) => F[Unit], state: State[F] ) extends LogEntry { @@ -180,12 +185,12 @@ private[kafka] object LogEntry { case (append, (tp, ms)) => append(tp.show) append(" -> { first: ") - append(ms.head.offset.offsetAndMetadata.show) + append(ms.head.offset.show) append(", last: ") - 
append(ms.last.offset.offsetAndMetadata.show) + append(ms.last.offset.show) append(" }") }("", ", ", "") private[this] type Records[F[_]] = - Map[TopicPartition, NonEmptyVector[CommittableConsumerRecord[F, _, _]]] + Map[TopicPartition, NonEmptyVector[KafkaByteConsumerRecord]] } From 75ee2d48b229811b3e2d69a06e56c8ea06085c90 Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Sun, 13 Mar 2022 16:31:28 +0000 Subject: [PATCH 3/6] remove commented --- .../main/scala/fs2/kafka/KafkaConsumer.scala | 25 ------------------- 1 file changed, 25 deletions(-) diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala index 538cf6ac0..0f189ba37 100644 --- a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala +++ b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala @@ -202,31 +202,6 @@ object KafkaConsumer { ConsumerRecord .fromJava[F, K, V](rec, keyDes, valueDes) .map(committableConsumerRecord(_, offsetCommit, partition)) - // val cr: F[ConsumerRecord[K, V]] = ccr.record.bitraverse( - // key => - // keyDes - // .deserialize(ccr.offset.topicPartition.topic, ccr.record.headers, key), - // value => - // valueDes - // .deserialize(ccr.offset.topicPartition.topic, ccr.record.headers, value) - // ) - - // cr.map( - // cr => - // CommittableConsumerRecord[F, K, V]( - // cr, - // CommittableOffset( - // ccr.offset.topicPartition, - // new OffsetAndMetadata( - // ccr.offset.offsetAndMetadata.offset(), - // ccr.offset.offsetAndMetadata.leaderEpoch(), - // settings.recordMetadata(cr) - // ), - // ccr.offset.consumerGroupId, - // ccr.offset.commitOffsets - // ) - // ) - // ) } val enqueueChunk = c.flatMap { chunk => From d6a4f3dc075af1bf995b35ecd6b1b1488b8aea0c Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Mon, 14 Mar 2022 09:14:51 +0000 Subject: [PATCH 4/6] Don't pass offsetCommit around everywhere --- .../main/scala/fs2/kafka/KafkaConsumer.scala | 23 ++++------ .../kafka/internal/KafkaConsumerActor.scala | 42 ++++++------------- .../scala/fs2/kafka/internal/LogEntry.scala | 9 +--- 3 files changed, 22 insertions(+), 52 deletions(-) diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala index 0f189ba37..78013f186 100644 --- a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala +++ b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala @@ -120,8 +120,8 @@ object KafkaConsumer { settings: ConsumerSettings[F, K, V], keyDes: Deserializer[F, K], valueDes: Deserializer[F, V], - actor: FakeFiber[F], - polls: FakeFiber[F], + actor: KafkaConsumerActor[F], + fiber: FakeFiber[F], streamIdRef: Ref[F, StreamId], id: Int, withConsumer: WithConsumer[F], @@ -129,19 +129,13 @@ object KafkaConsumer { )(implicit F: Async[F]): KafkaConsumer[F, K, V] = new KafkaConsumer[F, K, V] { - private val fiber: FakeFiber[F] = actor.combine(polls) - override def partitionsMapStream : Stream[F, Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]]] = { val chunkQueue: F[Queue[F, Option[Chunk[CommittableConsumerRecord[F, K, V]]]]] = Queue.bounded(settings.maxPrefetchBatches - 1) type PartitionRequest = - ( - Chunk[KafkaByteConsumerRecord], - Map[TopicPartition, OffsetAndMetadata] => F[Unit], - FetchCompletedReason - ) + (Chunk[KafkaByteConsumerRecord], FetchCompletedReason) type PartitionsMap = Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]] type PartitionsMapQueue = Queue[F, Option[PartitionsMap]] @@ -170,7 +164,6 @@ object KafkaConsumer { } yield 
Stream.eval { def committableConsumerRecord( record: ConsumerRecord[K, V], - offsetCommit: Map[TopicPartition, OffsetAndMetadata] => F[Unit], partition: TopicPartition ): CommittableConsumerRecord[F, K, V] = CommittableConsumerRecord( @@ -182,7 +175,7 @@ object KafkaConsumer { record.offset + 1L, settings.recordMetadata(record) ), - commit = offsetCommit + commit = actor.offsetCommit ) ) @@ -197,11 +190,11 @@ object KafkaConsumer { case Left(()) => stopReqs.complete(()).void - case Right((chunk, offsetCommit, reason)) => + case Right((chunk, reason)) => val c = chunk.traverse[F, CommittableConsumerRecord[F, K, V]] { rec => ConsumerRecord .fromJava[F, K, V](rec, keyDes, valueDes) - .map(committableConsumerRecord(_, offsetCommit, partition)) + .map(committableConsumerRecord(_, partition)) } val enqueueChunk = c.flatMap { chunk => @@ -649,7 +642,7 @@ object KafkaConsumer { withConsumer = withConsumer ) } - actor <- startConsumerActor(requests, polls, actor) + actorFiber <- startConsumerActor(requests, polls, actor) polls <- startPollScheduler(polls, settings.pollInterval) } yield createKafkaConsumer( requests, @@ -657,7 +650,7 @@ object KafkaConsumer { keyDeserializer, valueDeserializer, actor, - polls, + actorFiber.combine(polls), streamId, id, withConsumer, diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala index 62ba76cb6..821252640 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala @@ -155,18 +155,12 @@ private[kafka] final class KafkaConsumerActor[F[_]]( private[this] def fetch( partition: TopicPartition, streamId: StreamId, - callback: ( - ( - Chunk[KafkaByteConsumerRecord], - Map[TopicPartition, OffsetAndMetadata] => F[Unit], - FetchCompletedReason - ) - ) => F[Unit] + callback: ((Chunk[KafkaByteConsumerRecord], FetchCompletedReason)) => F[Unit] ): F[Unit] = { val assigned = withConsumer.blocking { _.assignment.contains(partition) } - def storeFetch = + def storeFetch: F[Unit] = ref .modify { state => val (newState, oldFetch) = @@ -177,13 +171,13 @@ private[kafka] final class KafkaConsumerActor[F[_]]( case (newState, oldFetches) => log(StoredFetch(partition, callback, newState)) >> oldFetches.traverse_ { fetch => - fetch.completeRevoked(Chunk.empty, offsetCommit) >> + fetch.completeRevoked(Chunk.empty) >> log(RevokedPreviousFetch(partition, streamId)) } } def completeRevoked = - callback((Chunk.empty, offsetCommit, FetchCompletedReason.TopicPartitionRevoked)) + callback((Chunk.empty, FetchCompletedReason.TopicPartitionRevoked)) assigned.ifM(storeFetch, completeRevoked) } @@ -265,7 +259,7 @@ private[kafka] final class KafkaConsumerActor[F[_]]( val action = st.fetches.filterKeysStrictList(withRecords).traverse { case (partition, partitionFetches) => val records = Chunk.vector(st.records(partition).toVector) - partitionFetches.values.toList.traverse(_.completeRevoked(records, offsetCommit)) + partitionFetches.values.toList.traverse(_.completeRevoked(records)) } >> logging.log( RevokedFetchesWithRecords(st.records.filterKeysStrict(withRecords), newState) ) @@ -280,7 +274,7 @@ private[kafka] final class KafkaConsumerActor[F[_]]( val action = st.fetches .filterKeysStrictValuesList(withoutRecords) - .traverse(_.values.toList.traverse(_.completeRevoked(Chunk.empty, offsetCommit))) >> + .traverse(_.values.toList.traverse(_.completeRevoked(Chunk.empty))) >> 
logging.log(RevokedFetchesWithoutRecords(withoutRecords, newState)) (newState, action) @@ -363,7 +357,7 @@ private[kafka] final class KafkaConsumerActor[F[_]]( } } - private[this] val offsetCommit: Map[TopicPartition, OffsetAndMetadata] => F[Unit] = + private[kafka] val offsetCommit: Map[TopicPartition, OffsetAndMetadata] => F[Unit] = offsets => { val commit = runCommitAsync(offsets) { cb => requests.offer(Request.Commit(offsets, cb)) @@ -430,7 +424,7 @@ private[kafka] final class KafkaConsumerActor[F[_]]( state.fetches.filterKeysStrictList(canBeCompleted).traverse_ { case (partition, fetches) => val records = Chunk.vector(allRecords(partition).toVector) - fetches.values.toList.traverse_(_.completeRecords(records, offsetCommit)) + fetches.values.toList.traverse_(_.completeRecords(records)) } (canBeCompleted.nonEmpty, canBeStored.nonEmpty) match { @@ -590,22 +584,19 @@ private[kafka] object KafkaConsumerActor { callback: ( ( Chunk[KafkaByteConsumerRecord], - Map[TopicPartition, OffsetAndMetadata] => F[Unit], FetchCompletedReason ) ) => F[Unit] ) { def completeRevoked( - chunk: Chunk[KafkaByteConsumerRecord], - offsetCommit: Map[TopicPartition, OffsetAndMetadata] => F[Unit] + chunk: Chunk[KafkaByteConsumerRecord] ): F[Unit] = - callback((chunk, offsetCommit, FetchCompletedReason.TopicPartitionRevoked)) + callback((chunk, FetchCompletedReason.TopicPartitionRevoked)) def completeRecords( - chunk: Chunk[KafkaByteConsumerRecord], - offsetCommit: Map[TopicPartition, OffsetAndMetadata] => F[Unit] + chunk: Chunk[KafkaByteConsumerRecord] ): F[Unit] = - callback((chunk, offsetCommit, FetchCompletedReason.FetchedRecords)) + callback((chunk, FetchCompletedReason.FetchedRecords)) override def toString: String = "FetchRequest$" + System.identityHashCode(this) @@ -634,7 +625,6 @@ private[kafka] object KafkaConsumerActor { callback: ( ( Chunk[KafkaByteConsumerRecord], - Map[TopicPartition, OffsetAndMetadata] => F[Unit], FetchCompletedReason ) ) => F[Unit] @@ -781,13 +771,7 @@ private[kafka] object KafkaConsumerActor { final case class Fetch[F[_]]( partition: TopicPartition, streamId: StreamId, - callback: ( - ( - Chunk[KafkaByteConsumerRecord], - Map[TopicPartition, OffsetAndMetadata] => F[Unit], - FetchCompletedReason - ) - ) => F[Unit] + callback: ((Chunk[KafkaByteConsumerRecord], FetchCompletedReason)) => F[Unit] ) extends Request[F] final case class Commit[F[_]]( diff --git a/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala b/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala index 4ac085152..ea79bcde2 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala @@ -17,7 +17,6 @@ import fs2.kafka.internal.syntax._ import java.util.regex.Pattern import org.apache.kafka.common.TopicPartition import scala.collection.immutable.SortedSet -import org.apache.kafka.clients.consumer.OffsetAndMetadata private[kafka] sealed abstract class LogEntry { def level: LogLevel @@ -63,13 +62,7 @@ private[kafka] object LogEntry { final case class StoredFetch[F[_]]( partition: TopicPartition, - callback: ( - ( - Chunk[KafkaByteConsumerRecord], - Map[TopicPartition, OffsetAndMetadata] => F[Unit], - FetchCompletedReason - ) - ) => F[Unit], + callback: ((Chunk[KafkaByteConsumerRecord], FetchCompletedReason)) => F[Unit], state: State[F] ) extends LogEntry { override def level: LogLevel = Debug From 87757447899b40cc6a63d3db285c3de0ca3dbe9f Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Mon, 14 Mar 2022 09:18:01 +0000 
Subject: [PATCH 5/6] Move consumer group id back to kafka consumer --- modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala | 3 +-- .../main/scala/fs2/kafka/internal/KafkaConsumerActor.scala | 4 ++++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala index 78013f186..7312ef9c7 100644 --- a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala +++ b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala @@ -28,7 +28,6 @@ import scala.annotation.nowarn import scala.collection.immutable.SortedSet import scala.concurrent.duration.FiniteDuration import scala.util.matching.Regex -import org.apache.kafka.clients.consumer.ConsumerConfig /** * [[KafkaConsumer]] represents a consumer of Kafka records, with the @@ -170,7 +169,7 @@ object KafkaConsumer { record = record, offset = CommittableOffset( topicPartition = partition, - consumerGroupId = settings.properties.get(ConsumerConfig.GROUP_ID_CONFIG), + consumerGroupId = actor.consumerGroupId, offsetAndMetadata = new OffsetAndMetadata( record.offset + 1L, settings.recordMetadata(record) diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala index 821252640..ae99c5321 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala @@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, OffsetAndMe import org.apache.kafka.common.TopicPartition import scala.collection.immutable.SortedSet +import org.apache.kafka.clients.consumer.ConsumerConfig /** * [[KafkaConsumerActor]] wraps a Java `KafkaConsumer` and works similar to @@ -58,6 +59,9 @@ private[kafka] final class KafkaConsumerActor[F[_]]( private[this] type ConsumerRecords = Map[TopicPartition, NonEmptyVector[KafkaByteConsumerRecord]] + private[kafka] val consumerGroupId: Option[String] = + settings.properties.get(ConsumerConfig.GROUP_ID_CONFIG) + private[this] val consumerRebalanceListener: ConsumerRebalanceListener = new ConsumerRebalanceListener { override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = From 988b50864b703f747205c3245a337631fb4fd126 Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Mon, 14 Mar 2022 09:20:47 +0000 Subject: [PATCH 6/6] Revert formatting changes --- .../fs2/kafka/internal/KafkaConsumerActor.scala | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala index ae99c5321..9dc1f5df5 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala @@ -585,12 +585,7 @@ private[kafka] final class KafkaConsumerActor[F[_]]( private[kafka] object KafkaConsumerActor { final case class FetchRequest[F[_]]( - callback: ( - ( - Chunk[KafkaByteConsumerRecord], - FetchCompletedReason - ) - ) => F[Unit] + callback: ((Chunk[KafkaByteConsumerRecord], FetchCompletedReason)) => F[Unit] ) { def completeRevoked( chunk: Chunk[KafkaByteConsumerRecord] @@ -626,12 +621,7 @@ private[kafka] object KafkaConsumerActor { def withFetch( partition: TopicPartition, streamId: StreamId, - callback: ( - ( - Chunk[KafkaByteConsumerRecord], - 
FetchCompletedReason - ) - ) => F[Unit] + callback: ((Chunk[KafkaByteConsumerRecord], FetchCompletedReason)) => F[Unit] ): (State[F], List[FetchRequest[F]]) = { val newFetchRequest = FetchRequest(callback)
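
Series note: the net effect of these six patches is that KafkaConsumerActor loses its K/V type parameters and stops deserializing records. The actor's fetch callbacks now carry raw Chunk[KafkaByteConsumerRecord] (records with Array[Byte] keys and values), and KafkaConsumer.partitionsMapStream deserializes each fetched chunk with the key/value Deserializer instances before building CommittableConsumerRecords, reusing the actor's offsetCommit function and consumerGroupId. The sketch below illustrates that "deserialize at the edge" shape in miniature; it is a dependency-free approximation using stand-in types (ByteRecord, Record, an Either-based Deserializer) rather than fs2-kafka's actual effectful API, which uses cats-effect, fs2.Chunk and Chunk.traverse.

object DeserializeAtTheEdge {
  // Raw record as the actor now hands it over: bytes only.
  final case class ByteRecord(topic: String, offset: Long, key: Array[Byte], value: Array[Byte])

  // Stand-in for fs2-kafka's effectful Deserializer[F, A], modelled with Either.
  type Deserializer[A] = (String, Array[Byte]) => Either[Throwable, A]

  // Stand-in for the deserialized ConsumerRecord[K, V].
  final case class Record[K, V](topic: String, offset: Long, key: K, value: V)

  // What KafkaConsumer now does per fetched chunk: deserialize every raw record,
  // failing the whole chunk on the first error (the real code uses Chunk.traverse).
  def deserializeChunk[K, V](
      chunk: Vector[ByteRecord],
      keyDes: Deserializer[K],
      valueDes: Deserializer[V]
  ): Either[Throwable, Vector[Record[K, V]]] =
    chunk.foldLeft[Either[Throwable, Vector[Record[K, V]]]](Right(Vector.empty)) { (acc, rec) =>
      for {
        out <- acc
        k <- keyDes(rec.topic, rec.key)
        v <- valueDes(rec.topic, rec.value)
      } yield out :+ Record(rec.topic, rec.offset, k, v)
    }

  def main(args: Array[String]): Unit = {
    val utf8: Deserializer[String] = (_, bytes) => Right(new String(bytes, "UTF-8"))
    val raw = Vector(ByteRecord("topic-a", 0L, "key".getBytes("UTF-8"), "value".getBytes("UTF-8")))
    println(deserializeChunk(raw, utf8, utf8))
    // Right(Vector(Record(topic-a,0,key,value)))
  }
}

Run as a plain Scala program, this round-trips one UTF-8 record per the comment above. The design point mirrors the series: the polling/actor side stays monomorphic in bytes, so State[F], Request[F] and the log entries no longer need K/V parameters, and deserialization failures surface in the per-partition stream rather than inside the poll loop.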