Slightly improve docs for stream methods #872

Merged
merged 1 commit into from Feb 21, 2022
32 changes: 17 additions & 15 deletions modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsume.scala
@@ -22,7 +22,7 @@ trait KafkaConsume[F[_], K, V] {
    * Alias for `partitionedStream.parJoinUnbounded`.
    * See [[partitionedRecords]] for more information.
    *
-   * @note you have to first use `subscribe` to subscribe the consumer
+   * @note you have to first use `subscribe` or `assign` the consumer
    * before using this `Stream`. If you forgot to subscribe, there
    * will be a [[NotSubscribedException]] raised in the `Stream`.
    */
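To make the `subscribe`-before-`stream` requirement concrete, here is a minimal consumer sketch against the fs2-kafka 2.x API. The broker address, group id, and topic name are assumptions for illustration; it needs a running Kafka broker to actually produce output.

```scala
import cats.effect.{IO, IOApp}
import fs2.kafka._

object StreamExample extends IOApp.Simple {

  // Assumed broker address and group id, purely for illustration.
  val settings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092")
      .withGroupId("example-group")

  val run: IO[Unit] =
    KafkaConsumer
      .stream(settings)
      .evalTap(_.subscribeTo("example-topic")) // subscribe (or assign) first,
                                               // otherwise NotSubscribedException
      .flatMap(_.stream)                       // partitionedStream.parJoinUnbounded
      .evalMap(committable => IO.println(committable.record.value))
      .compile
      .drain
}
```

Skipping the `subscribeTo` step raises `NotSubscribedException` inside the resulting `Stream`, not at construction time.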
@@ -47,35 +47,37 @@ trait KafkaConsume[F[_], K, V] {
    * can use [[records]] instead, where records for all partitions are in
    * a single `Stream`.
    *
-   * @note you have to first use `subscribe` to subscribe the consumer
+   * @note you have to first use `subscribe` or `assign` the consumer
    * before using this `Stream`. If you forgot to subscribe, there
    * will be a [[NotSubscribedException]] raised in the `Stream`.
    */
   def partitionedStream: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]]

   /**
-   * `Stream` where each element contains an assignment. Each assignment is
-   * `Map`, where keys are `TopicPartition`s, and values are record streams for
-   * the `TopicPartition`.<br>
+   * `Stream` where each element contains a `Map` with all newly assigned partitions.
+   * Keys of this `Map` are `TopicPartition`s, and values are record streams for
+   * the particular `TopicPartition`. These streams will be closed only when
+   * a partition is revoked.<br>
    * <br>
-   * With the default assignor, previous partition assignments are revoked at
-   * once, and a new set of partitions assigned to a consumer on each
-   * rebalance. In this case, each returned map contains the full partition
-   * assignment for the consumer. `partitionsMapStream` reflects the assignment
-   * process in a streaming manner.<br>
+   * With the default assignor, all previous partitions are revoked at
+   * once, and a new set of partitions is assigned to a consumer on each
+   * rebalance. In this case, each returned `Map` contains the full partition
+   * assignment for the consumer, and all streams from the previous assignment
+   * are closed. This means that `partitionsMapStream` reflects
+   * the default assignment process in a streaming manner.<br>
    * <br>
    * This may not be the case when a custom assignor is configured in the
    * consumer. When using the `CooperativeStickyAssignor`, for instance,
-   * partition assignments may be revoked individually. In this case, each
-   * element in the stream will contain only streams for newly assigned
-   * partitions. Streams returned previously will remain active until the
-   * assignment is revoked.<br>
+   * partitions may be revoked individually. In this case, each
+   * element in the stream (each `Map`) will contain only streams for newly assigned
+   * partitions. Previously returned streams for partitions that are retained
+   * will remain active. Only streams for revoked partitions will be closed.<br>
    * <br>
    * This is the most generic `Stream` method. If you don't need such control,
    * consider using `partitionedStream` or `stream` methods. They are both
    * based on a `partitionsMapStream`.
    *
-   * @note you have to first use `subscribe` to subscribe the consumer
+   * @note you have to first use `subscribe` or `assign` the consumer
    * before using this `Stream`. If you forgot to subscribe, there
    * will be a [[NotSubscribedException]] raised in the `Stream`.
    * @see [[records]]
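The `partitionsMapStream` behaviour described above can be sketched as follows. This is a minimal example, not the library's canonical usage: the broker address, group id, and topic name are assumptions, and a running Kafka broker is required. It also shows opting into the `CooperativeStickyAssignor` via the standard Kafka client property, which is the case where each emitted `Map` holds only newly assigned partitions.

```scala
import cats.effect.{IO, IOApp}
import fs2.Stream
import fs2.kafka._
import org.apache.kafka.clients.consumer.{ConsumerConfig, CooperativeStickyAssignor}

object PartitionsMapExample extends IOApp.Simple {

  // Assumed broker address and group id, purely for illustration.
  val settings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092")
      .withGroupId("example-group")
      // Opt into incremental rebalancing: each emitted Map then contains
      // only newly assigned partitions, not the full assignment.
      .withProperty(
        ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        classOf[CooperativeStickyAssignor].getName
      )

  val run: IO[Unit] =
    KafkaConsumer
      .stream(settings)
      .evalTap(_.subscribeTo("example-topic")) // subscribe before consuming
      .flatMap(_.partitionsMapStream)
      .flatMap { assignment =>
        // Fan out the per-partition streams from each assignment Map and
        // run them concurrently; a per-partition stream completes when
        // that partition is revoked.
        Stream.emits(assignment.toVector).map { case (partition, partitionStream) =>
          partitionStream.evalMap(c => IO.println(s"$partition -> ${c.record.value}"))
        }
      }
      .parJoinUnbounded
      .compile
      .drain
}
```

With the default assignor the `.withProperty` line can simply be dropped; each `Map` then carries the consumer's full assignment and all previously returned partition streams are closed on rebalance.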