diff --git a/build.sbt b/build.sbt
index dd70a6b07..6fb562dc2 100644
--- a/build.sbt
+++ b/build.sbt
@@ -10,7 +10,7 @@
 val kafkaVersion = "3.0.0"
 val testcontainersScalaVersion = "0.40.0"
-val vulcanVersion = "1.7.1"
+val vulcanVersion = "1.8.0"
 val munitVersion = "0.7.29"
diff --git a/docs/src/main/mdoc/transactions.md b/docs/src/main/mdoc/transactions.md
index 8163a5057..448dd062b 100644
--- a/docs/src/main/mdoc/transactions.md
+++ b/docs/src/main/mdoc/transactions.md
@@ -13,6 +13,11 @@ Kafka transactions are supported through a [`TransactionalKafkaProducer`][transa
 - Create `CommittableProducerRecords` and wrap them in `TransactionalProducerRecords`.
 
+> Note that calls to `produce` are sequenced in the `TransactionalKafkaProducer` so that, when the producer is used concurrently, transactions cannot interleave and trigger an invalid transaction transition exception.
+>
+> Because the `TransactionalKafkaProducer` waits for the record batch to be flushed and for the transaction to be committed on the broker, a single producer shared among many threads can become a performance bottleneck.
+> If you need more concurrent throughput from `TransactionalKafkaProducer`, it is recommended that you create a pool of transactional producers, as sketched below.
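+
+A minimal sketch of such a pool allocates a fixed number of producers, each with its own transactional ID, and hands them out through a queue. Names such as `producerPool`, `withPooledProducer` and `poolSize` are illustrative only and not part of the library API.
+
+```scala
+import cats.effect.std.Queue
+import cats.effect.{IO, Resource}
+import cats.syntax.all._
+import fs2.kafka._
+
+// Allocate `poolSize` transactional producers, each with a unique transactional ID,
+// and expose them through a Queue so callers can borrow one producer per transaction.
+def producerPool(
+  baseSettings: ProducerSettings[IO, String, String],
+  poolSize: Int
+): Resource[IO, Queue[IO, TransactionalKafkaProducer[IO, String, String]]] =
+  (0 until poolSize).toList
+    .traverse { i =>
+      TransactionalKafkaProducer.resource(
+        TransactionalProducerSettings(s"transactional-id-$i", baseSettings)
+      )
+    }
+    .evalMap { producers =>
+      Queue.unbounded[IO, TransactionalKafkaProducer[IO, String, String]].flatTap { queue =>
+        producers.traverse_(queue.offer)
+      }
+    }
+
+// Borrow a producer for a single transaction and return it to the pool afterwards.
+def withPooledProducer[A](
+  pool: Queue[IO, TransactionalKafkaProducer[IO, String, String]]
+)(use: TransactionalKafkaProducer[IO, String, String] => IO[A]): IO[A] =
+  pool.take.bracket(use)(pool.offer)
+```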
+
 Following is an example where transactions are used to consume, process, produce, and commit.
 
 ```scala mdoc
diff --git a/modules/core/src/main/scala/fs2/kafka/RecordDeserializer.scala b/modules/core/src/main/scala/fs2/kafka/RecordDeserializer.scala
index fdc16c523..cce06abeb 100644
--- a/modules/core/src/main/scala/fs2/kafka/RecordDeserializer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/RecordDeserializer.scala
@@ -6,7 +6,8 @@
 
 package fs2.kafka
 
-import cats.Applicative
+import cats.syntax.all._
+import cats.{Applicative, Functor}
 
 /**
  * Deserializer which may vary depending on whether a record
@@ -17,6 +18,14 @@ sealed abstract class RecordDeserializer[F[_], A] {
   def forKey: F[Deserializer[F, A]]
 
   def forValue: F[Deserializer[F, A]]
+
+  /**
+   * Returns a new [[RecordDeserializer]] instance that will catch deserialization
+   * errors and return them as a value, allowing user code to handle them without
+   * causing the consumer to fail.
+   */
+  final def attempt(implicit F: Functor[F]): RecordDeserializer[F, Either[Throwable, A]] =
+    RecordDeserializer.instance(forKey.map(_.attempt), forValue.map(_.attempt))
 }
 
 object RecordDeserializer {
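As a sketch of how the new `attempt` combinator might be used: wrapping an existing value deserializer so that broken records surface as `Left` values rather than failing the consumer stream. This assumes the `ConsumerSettings` overload accepting a key `Deserializer` together with a value `RecordDeserializer`; `valueDeserializer` is an illustrative placeholder.

```scala
import cats.effect.IO
import fs2.kafka._

// Deserialization failures are captured as Left values, so downstream code can
// log, skip, or dead-letter bad records instead of terminating the consumer.
def settingsWithAttempt(
  valueDeserializer: RecordDeserializer[IO, String]
): ConsumerSettings[IO, String, Either[Throwable, String]] =
  ConsumerSettings(
    keyDeserializer = Deserializer[IO, String],
    valueDeserializer = valueDeserializer.attempt
  ).withGroupId("example-group")
```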
diff --git a/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala b/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
index 12a000dea..189fe9040 100644
--- a/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
@@ -58,6 +58,22 @@ object TransactionalKafkaProducer {
     def metrics: F[Map[MetricName, Metric]]
   }
 
+  /**
+   * [[TransactionalKafkaProducer.WithoutOffsets]] extends [[TransactionalKafkaProducer.Metrics]]
+   * to allow producing records without corresponding upstream offsets.
+   */
+  abstract class WithoutOffsets[F[_], K, V] extends Metrics[F, K, V] {
+
+    /**
+     * Produces the `ProducerRecord`s in the specified [[ProducerRecords]]
+     * in three steps: first a transaction is initialized, then the records are placed
+     * in the buffer of the producer, and lastly the transaction is committed. If an error
+     * or cancellation occurs, the transaction is aborted. The returned effect succeeds
+     * if the whole transaction completes successfully.
+     */
+    def produceWithoutOffsets[P](records: ProducerRecords[P, K, V]): F[ProducerResult[P, K, V]]
+  }
+
   /**
    * Creates a new [[TransactionalKafkaProducer]] in the `Resource` context,
    * using the specified [[TransactionalProducerSettings]]. Note that there
@@ -73,20 +89,20 @@ object TransactionalKafkaProducer {
   )(
     implicit F: Async[F],
     mk: MkProducer[F]
-  ): Resource[F, TransactionalKafkaProducer.Metrics[F, K, V]] =
+  ): Resource[F, TransactionalKafkaProducer.WithoutOffsets[F, K, V]] =
     (
       Resource.eval(settings.producerSettings.keySerializer),
       Resource.eval(settings.producerSettings.valueSerializer),
-      WithProducer(mk, settings)
+      WithTransactionalProducer(mk, settings)
     ).mapN { (keySerializer, valueSerializer, withProducer) =>
-      new TransactionalKafkaProducer.Metrics[F, K, V] {
+      new TransactionalKafkaProducer.WithoutOffsets[F, K, V] {
         override def produce[P](
           records: TransactionalProducerRecords[F, P, K, V]
         ): F[ProducerResult[P, K, V]] =
-          produceTransaction(records)
+          produceTransactionWithOffsets(records)
             .map(ProducerResult(_, records.passthrough))
 
-        private[this] def produceTransaction[P](
+        private[this] def produceTransactionWithOffsets[P](
           records: TransactionalProducerRecords[F, P, K, V]
         ): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] =
           if (records.records.isEmpty) F.pure(Chunk.empty)
@@ -100,34 +116,50 @@ object TransactionalKafkaProducer {
               else F.pure(batch.consumerGroupIds.head)
 
             consumerGroupId.flatMap { groupId =>
-              withProducer { (producer, blocking) =>
-                blocking(producer.beginTransaction())
-                  .bracketCase { _ =>
-                    records.records
-                      .flatMap(_.records)
-                      .traverse(
-                        KafkaProducer
-                          .produceRecord(keySerializer, valueSerializer, producer, blocking)
-                      )
-                      .map(_.sequence)
-                      .flatTap { _ =>
-                        blocking {
-                          producer.sendOffsetsToTransaction(
-                            batch.offsets.asJava,
-                            new ConsumerGroupMetadata(groupId)
-                          )
-                        }
-                      }
-                  } {
-                    case (_, Outcome.Succeeded(_)) =>
-                      blocking(producer.commitTransaction())
-                    case (_, Outcome.Canceled() | Outcome.Errored(_)) =>
-                      blocking(producer.abortTransaction())
-                  }
-              }.flatten
+              val sendOffsets: (KafkaByteProducer, Blocking[F]) => F[Unit] = (producer, blocking) =>
+                blocking {
+                  producer.sendOffsetsToTransaction(
+                    batch.offsets.asJava,
+                    new ConsumerGroupMetadata(groupId)
+                  )
+                }
+
+              produceTransaction(records.records.flatMap(_.records), Some(sendOffsets))
             }
           }
 
+        override def produceWithoutOffsets[P](
+          records: ProducerRecords[P, K, V]
+        ): F[ProducerResult[P, K, V]] =
+          produceTransaction(records.records, None).map(ProducerResult(_, records.passthrough))
+
+        private[this] def produceTransaction[P](
+          records: Chunk[ProducerRecord[K, V]],
+          sendOffsets: Option[(KafkaByteProducer, Blocking[F]) => F[Unit]]
+        ): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] =
+          if (records.isEmpty) F.pure(Chunk.empty)
+          else {
+
+            withProducer.exclusiveAccess { (producer, blocking) =>
+              blocking(producer.beginTransaction())
+                .bracketCase { _ =>
+                  val produce = records
+                    .traverse(
+                      KafkaProducer
+                        .produceRecord(keySerializer, valueSerializer, producer, blocking)
+                    )
+                    .map(_.sequence)
+
+                  sendOffsets.fold(produce)(f => produce.flatTap(_ => f(producer, blocking)))
+                } {
+                  case (_, Outcome.Succeeded(_)) =>
+                    blocking(producer.commitTransaction())
+                  case (_, Outcome.Canceled() | Outcome.Errored(_)) =>
+                    blocking(producer.abortTransaction())
+                }
+            }.flatten
+          }
+
         override def metrics: F[Map[MetricName, Metric]] =
           withProducer.blocking { _.metrics().asScala.toMap }
@@ -151,7 +183,7 @@ object TransactionalKafkaProducer {
   )(
     implicit F: Async[F],
     mk: MkProducer[F]
-  ): Stream[F, TransactionalKafkaProducer.Metrics[F, K, V]] =
+  ): Stream[F, TransactionalKafkaProducer.WithoutOffsets[F, K, V]] =
     Stream.resource(resource(settings))
 
   def apply[F[_]]: TransactionalProducerPartiallyApplied[F] =
@@ -173,7 +205,7 @@ object TransactionalKafkaProducer {
     def resource[K, V](settings: TransactionalProducerSettings[F, K, V])(
       implicit F: Async[F],
       mk: MkProducer[F]
-    ): Resource[F, TransactionalKafkaProducer.Metrics[F, K, V]] =
+    ): Resource[F, TransactionalKafkaProducer.WithoutOffsets[F, K, V]] =
       TransactionalKafkaProducer.resource(settings)
 
     /**
@@ -189,7 +221,7 @@ object TransactionalKafkaProducer {
     def stream[K, V](settings: TransactionalProducerSettings[F, K, V])(
       implicit F: Async[F],
       mk: MkProducer[F]
-    ): Stream[F, TransactionalKafkaProducer.Metrics[F, K, V]] =
+    ): Stream[F, TransactionalKafkaProducer.WithoutOffsets[F, K, V]] =
       TransactionalKafkaProducer.stream(settings)
 
   override def toString: String =
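As a sketch of how `produceWithoutOffsets` might be used for records that do not originate from a Kafka consumer (the `settings` and `records` parameters below are illustrative):

```scala
import cats.effect.IO
import fs2.Stream
import fs2.kafka._

// Produce a batch of records inside a transaction without attaching consumer
// offsets, e.g. for records generated by the application itself.
def produceAll(
  settings: TransactionalProducerSettings[IO, String, String],
  records: ProducerRecords[Unit, String, String]
): Stream[IO, ProducerResult[Unit, String, String]] =
  TransactionalKafkaProducer
    .stream(settings)
    .evalMap(_.produceWithoutOffsets(records))
```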
diff --git a/modules/core/src/main/scala/fs2/kafka/internal/WithProducer.scala b/modules/core/src/main/scala/fs2/kafka/internal/WithProducer.scala
index dd6fd4f11..1b10dddc4 100644
--- a/modules/core/src/main/scala/fs2/kafka/internal/WithProducer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/internal/WithProducer.scala
@@ -6,11 +6,10 @@
 
 package fs2.kafka.internal
 
-import fs2.kafka.producer.MkProducer
 import cats.effect.{Async, Resource}
-import cats.syntax.all._
-import fs2.kafka.{KafkaByteProducer, ProducerSettings, TransactionalProducerSettings}
+import fs2.kafka.{KafkaByteProducer, ProducerSettings}
 import scala.jdk.DurationConverters._
+import fs2.kafka.producer.MkProducer
 
 private[kafka] sealed abstract class WithProducer[F[_]] {
   def apply[A](f: (KafkaByteProducer, Blocking[F]) => F[A]): F[A]
@@ -40,29 +39,6 @@ private[kafka] object WithProducer {
         .map(create(_, blockingG))
   }
 
-  def apply[F[_], K, V](
-    mk: MkProducer[F],
-    settings: TransactionalProducerSettings[F, K, V]
-  )(
-    implicit F: Async[F]
-  ): Resource[F, WithProducer[F]] =
-    Resource[F, WithProducer[F]] {
-      mk(settings.producerSettings).flatMap { producer =>
-        val blocking = settings.producerSettings.customBlockingContext
-          .fold(Blocking.fromSync[F])(Blocking.fromExecutionContext)
-
-        val withProducer = create(producer, blocking)
-
-        val initTransactions = withProducer.blocking { _.initTransactions() }
-
-        val close = withProducer.blocking {
-          _.close(settings.producerSettings.closeTimeout.toJava)
-        }
-
-        initTransactions.as((withProducer, close))
-      }
-    }
-
   private def create[F[_]](
     producer: KafkaByteProducer,
     _blocking: Blocking[F]
diff --git a/modules/core/src/main/scala/fs2/kafka/internal/WithTransactionalProducer.scala b/modules/core/src/main/scala/fs2/kafka/internal/WithTransactionalProducer.scala
new file mode 100644
index 000000000..42b240c93
--- /dev/null
+++ b/modules/core/src/main/scala/fs2/kafka/internal/WithTransactionalProducer.scala
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2018-2022 OVO Energy Limited
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package fs2.kafka.internal
+
+import cats.effect.std.Semaphore
+import cats.effect.{Async, MonadCancelThrow, Resource}
+import cats.implicits._
+import scala.jdk.DurationConverters._
+import fs2.kafka.producer.MkProducer
+import fs2.kafka.{KafkaByteProducer, TransactionalProducerSettings}
+
+private[kafka] sealed abstract class WithTransactionalProducer[F[_]] {
+  def apply[A](f: (KafkaByteProducer, Blocking[F], ExclusiveAccess[F, A]) => F[A]): F[A]
+
+  def exclusiveAccess[A](f: (KafkaByteProducer, Blocking[F]) => F[A]): F[A] = apply {
+    case (producer, blocking, exclusive) => exclusive(f(producer, blocking))
+  }
+
+  def blocking[A](f: KafkaByteProducer => A): F[A] = apply {
+    case (producer, blocking, _) => blocking(f(producer))
+  }
+}
+
+private[kafka] object WithTransactionalProducer {
+  def apply[F[_], K, V](
+    mk: MkProducer[F],
+    settings: TransactionalProducerSettings[F, K, V]
+  )(
+    implicit F: Async[F]
+  ): Resource[F, WithTransactionalProducer[F]] =
+    Resource[F, WithTransactionalProducer[F]] {
+      (mk(settings.producerSettings), Semaphore(1)).tupled.flatMap {
+        case (producer, semaphore) =>
+          val blocking = settings.producerSettings.customBlockingContext
+            .fold(Blocking.fromSync[F])(Blocking.fromExecutionContext)
+
+          val withProducer = create(producer, blocking, semaphore)
+
+          val initTransactions = withProducer.blocking { _.initTransactions() }
+
+          /*
+            Deliberately does not use the exclusive access functionality to close the producer. The close method on
+            the underlying client waits until the buffer has been flushed to the broker or the timeout is exceeded.
+            Because the transactional producer _always_ waits until the buffer is flushed and the transaction
+            committed on the broker before proceeding, upon gaining exclusive access to the producer the buffer will
+            always be empty. Therefore if we used exclusive access to close the underlying producer, the buffer
+            would already be empty and the close timeout setting would be redundant.
+
+            TLDR: not using exclusive access here preserves the behaviour of the underlying close method and timeout
+            setting
+           */
+          val close = withProducer.blocking {
+            _.close(settings.producerSettings.closeTimeout.toJava)
+          }
+
+          initTransactions.as((withProducer, close))
+      }
+    }
+
+  private def create[F[_]: MonadCancelThrow](
+    producer: KafkaByteProducer,
+    _blocking: Blocking[F],
+    transactionSemaphore: Semaphore[F]
+  ): WithTransactionalProducer[F] = new WithTransactionalProducer[F] {
+    override def apply[A](
+      f: (KafkaByteProducer, Blocking[F], ExclusiveAccess[F, A]) => F[A]
+    ): F[A] =
+      f(producer, _blocking, transactionSemaphore.permit.surround)
+  }
+}
diff --git a/modules/core/src/main/scala/fs2/kafka/internal/package.scala b/modules/core/src/main/scala/fs2/kafka/internal/package.scala
new file mode 100644
index 000000000..47633f0d5
--- /dev/null
+++ b/modules/core/src/main/scala/fs2/kafka/internal/package.scala
@@ -0,0 +1,12 @@
+/*
+ * Copyright 2018-2022 OVO Energy Limited
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package fs2.kafka
+
+package object internal {
+  private[kafka] type ExclusiveAccess[F[_], A] = F[A] => F[A]
+
+}
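The `ExclusiveAccess[F, A]` alias introduced above is just `F[A] => F[A]`. The sketch below (illustrative, not library code) shows the underlying pattern: a one-permit `Semaphore` turns any action into one that runs with exclusive access to the shared producer.

```scala
import cats.effect.IO
import cats.effect.std.Semaphore

// `permit.surround` acquires the single permit, runs the action, then releases,
// which is exactly the shape of ExclusiveAccess[IO, Unit] (IO[Unit] => IO[Unit]).
val exclusiveSketch: IO[Unit] =
  Semaphore[IO](1).flatMap { semaphore =>
    val exclusive: IO[Unit] => IO[Unit] = semaphore.permit.surround
    exclusive(IO.unit)
  }
```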
diff --git a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
index 6484a6530..2b7b3d03c 100644
--- a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
@@ -9,7 +9,11 @@ import cats.effect.unsafe.implicits.global
 import fs2.Stream
 import fs2.concurrent.SignallingRef
 import scala.jdk.CollectionConverters._
-import org.apache.kafka.clients.consumer.{ConsumerConfig, CooperativeStickyAssignor, NoOffsetForPartitionException}
+import org.apache.kafka.clients.consumer.{
+  ConsumerConfig,
+  CooperativeStickyAssignor,
+  NoOffsetForPartitionException
+}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.scalatest.Assertion
@@ -559,7 +563,9 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
         .stream(
           consumerSettings[IO]
             .withProperties(
-              ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[CooperativeStickyAssignor].getName
+              ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[
+                CooperativeStickyAssignor
+              ].getName
             )
         )
         .subscribeTo(topic)
@@ -801,9 +807,10 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
         .stream(
           consumerSettings[IO]
             .withProperties(
-              ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[CooperativeStickyAssignor].getName
+              ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[
+                CooperativeStickyAssignor
+              ].getName
             )
-          )
           .subscribeTo(topic)
           .evalMap { consumer =>
diff --git a/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala b/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
index 0e6d4ed41..928b9bb0a 100644
--- a/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
@@ -34,68 +34,186 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
     }
   }
 
-  it("should be able to produce single records in a transaction") {
+  it("should be able to produce single records with offsets in a transaction") {
     withTopic { topic =>
-      createCustomTopic(topic, partitions = 3)
-      val toProduce = (0 to 10).map(n => s"key-$n" -> s"value-$n")
-
-      val produced =
-        (for {
-          producer <- TransactionalKafkaProducer.stream(
-            TransactionalProducerSettings(
-              "id",
-              producerSettings[IO]
-                .withRetries(Int.MaxValue)
+      testSingle(
+        topic,
+        Some(
+          i =>
+            CommittableOffset[IO](
+              new TopicPartition(topic, (i % 3).toInt),
+              new OffsetAndMetadata(i),
+              Some("group"),
+              _ => IO.unit
             )
+        )
+      )
+    }
+  }
+
+  it("should be able to produce single records without offsets in a transaction") {
+    withTopic { topic =>
+      testSingle(
+        topic,
+        None
+      )
+    }
+  }
+
+  private def testSingle(topic: String, makeOffset: Option[Long => CommittableOffset[IO]]) = {
+    createCustomTopic(topic, partitions = 3)
+    val toProduce = (0 to 10).map(n => s"key-$n" -> s"value-$n")
+
+    val produced =
+      (for {
+        producer <- TransactionalKafkaProducer.stream(
+          TransactionalProducerSettings(
+            "id",
+            producerSettings[IO]
+              .withRetries(Int.MaxValue)
           )
-          _ <- Stream.eval(IO(producer.toString should startWith("TransactionalKafkaProducer$")))
-          records <- Stream.chunk(Chunk.seq(toProduce)).zipWithIndex.map {
-            case ((key, value), i) =>
-              val offset =
-                CommittableOffset[IO](
-                  new TopicPartition(topic, (i % 3).toInt),
-                  new OffsetAndMetadata(i),
-                  Some("group"),
-                  _ => IO.unit
+        )
+        _ <- Stream.eval(IO(producer.toString should startWith("TransactionalKafkaProducer$")))
+        records <- Stream.chunk(Chunk.seq(toProduce)).zipWithIndex.map {
+          case ((key, value), i) =>
+            val record = ProducerRecord(topic, key, value)
+
+            makeOffset.fold[
+              Either[
+                ProducerRecords[(String, String), String, String],
+                TransactionalProducerRecords[IO, (String, String), String, String]
+              ]
+            ](Left(ProducerRecords.one(record, (key, value))))(
+              offset =>
+                Right(
+                  TransactionalProducerRecords.one(
+                    CommittableProducerRecords.one(
+                      record,
+                      offset(i)
+                    ),
+                    (key, value)
+                  )
                 )
+            )
-              TransactionalProducerRecords.one(
-                CommittableProducerRecords.one(
-                  ProducerRecord(topic, key, value),
-                  offset
-                ),
-                (key, value)
-              )
-          }
-          passthrough <- Stream
-            .eval(producer.produce(records))
-            .map(_.passthrough)
-            .buffer(toProduce.size)
-        } yield passthrough).compile.toVector.unsafeRunSync()
+        }
+        passthrough <- Stream
+          .eval(records.fold(producer.produceWithoutOffsets, producer.produce))
+          .map(_.passthrough)
+          .buffer(toProduce.size)
+      } yield passthrough).compile.toVector.unsafeRunSync()
+
+    produced should contain theSameElementsAs toProduce
+
+    val consumed = {
+      consumeNumberKeyedMessagesFrom[String, String](
+        topic,
+        produced.size,
+        customProperties = Map(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")
+      )
+    }
+
+    consumed should contain theSameElementsAs produced.toList
+  }
+
+  it("should be able to produce multiple records with offsets in a transaction") {
+    withTopic { topic =>
+      testMultiple(
+        topic,
+        Some(
+          i =>
+            CommittableOffset[IO](
+              new TopicPartition(topic, i % 3),
+              new OffsetAndMetadata(i.toLong),
+              Some("group"),
+              _ => IO.unit
+            )
+        )
+      )
+    }
+  }
 
-      produced should contain theSameElementsAs toProduce
+  it("should be able to produce multiple records without offsets in a transaction") {
+    withTopic { topic =>
+      testMultiple(
+        topic,
+        None
+      )
+    }
+  }
+
+  private def testMultiple(topic: String, makeOffset: Option[Int => CommittableOffset[IO]]) = {
+    createCustomTopic(topic, partitions = 3)
+    val toProduce =
+      Chunk.seq((0 to 100).toList.map(n => s"key-$n" -> s"value-$n"))
 
-      val consumed = {
-        consumeNumberKeyedMessagesFrom[String, String](
-          topic,
-          produced.size,
-          customProperties = Map(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")
+    val toPassthrough = "passthrough"
+
+    val produced =
+      (for {
+        producer <- TransactionalKafkaProducer.stream(
+          TransactionalProducerSettings(
+            "id",
+            producerSettings[IO]
+              .withRetries(Int.MaxValue)
+          )
         )
+        recordsToProduce = toProduce.map {
+          case (key, value) => ProducerRecord(topic, key, value)
+        }
+
+        produce = makeOffset match {
+          case Some(offset) =>
+            val offsets = toProduce.mapWithIndex {
+              case (_, i) => offset(i)
+            }
+            val records = TransactionalProducerRecords(
+              recordsToProduce.zip(offsets).map {
+                case (record, offset) =>
+                  CommittableProducerRecords.one(
+                    record,
+                    offset
+                  )
+              },
+              toPassthrough
+            )
+            producer.produce(records)
+          case None =>
+            val records = ProducerRecords(recordsToProduce, toPassthrough)
+            producer.produceWithoutOffsets(records)
+        }
+
+        result <- Stream.eval(produce)
+      } yield result).compile.lastOrError.unsafeRunSync()
+
+    val records =
+      produced.records.map {
+        case (record, _) =>
+          record.key -> record.value
       }
 
-      consumed should contain theSameElementsAs produced.toList
+    assert(records == toProduce && produced.passthrough == toPassthrough)
+
+    val consumed = {
+      val customConsumerProperties =
+        Map(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")
+      consumeNumberKeyedMessagesFrom[String, String](
+        topic,
+        records.size,
+        customProperties = customConsumerProperties
+      )
     }
+
+    consumed should contain theSameElementsAs records.toList
   }
 
-  it("should be able to produce multiple records in a transaction") {
+  it("should not allow concurrent access to a producer during a transaction") {
     withTopic { topic =>
       createCustomTopic(topic, partitions = 3)
       val toProduce =
-        Chunk.seq((0 to 100).toList.map(n => s"key-$n" -> s"value-$n"))
-
-      val toPassthrough = "passthrough"
+        Chunk.seq((0 to 1000000).toList.map(n => s"key-$n" -> s"value-$n"))
 
-      val produced =
+      val result =
         (for {
           producer <- TransactionalKafkaProducer.stream(
             TransactionalProducerSettings(
@@ -123,31 +241,30 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
                   record,
                   offset
                 )
-            },
-            toPassthrough
+            }
           )
-          result <- Stream.eval(producer.produce(records))
-        } yield result).compile.lastOrError.unsafeRunSync()
-
-      val records =
-        produced.records.map {
-          case (record, _) =>
-            record.key -> record.value
-        }
-
-      assert(records == toProduce && produced.passthrough == toPassthrough)
-
-      val consumed = {
-        val customConsumerProperties =
-          Map(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")
-        consumeNumberKeyedMessagesFrom[String, String](
-          topic,
-          records.size,
-          customProperties = customConsumerProperties
-        )
-      }
+          _ <- Stream
+            .eval(producer.produce(records))
+            .concurrently(
+              Stream.eval(
+                producer.produce(
+                  TransactionalProducerRecords.one(
+                    CommittableProducerRecords.one(
+                      ProducerRecord[String, String](topic, "test", "test"),
+                      CommittableOffset[IO](
+                        new TopicPartition(topic, 0),
+                        new OffsetAndMetadata(0),
+                        Some("group"),
+                        _ => IO.unit
+                      )
+                    )
+                  )
+                )
+              )
+            )
+        } yield ()).compile.lastOrError.attempt.unsafeRunSync()
 
-      consumed should contain theSameElementsAs records.toList
+      assert(result == Right(()))
     }
   }