diff --git a/modules/core/src/main/scala/fs2/kafka/CommittableOffset.scala b/modules/core/src/main/scala/fs2/kafka/CommittableOffset.scala
index 9412aad76..417e4c774 100644
--- a/modules/core/src/main/scala/fs2/kafka/CommittableOffset.scala
+++ b/modules/core/src/main/scala/fs2/kafka/CommittableOffset.scala
@@ -110,7 +110,7 @@ object CommittableOffset {
Map(_topicPartition -> _offsetAndMetadata)
override def batch: CommittableOffsetBatch[F] =
- CommittableOffsetBatch(offsets, consumerGroupId.toSet, consumerGroupId.isEmpty, _commit)
+ CommittableOffsetBatch.one(this)
override def commit: F[Unit] =
diff --git a/modules/core/src/main/scala/fs2/kafka/CommittableOffsetBatch.scala b/modules/core/src/main/scala/fs2/kafka/CommittableOffsetBatch.scala
index e0e3600c3..1597b6b27 100644
--- a/modules/core/src/main/scala/fs2/kafka/CommittableOffsetBatch.scala
+++ b/modules/core/src/main/scala/fs2/kafka/CommittableOffsetBatch.scala
@@ -6,11 +6,10 @@
package fs2.kafka
-import cats.ApplicativeError
+import cats.{Applicative, ApplicativeError, ApplicativeThrow, Foldable, Show}
import cats.instances.list._
import cats.syntax.foldable._
import cats.syntax.show._
-import cats.{Applicative, Foldable, Show}
import fs2.kafka.instances._
import fs2.kafka.internal.syntax._
import org.apache.kafka.clients.consumer.OffsetAndMetadata
@@ -93,40 +92,48 @@ sealed abstract class CommittableOffsetBatch[F[_]] {
* is nothing to commit.
def commit: F[Unit]
+ private[kafka] def committableOffsetsMap
+ : Map[String, Map[TopicPartition, OffsetAndMetadata] => F[Unit]]
object CommittableOffsetBatch {
- private[kafka] def apply[F[_]](
+ private[kafka] def ofMultiTopic[F[_]](
offsets: Map[TopicPartition, OffsetAndMetadata],
consumerGroupIds: Set[String],
consumerGroupIdsMissing: Boolean,
- commit: Map[TopicPartition, OffsetAndMetadata] => F[Unit]
+ commitOffsetsMap: Map[String, Map[TopicPartition, OffsetAndMetadata] => F[Unit]]
)(implicit F: ApplicativeError[F, Throwable]): CommittableOffsetBatch[F] = {
val _offsets = offsets
val _consumerGroupIds = consumerGroupIds
val _consumerGroupIdsMissing = consumerGroupIdsMissing
- val _commit = commit
+ val _commitOffsetsMap = commitOffsetsMap
new CommittableOffsetBatch[F] {
override def updated(that: CommittableOffset[F]): CommittableOffsetBatch[F] =
- CommittableOffsetBatch(
+ CommittableOffsetBatch.ofMultiTopic(
_offsets.updated(that.topicPartition, that.offsetAndMetadata),
that.consumerGroupId.fold(_consumerGroupIds)(_consumerGroupIds + _),
_consumerGroupIdsMissing || that.consumerGroupId.isEmpty,
- _commit
+ _commitOffsetsMap.updated(that.topicPartition.topic(), that.commitOffsets)
override def updated(that: CommittableOffsetBatch[F]): CommittableOffsetBatch[F] =
- CommittableOffsetBatch(
+ CommittableOffsetBatch.ofMultiTopic(
_offsets ++ that.offsets,
_consumerGroupIds ++ that.consumerGroupIds,
_consumerGroupIdsMissing || that.consumerGroupIdsMissing,
- _commit
+ (committableOffsetsMap.toList ++ that.committableOffsetsMap.toList).toMap
override val offsets: Map[TopicPartition, OffsetAndMetadata] =
+ override val committableOffsetsMap
+ : Map[String, Map[TopicPartition, OffsetAndMetadata] => F[Unit]] =
+ _commitOffsetsMap
override val consumerGroupIds: Set[String] =
@@ -134,15 +141,71 @@ object CommittableOffsetBatch {
override def commit: F[Unit] =
- if (_consumerGroupIdsMissing || _consumerGroupIds.size != 1)
- F.raiseError(ConsumerGroupException(consumerGroupIds))
- else _commit(offsets)
+ if (_consumerGroupIdsMissing)
+ ApplicativeThrow[F].raiseError(ConsumerGroupException(consumerGroupIds))
+ else {
+ offsets
+ .groupBy(_._1.topic())
+ .map {
+ case (topicName, info) =>
+ committableOffsetsMap
+ .getOrElse[Map[TopicPartition, OffsetAndMetadata] => F[Unit]](
+ topicName,
+ _ =>
+ ApplicativeThrow[F].raiseError(
+ new RuntimeException(s"Cannot perform commit for topic: $topicName")
+ )
+ )
+ .apply(info)
+ }
+ .toList
+ .sequence_
+ }
override def toString: String =
+ @deprecated("Use CommittableOffsetBatch.apply with commitMap instead.", since = "2.5.1")
+ private[kafka] def apply[F[_]](
+ offsets: Map[TopicPartition, OffsetAndMetadata],
+ consumerGroupIds: Set[String],
+ consumerGroupIdsMissing: Boolean,
+ commit: Map[TopicPartition, OffsetAndMetadata] => F[Unit]
+ )(implicit F: ApplicativeError[F, Throwable]): CommittableOffsetBatch[F] =
+ ofMultiTopic[F](
+ offsets,
+ consumerGroupIds,
+ consumerGroupIdsMissing,
+ offsets.headOption
+ .map(_._1.topic())
+ .map(topicName => Map(topicName -> commit))
+ .getOrElse(Map.empty)
+ )
+ /**
+ * A [[CommittableOffsetBatch]] which does include only one offset for a single topic.
+ *
+ * @tparam F effect type to use to perform the commit effect
+ * @return A [[CommittableOffsetBatch]] which does include only one offset for a single topic.
+ *
+ * @see [[CommittableOffsetBatch#fromFoldable]]
+ * @see [[CommittableOffsetBatch#fromFoldableOption]]
+ */
+ def one[F[_]: ApplicativeThrow](
+ committableOffset: CommittableOffset[F]
+ ): CommittableOffsetBatch[F] =
+ CommittableOffsetBatch.ofMultiTopic[F](
+ Map(committableOffset.topicPartition -> committableOffset.offsetAndMetadata),
+ committableOffset.consumerGroupId.toSet,
+ committableOffset.consumerGroupId.isEmpty,
+ Map(
+ committableOffset.topicPartition
+ .topic() -> Map(committableOffset.topicPartition -> committableOffset.commit)
+ )
+ )
* Creates a [[CommittableOffsetBatch]] from some [[CommittableOffset]]s,
* where the containing type has a `Foldable` instance. Guaranteed to be
@@ -183,32 +246,35 @@ object CommittableOffsetBatch {
def fromFoldableMap[F[_], G[_], A](ga: G[A])(f: A => CommittableOffset[F])(
implicit F: ApplicativeError[F, Throwable],
G: Foldable[G]
- ): CommittableOffsetBatch[F] = {
- var commit: Map[TopicPartition, OffsetAndMetadata] => F[Unit] = null
- var offsetsMap: Map[TopicPartition, OffsetAndMetadata] = Map.empty
- var consumerGroupIds: Set[String] = Set.empty
- var consumerGroupIdsMissing: Boolean = false
- var empty: Boolean = true
- ga.foldLeft(()) { (_, a) =>
- val offset = f(a)
- if (empty) {
- commit = offset.commitOffsets
- empty = false
+ ): CommittableOffsetBatch[F] =
+ if (ga.isEmpty)
+ CommittableOffsetBatch.empty[F]
+ else {
+ var commitMap: Map[String, Map[TopicPartition, OffsetAndMetadata] => F[Unit]] = Map.empty
+ var offsetsMap: Map[TopicPartition, OffsetAndMetadata] = Map.empty
+ var consumerGroupIds: Set[String] = Set.empty
+ var consumerGroupIdsMissing: Boolean = false
+ ga.foldLeft(()) { (_, a) =>
+ val offset: CommittableOffset[F] = f(a)
+ val topicPartition = offset.topicPartition
+ commitMap = commitMap.updatedIfAbsent(topicPartition.topic(), offset.commitOffsets)
+ offsetsMap = offsetsMap.updated(topicPartition, offset.offsetAndMetadata)
+ offset.consumerGroupId match {
+ case Some(consumerGroupId) => consumerGroupIds = consumerGroupIds + consumerGroupId
+ case None => consumerGroupIdsMissing = true
+ }
- offsetsMap = offsetsMap.updated(offset.topicPartition, offset.offsetAndMetadata)
- offset.consumerGroupId match {
- case Some(consumerGroupId) => consumerGroupIds = consumerGroupIds + consumerGroupId
- case None => consumerGroupIdsMissing = true
- }
+ CommittableOffsetBatch.ofMultiTopic(
+ offsetsMap,
+ consumerGroupIds,
+ consumerGroupIdsMissing,
+ commitMap
+ )
- if (empty) CommittableOffsetBatch.empty[F]
- else CommittableOffsetBatch(offsetsMap, consumerGroupIds, consumerGroupIdsMissing, commit)
- }
* Creates a [[CommittableOffsetBatch]] from some [[CommittableOffset]]s wrapped
* in `Option`, where the containing type has a `Foldable` instance. Guaranteed
@@ -231,20 +297,18 @@ object CommittableOffsetBatch {
implicit F: ApplicativeError[F, Throwable],
G: Foldable[G]
): CommittableOffsetBatch[F] = {
- var commit: Map[TopicPartition, OffsetAndMetadata] => F[Unit] = null
+ var commitMap: Map[String, Map[TopicPartition, OffsetAndMetadata] => F[Unit]] = Map.empty
var offsetsMap: Map[TopicPartition, OffsetAndMetadata] = Map.empty
var consumerGroupIds: Set[String] = Set.empty
var consumerGroupIdsMissing: Boolean = false
- var empty: Boolean = true
offsets.foldLeft(()) {
case (_, Some(offset)) =>
- if (empty) {
- commit = offset.commitOffsets
- empty = false
- }
+ val topicPartition = offset.topicPartition
- offsetsMap = offsetsMap.updated(offset.topicPartition, offset.offsetAndMetadata)
+ commitMap = commitMap.updatedIfAbsent(topicPartition.topic(), offset.commitOffsets)
+ offsetsMap = offsetsMap.updated(topicPartition, offset.offsetAndMetadata)
offset.consumerGroupId match {
case Some(consumerGroupId) => consumerGroupIds = consumerGroupIds + consumerGroupId
case None => consumerGroupIdsMissing = true
@@ -252,8 +316,15 @@ object CommittableOffsetBatch {
case (_, None) => ()
- if (empty) CommittableOffsetBatch.empty[F]
- else CommittableOffsetBatch(offsetsMap, consumerGroupIds, consumerGroupIdsMissing, commit)
+ if (offsets.isEmpty || offsets.exists(_.isEmpty))
+ CommittableOffsetBatch.empty[F]
+ else
+ CommittableOffsetBatch.ofMultiTopic(
+ offsetsMap,
+ consumerGroupIds,
+ consumerGroupIdsMissing,
+ commitMap
+ )
@@ -286,6 +357,10 @@ object CommittableOffsetBatch {
override def toString: String =
+ override private[kafka] def committableOffsetsMap
+ : Map[String, Map[TopicPartition, OffsetAndMetadata] => F[Unit]] =
+ Map.empty
implicit def committableOffsetBatchShow[F[_]]: Show[CommittableOffsetBatch[F]] =
diff --git a/modules/core/src/main/scala/fs2/kafka/ConsumerGroupException.scala b/modules/core/src/main/scala/fs2/kafka/ConsumerGroupException.scala
index c6e671a10..889e0a3f5 100644
--- a/modules/core/src/main/scala/fs2/kafka/ConsumerGroupException.scala
+++ b/modules/core/src/main/scala/fs2/kafka/ConsumerGroupException.scala
@@ -13,12 +13,11 @@ import org.apache.kafka.common.KafkaException
* while attempting to commit offsets.
* - There were [[CommittableOffset]]s without a consumer group ID.
- * - There were [[CommittableOffset]]s for multiple consumer group IDs.
sealed abstract class ConsumerGroupException(groupIds: Set[String])
extends KafkaException({
val groupIdsString = groupIds.toList.sorted.mkString(", ")
- s"multiple or missing consumer group ids [$groupIdsString]"
+ s"missing consumer group ids [$groupIdsString]"
private[kafka] object ConsumerGroupException {
diff --git a/modules/core/src/test/scala/fs2/kafka/BaseGenerators.scala b/modules/core/src/test/scala/fs2/kafka/BaseGenerators.scala
index e6a0837eb..b3cc69424 100644
--- a/modules/core/src/test/scala/fs2/kafka/BaseGenerators.scala
+++ b/modules/core/src/test/scala/fs2/kafka/BaseGenerators.scala
@@ -89,7 +89,17 @@ trait BaseGenerators {
implicit F: ApplicativeError[F, Throwable]
): Gen[CommittableOffsetBatch[F]] =
arbitrary[Map[TopicPartition, OffsetAndMetadata]]
- .map(CommittableOffsetBatch[F](_, Set.empty, false, _ => F.unit))
+ .map(_.toList)
+ .map(
+ CommittableOffsetBatch.fromFoldableMap(_)(a => {
+ CommittableOffset(
+ topicPartition = a._1,
+ offsetAndMetadata = a._2,
+ consumerGroupId = None,
+ commit = _ => F.unit
+ )
+ })
+ )
implicit def arbCommittableOffsetBatch[F[_]](
implicit F: ApplicativeError[F, Throwable]