Skip to content

Commit

Permalink
Use mapAccumulate instead of foldLeft
Browse files Browse the repository at this point in the history
  • Loading branch information
L7R7 committed Jan 31, 2024
1 parent b2b8a65 commit 88bbbbb
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ trait KafkaConsumeChunk[F[_], K, V] extends KafkaConsume[F, K, V] {
private def consume(processor: Chunk[ConsumerRecord[K, V]] => F[CommitNow])(
chunk: Chunk[CommittableConsumerRecord[F, K, V]]
)(implicit F: Monad[F]): F[Unit] = {
val (offsets, records) = chunk.foldLeft(
(CommittableOffsetBatch.empty, Chunk.empty[ConsumerRecord[K, V]])
)((acc, record) => (acc._1.updated(record.offset), acc._2 ++ Chunk(record.record)))
val (offsets, records) = chunk
.mapAccumulate(CommittableOffsetBatch.empty)((offsetBatch, committableRecord) =>
(offsetBatch.updated(committableRecord.offset), committableRecord.record)
)

processor(records) >> offsets.commit
}
Expand Down

0 comments on commit 88bbbbb

Please sign in to comment.