Skip to content

Commit

Permalink
Merge pull request #1100 from chernovsa/1099-fix-transactional-producer-dont-commit-offset-with-empty-records
Browse files Browse the repository at this point in the history

1099 Fix the produce method of TransactionalKafkaProducer when it is invoked with empty records and non-empty offsets
  • Loading branch information
bplommer authored Dec 5, 2022
2 parents c35f421 + 249792f commit 266df60
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,28 +136,24 @@ object TransactionalKafkaProducer {
records: Chunk[ProducerRecord[K, V]],
sendOffsets: Option[(KafkaByteProducer, Blocking[F]) => F[Unit]]
): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] =
if (records.isEmpty) F.pure(Chunk.empty)
else {

withProducer.exclusiveAccess { (producer, blocking) =>
blocking(producer.beginTransaction())
.bracketCase { _ =>
val produce = records
.traverse(
KafkaProducer
.produceRecord(keySerializer, valueSerializer, producer, blocking)
)
.map(_.sequence)

sendOffsets.fold(produce)(f => produce.flatTap(_ => f(producer, blocking)))
} {
case (_, Outcome.Succeeded(_)) =>
blocking(producer.commitTransaction())
case (_, Outcome.Canceled() | Outcome.Errored(_)) =>
blocking(producer.abortTransaction())
}
}.flatten
}
withProducer.exclusiveAccess { (producer, blocking) =>
blocking(producer.beginTransaction())
.bracketCase { _ =>
val produce = records
.traverse(
KafkaProducer
.produceRecord(keySerializer, valueSerializer, producer, blocking)
)
.map(_.sequence)

sendOffsets.fold(produce)(f => produce.flatTap(_ => f(producer, blocking)))
} {
case (_, Outcome.Succeeded(_)) =>
blocking(producer.commitTransaction())
case (_, Outcome.Canceled() | Outcome.Errored(_)) =>
blocking(producer.abortTransaction())
}
}.flatten

// Snapshot of the underlying Java Kafka client's metrics, converted to an
// immutable Scala Map. Runs on the blocking pool via `withProducer.blocking`
// because the Java client call is not guaranteed to be non-blocking.
override def metrics: F[Map[MetricName, Metric]] =
withProducer.blocking { _.metrics().asScala.toMap }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package fs2.kafka

import java.util
import java.util.concurrent.atomic.AtomicBoolean

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import cats.syntax.all._
Expand All @@ -12,7 +14,6 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidProducerEpochException
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.scalatest.EitherValues

import scala.concurrent.duration._

class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
Expand Down Expand Up @@ -141,6 +142,59 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
}
}

// Regression test for #1099: a transaction whose CommittableProducerRecords
// carry no records (only offsets) must still send those offsets to the
// transaction, rather than short-circuiting on the empty record chunk.
it("should be able to commit offset without producing records in a transaction") {
withTopic { topic =>
createCustomTopic(topic, partitions = 3)
val toPassthrough = "passthrough"
// Flipped to true by the instrumented producer below when
// sendOffsetsToTransaction is actually invoked.
val commitState = new AtomicBoolean(false)
// Custom MkProducer that builds a real KafkaProducer but intercepts
// sendOffsetsToTransaction so the test can observe that the offset
// commit path was exercised.
implicit val mk: MkProducer[IO] = new MkProducer[IO] {
def apply[G[_]](settings: ProducerSettings[G, _, _]): IO[KafkaByteProducer] =
IO.delay {
new org.apache.kafka.clients.producer.KafkaProducer[Array[Byte], Array[Byte]](
(settings.properties: Map[String, AnyRef]).asJava,
new ByteArraySerializer,
new ByteArraySerializer
) {
override def sendOffsetsToTransaction(
offsets: util.Map[TopicPartition, OffsetAndMetadata],
consumerGroupId: String
): Unit = {
// Record that the offset commit happened, then delegate to the
// real client so the transaction still completes normally.
commitState.set(true)
super.sendOffsetsToTransaction(offsets, consumerGroupId)
}
}
}
}
for {
producer <- TransactionalKafkaProducer.stream(
TransactionalProducerSettings(
s"id-$topic",
producerSettings[IO]
.withRetries(Int.MaxValue)
)
)
// Builds a committable offset for partition i % 3 with a no-op commit
// effect; only the metadata matters for this test.
offsets = (i: Int) =>
CommittableOffset[IO](
new TopicPartition(topic, i % 3),
new OffsetAndMetadata(i.toLong),
Some("group"),
_ => IO.unit
)

// 101 CommittableProducerRecords, each with an EMPTY record chunk but a
// real offset — the exact shape that previously skipped the commit.
records = TransactionalProducerRecords(
Chunk.seq(0 to 100).map(i => CommittableProducerRecords(Chunk.empty, offsets(i))),
toPassthrough
)

results <- Stream.eval(producer.produce(records))
} yield {
results.passthrough shouldBe toPassthrough
// No records were produced...
results.records should be(empty)
// ...yet the offsets must still have been sent to the transaction.
commitState.get shouldBe true
}
}.compile.lastOrError.unsafeRunSync()
}

private def testMultiple(topic: String, makeOffset: Option[Int => CommittableOffset[IO]]) = {
createCustomTopic(topic, partitions = 3)
val toProduce =
Expand Down

0 comments on commit 266df60

Please sign in to comment.