diff --git a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsume.scala b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsume.scala index 255f2f9de..cccd56347 100644 --- a/modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsume.scala +++ b/modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsume.scala @@ -22,7 +22,7 @@ trait KafkaConsume[F[_], K, V] { * Alias for `partitionedStream.parJoinUnbounded`. * See [[partitionedRecords]] for more information. * - * @note you have to first use `subscribe` to subscribe the consumer + * @note you have to first use `subscribe` or `assign` the consumer * before using this `Stream`. If you forgot to subscribe, there * will be a [[NotSubscribedException]] raised in the `Stream`. */ @@ -47,35 +47,37 @@ trait KafkaConsume[F[_], K, V] { * can use [[records]] instead, where records for all partitions are in * a single `Stream`. * - * @note you have to first use `subscribe` to subscribe the consumer + * @note you have to first use `subscribe` or `assign` the consumer * before using this `Stream`. If you forgot to subscribe, there * will be a [[NotSubscribedException]] raised in the `Stream`. */ def partitionedStream: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]] /** - * `Stream` where each element contains an assignment. Each assignment is - * `Map`, where keys are `TopicPartition`s, and values are record streams for - * the `TopicPartition`.
+ * `Stream` where each element contains a `Map` with all newly assigned partitions. + * Keys of this `Map` are `TopicPartition`s, and values are record streams for + * the particular `TopicPartition`. These streams will be closed only when + * a partition is revoked.
*
- * With the default assignor, previous partition assignments are revoked at - * once, and a new set of partitions assigned to a consumer on each - * rebalance. In this case, each returned map contains the full partition - * assignment for the consumer. `partitionsMapStream` reflects the assignment - * process in a streaming manner.
+ * With the default assignor, all previous partitions are revoked at + * once, and a new set of partitions is assigned to a consumer on each + * rebalance. In this case, each returned `Map` contains the full partition + * assignment for the consumer. And all streams from the previous assignment are closed. + * It means, that `partitionsMapStream` reflects + * the default assignment process in a streaming manner.
*
* This may not be the case when a custom assignor is configured in the * consumer. When using the `CooperativeStickyAssignor`, for instance, - * partition assignments may be revoked individually. In this case, each - * element in the stream will contain only streams for newly assigned - * partitions. Streams returned previously will remain active until the - * assignment is revoked.
+ * partitions may be revoked individually. In this case, each + * element in the stream (each`Map`) will contain only streams for newly assigned + * partitions. Previously returned streams for partitions that are retained + * will remain active. Only streams for revoked partitions will be closed.
*
* This is the most generic `Stream` method. If you don't need such control, * consider using `partitionedStream` or `stream` methods. They are both * based on a `partitionsMapStream`. * - * @note you have to first use `subscribe` to subscribe the consumer + * @note you have to first use `subscribe` or `assign` to subscribe the consumer * before using this `Stream`. If you forgot to subscribe, there * will be a [[NotSubscribedException]] raised in the `Stream`. * @see [[records]]