Skip to content

Commit

Permalink
Add Scaladoc for consumeChunk
Browse files Browse the repository at this point in the history
  • Loading branch information
Leonhard Riedisser committed Dec 11, 2023
1 parent 4c18522 commit e5cb844
Showing 1 changed file with 45 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,44 @@ import fs2.kafka.ConsumerRecord

trait KafkaConsumeChunk[F[_], K, V] extends KafkaConsume[F, K, V] {

/**
* Consume from all assigned partitions concurrently, processing the messages in `Chunk`s. For
* each `Chunk`, the provided `processor` is called, after that has finished the offsets for all
* messages in the chunks are committed.<br><br>
*
* This method is intended to be used in cases where messages have to be processed before offsets
* are committed. By relying on the methods like [[partitionedStream]], [[records]], and similar,
* you have to correctly implement not only your processing logic but also the correct mechanism
* for committing offsets. This can be tricky to do in a correct and efficient way
*
* This approach has several benefits:<br>
* - As a user, you don't have to care about committing offsets correctly. You can focus on
* implementing your business logic<br>
* - It's very straightforward to batch several messages from a `Chunk` together, e.g. for
* efficient writes to a persistent storage<br>
* - You can liberally use logic that involves concurrency, filtering, and re-ordering of
* messages without having to worry about incorrect offset commits<br>
*
* <br>
*
* The `processor` is a function that takes a `Chunk[ConsumerRecord[K, V]]` and returns a
* `F[CommitNow]`. [[CommitNow]] is isomorphic to `Unit`, but helps in transporting the intention
* that processing of a `Chunk` is done and offsets should be committed.<br><br>
*
* The returned value is `F[Nothing]`, because it's a never-ending process that doesn't
* terminate, and therefore doesn't return a result.
*
* @note
* This method does not make any use of Kafka's auto-commit feature, it implements "manual"
* commits in a way that suits most of the common use cases.
* @note
* you have to first use `subscribe` or `assign` the consumer before using this `Stream`. If
* you forgot to subscribe, there will be a [[NotSubscribedException]] raised in the `Stream`.
* @see
* [[partitionedStream]]
* @see
* [[CommitNow]]
*/
final def consumeChunk(
processor: Chunk[ConsumerRecord[K, V]] => F[CommitNow]
)(implicit F: Concurrent[F]): F[Nothing] = partitionedStream
Expand All @@ -36,6 +74,13 @@ trait KafkaConsumeChunk[F[_], K, V] extends KafkaConsume[F, K, V] {
object KafkaConsumeChunk {

type CommitNow = CommitNow.type

/**
* Token to indicate that a `Chunk` has been processed and the corresponding offsets are ready to
* be committed.<br>
*
* Isomorphic to `Unit`, but more intention revealing.
*/
object CommitNow

}

0 comments on commit e5cb844

Please sign in to comment.