From f3575a4a869505c5c2ab07b3ebe43a98986334b6 Mon Sep 17 00:00:00 2001
From: sergeychernov
Date: Wed, 9 Nov 2022 09:48:17 +0200
Subject: [PATCH 1/2] 1099: fix produce method on TransactionalKafkaProducer
 so that offsets are committed when it is invoked with empty records and
 non-empty offsets

---
 .../kafka/TransactionalKafkaProducer.scala    |  4 --
 .../TransactionalKafkaProducerSpec.scala      | 55 ++++++++++++++++++-
 2 files changed, 54 insertions(+), 5 deletions(-)

diff --git a/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala b/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
index a808c5499..bdc6ea90c 100644
--- a/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
@@ -136,9 +136,6 @@ object TransactionalKafkaProducer {
       records: Chunk[ProducerRecord[K, V]],
       sendOffsets: Option[(KafkaByteProducer, Blocking[F]) => F[Unit]]
     ): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] =
-      if (records.isEmpty) F.pure(Chunk.empty)
-      else {
-
         withProducer.exclusiveAccess { (producer, blocking) =>
           blocking(producer.beginTransaction())
             .bracketCase { _ =>
@@ -157,7 +154,6 @@ object TransactionalKafkaProducer {
                 blocking(producer.abortTransaction())
             }
         }.flatten
-      }
 
     override def metrics: F[Map[MetricName, Metric]] =
       withProducer.blocking { _.metrics().asScala.toMap }
diff --git a/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala b/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
index 03d515404..a9133b708 100644
--- a/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
@@ -1,6 +1,8 @@
 package fs2.kafka
 
 import java.util
+import java.util.concurrent.atomic.AtomicBoolean
+
 import cats.effect.IO
 import cats.effect.unsafe.implicits.global
 import cats.syntax.all._
@@ -12,7 +14,6 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidProducerEpochException
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.scalatest.EitherValues
-
 import scala.concurrent.duration._
 
 class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
@@ -141,6 +142,58 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
     }
   }
 
+  it("should be able to commit offset without producing records in a transaction") {
+    withTopic { topic =>
+      createCustomTopic(topic, partitions = 3)
+      val toPassthrough = "passthrough"
+      val commitState = new AtomicBoolean(false)
+      implicit val mk: MkProducer[IO] = new MkProducer[IO] {
+        def apply[G[_]](settings: ProducerSettings[G, _, _]): IO[KafkaByteProducer] =
+          IO.delay {
+            new org.apache.kafka.clients.producer.KafkaProducer[Array[Byte], Array[Byte]](
+              (settings.properties: Map[String, AnyRef]).asJava,
+              new ByteArraySerializer,
+              new ByteArraySerializer
+            ) {
+              override def sendOffsetsToTransaction(
+                  offsets: util.Map[TopicPartition, OffsetAndMetadata],
+                  consumerGroupId: String
+                ): Unit =
+                {
+                commitState.set(true)
+                super.sendOffsetsToTransaction(offsets, consumerGroupId)
+              }
+            }
+          }
+      }
+      for {
+        producer <- TransactionalKafkaProducer.stream(
+          TransactionalProducerSettings(
+            s"id-$topic",
+            producerSettings[IO]
+              .withRetries(Int.MaxValue)
+          )
+        )
+        offsets = (i: Int) => CommittableOffset[IO](
+          new TopicPartition(topic, i % 3),
+          new OffsetAndMetadata(i.toLong),
+          Some("group"),
+          _ => IO.unit)
+
+        records = TransactionalProducerRecords(
+          Chunk.seq(0 to 100).map(i => CommittableProducerRecords(Chunk.empty, offsets(i))),
+          toPassthrough
+        )
+
+        results <- Stream.eval(producer.produce(records))
+      } yield {
+        results.passthrough shouldBe toPassthrough
+        results.records should be (empty)
+        commitState.get shouldBe true
+      }
+    }.compile.lastOrError.unsafeRunSync()
+  }
+
   private def testMultiple(topic: String, makeOffset: Option[Int => CommittableOffset[IO]]) = {
     createCustomTopic(topic, partitions = 3)
     val toProduce =

From 249792f49b2e370eb970a9a35cd9f13739070f53 Mon Sep 17 00:00:00 2001
From: sergeychernov
Date: Sat, 3 Dec 2022 19:46:17 +0700
Subject: [PATCH 2/2] 1099: formatting

---
 .../kafka/TransactionalKafkaProducer.scala    | 36 +++++++++----------
 .../TransactionalKafkaProducerSpec.scala      | 21 +++++------
 2 files changed, 29 insertions(+), 28 deletions(-)

diff --git a/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala b/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
index bdc6ea90c..dd57f4823 100644
--- a/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
@@ -136,24 +136,24 @@ object TransactionalKafkaProducer {
       records: Chunk[ProducerRecord[K, V]],
       sendOffsets: Option[(KafkaByteProducer, Blocking[F]) => F[Unit]]
     ): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] =
-        withProducer.exclusiveAccess { (producer, blocking) =>
-          blocking(producer.beginTransaction())
-            .bracketCase { _ =>
-              val produce = records
-                .traverse(
-                  KafkaProducer
-                    .produceRecord(keySerializer, valueSerializer, producer, blocking)
-                )
-                .map(_.sequence)
-
-              sendOffsets.fold(produce)(f => produce.flatTap(_ => f(producer, blocking)))
-            } {
-              case (_, Outcome.Succeeded(_)) =>
-                blocking(producer.commitTransaction())
-              case (_, Outcome.Canceled() | Outcome.Errored(_)) =>
-                blocking(producer.abortTransaction())
-            }
-        }.flatten
+      withProducer.exclusiveAccess { (producer, blocking) =>
+        blocking(producer.beginTransaction())
+          .bracketCase { _ =>
+            val produce = records
+              .traverse(
+                KafkaProducer
+                  .produceRecord(keySerializer, valueSerializer, producer, blocking)
+              )
+              .map(_.sequence)
+
+            sendOffsets.fold(produce)(f => produce.flatTap(_ => f(producer, blocking)))
+          } {
+            case (_, Outcome.Succeeded(_)) =>
+              blocking(producer.commitTransaction())
+            case (_, Outcome.Canceled() | Outcome.Errored(_)) =>
+              blocking(producer.abortTransaction())
+          }
+      }.flatten
 
     override def metrics: F[Map[MetricName, Metric]] =
       withProducer.blocking { _.metrics().asScala.toMap }
diff --git a/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala b/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
index a9133b708..b2f233267 100644
--- a/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
@@ -156,10 +156,9 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
               new ByteArraySerializer
             ) {
               override def sendOffsetsToTransaction(
-                  offsets: util.Map[TopicPartition, OffsetAndMetadata],
-                  consumerGroupId: String
-                ): Unit =
-                {
+                offsets: util.Map[TopicPartition, OffsetAndMetadata],
+                consumerGroupId: String
+              ): Unit = {
                 commitState.set(true)
                 super.sendOffsetsToTransaction(offsets, consumerGroupId)
               }
@@ -174,11 +173,13 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
               .withRetries(Int.MaxValue)
           )
         )
-        offsets = (i: Int) => CommittableOffset[IO](
-          new TopicPartition(topic, i % 3),
-          new OffsetAndMetadata(i.toLong),
-          Some("group"),
-          _ => IO.unit)
+        offsets = (i: Int) =>
+          CommittableOffset[IO](
+            new TopicPartition(topic, i % 3),
+            new OffsetAndMetadata(i.toLong),
+            Some("group"),
+            _ => IO.unit
+          )
 
         records = TransactionalProducerRecords(
           Chunk.seq(0 to 100).map(i => CommittableProducerRecords(Chunk.empty, offsets(i))),
@@ -188,7 +189,7 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
         results <- Stream.eval(producer.produce(records))
       } yield {
         results.passthrough shouldBe toPassthrough
-        results.records should be (empty)
+        results.records should be(empty)
         commitState.get shouldBe true
       }
     }.compile.lastOrError.unsafeRunSync()
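
For context, a minimal sketch of the user-facing behaviour this series enables: committing a consumer offset through a producer transaction without publishing any records. This is not part of the patch; it assumes the fs2-kafka 2.x API exercised by the test above (`TransactionalProducerRecords`, `CommittableProducerRecords`, `produce`), and the helper name `commitOffsetOnly` plus the `String` key/value types are illustrative assumptions.

```scala
import cats.effect.IO
import fs2.Chunk
import fs2.kafka._

// Hypothetical helper (not part of fs2-kafka): commit a consumer offset
// inside a producer transaction without publishing any records.
def commitOffsetOnly(
  producer: TransactionalKafkaProducer[IO, String, String],
  offset: CommittableOffset[IO]
): IO[Unit] =
  producer
    .produce(
      // An empty record chunk paired with a committable offset. Before this
      // patch, the empty-records guard made produce return immediately and
      // the offset never reached the broker; with the guard removed, a
      // transaction is begun and sendOffsetsToTransaction commits the offset.
      TransactionalProducerRecords(
        Chunk(CommittableProducerRecords(Chunk.empty[ProducerRecord[String, String]], offset))
      )
    )
    .void
```

The substance of commit 1 is simply dropping the `records.isEmpty` short-circuit so the transactional path (begin transaction, send offsets, commit) always runs; producing an empty chunk inside the transaction is a no-op, while the offsets are still committed.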