
Commit

Merge pull request #883 from janstenpickle/no-offsets-transactional-producer
bplommer authored Mar 7, 2022
2 parents 4826432 + 0999df0 commit 8f9c1e9
Showing 2 changed files with 216 additions and 128 deletions.
@@ -7,7 +7,7 @@
 package fs2.kafka

 import cats.effect.syntax.all._
-import cats.effect.{Async, Resource, Outcome}
+import cats.effect.{Async, Outcome, Resource}
 import cats.syntax.all._
 import fs2.kafka.internal._
 import fs2.kafka.internal.converters.collection._
@@ -57,6 +57,22 @@ object TransactionalKafkaProducer {
     def metrics: F[Map[MetricName, Metric]]
   }

+  /**
+    * [[TransactionalKafkaProducer.WithoutOffsets]] extends [[TransactionalKafkaProducer.Metrics]]
+    * to allow producing of records without corresponding upstream offsets.
+    */
+  abstract class WithoutOffsets[F[_], K, V] extends Metrics[F, K, V] {
+
+    /**
+      * Produces the `ProducerRecord`s in the specified [[ProducerRecords]]
+      * in three steps: first a transaction is initialized, then the records are placed
+      * in the buffer of the producer, and lastly the transaction is committed. If errors
+      * or cancellation occurs, the transaction is aborted. The returned effect succeeds
+      * if the whole transaction completes successfully.
+      */
+    def produceWithoutOffsets[P](records: ProducerRecords[P, K, V]): F[ProducerResult[P, K, V]]
+  }
+
   /**
     * Creates a new [[TransactionalKafkaProducer]] in the `Resource` context,
     * using the specified [[TransactionalProducerSettings]]. Note that there
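
A minimal usage sketch of the produceWithoutOffsets API added above; the object
name, bootstrap server, transactional id, and topic below are illustrative
assumptions, not taken from this commit:

    import cats.effect.{IO, IOApp}
    import fs2.kafka._

    object WithoutOffsetsExample extends IOApp.Simple {
      // Hypothetical connection and transaction settings.
      val producerSettings =
        ProducerSettings[IO, String, String].withBootstrapServers("localhost:9092")
      val settings =
        TransactionalProducerSettings("example-transactional-id", producerSettings)

      val run: IO[Unit] =
        TransactionalKafkaProducer.resource(settings).use { producer =>
          val records = ProducerRecords.one(ProducerRecord("example-topic", "key", "value"))
          // One transaction: begin, buffer the record, commit; no offsets are sent.
          producer.produceWithoutOffsets(records).void
        }
    }
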
@@ -72,20 +88,20 @@ object TransactionalKafkaProducer {
   )(
     implicit F: Async[F],
     mk: MkProducer[F]
-  ): Resource[F, TransactionalKafkaProducer.Metrics[F, K, V]] =
+  ): Resource[F, TransactionalKafkaProducer.WithoutOffsets[F, K, V]] =
     (
       Resource.eval(settings.producerSettings.keySerializer),
       Resource.eval(settings.producerSettings.valueSerializer),
       WithProducer(mk, settings)
     ).mapN { (keySerializer, valueSerializer, withProducer) =>
-      new TransactionalKafkaProducer.Metrics[F, K, V] {
+      new TransactionalKafkaProducer.WithoutOffsets[F, K, V] {
         override def produce[P](
           records: TransactionalProducerRecords[F, P, K, V]
         ): F[ProducerResult[P, K, V]] =
-          produceTransaction(records)
+          produceTransactionWithOffsets(records)
             .map(ProducerResult(_, records.passthrough))

-        private[this] def produceTransaction[P](
+        private[this] def produceTransactionWithOffsets[P](
           records: TransactionalProducerRecords[F, P, K, V]
         ): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] =
           if (records.records.isEmpty) F.pure(Chunk.empty)
@@ -99,34 +115,50 @@
             else F.pure(batch.consumerGroupIds.head)

           consumerGroupId.flatMap { groupId =>
-            withProducer { (producer, blocking) =>
-              blocking(producer.beginTransaction())
-                .bracketCase { _ =>
-                  records.records
-                    .flatMap(_.records)
-                    .traverse(
-                      KafkaProducer
-                        .produceRecord(keySerializer, valueSerializer, producer, blocking)
-                    )
-                    .map(_.sequence)
-                    .flatTap { _ =>
-                      blocking {
-                        producer.sendOffsetsToTransaction(
-                          batch.offsets.asJava,
-                          groupId
-                        )
-                      }
-                    }
-                } {
-                  case (_, Outcome.Succeeded(_)) =>
-                    blocking(producer.commitTransaction())
-                  case (_, Outcome.Canceled() | Outcome.Errored(_)) =>
-                    blocking(producer.abortTransaction())
-                }
-            }.flatten
+            val sendOffsets: (KafkaByteProducer, Blocking[F]) => F[Unit] = (producer, blocking) =>
+              blocking {
+                producer.sendOffsetsToTransaction(
+                  batch.offsets.asJava,
+                  groupId
+                )
+              }
+
+            produceTransaction(records.records.flatMap(_.records), Some(sendOffsets))
           }
         }

+        override def produceWithoutOffsets[P](
+          records: ProducerRecords[P, K, V]
+        ): F[ProducerResult[P, K, V]] =
+          produceTransaction(records.records, None).map(ProducerResult(_, records.passthrough))
+
+        private[this] def produceTransaction[P](
+          records: Chunk[ProducerRecord[K, V]],
+          sendOffsets: Option[(KafkaByteProducer, Blocking[F]) => F[Unit]]
+        ): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] =
+          if (records.isEmpty) F.pure(Chunk.empty)
+          else {
+
+            withProducer { (producer, blocking) =>
+              blocking(producer.beginTransaction())
+                .bracketCase { _ =>
+                  val produce = records
+                    .traverse(
+                      KafkaProducer
+                        .produceRecord(keySerializer, valueSerializer, producer, blocking)
+                    )
+                    .map(_.sequence)
+
+                  sendOffsets.fold(produce)(f => produce.flatTap(_ => f(producer, blocking)))
+                } {
+                  case (_, Outcome.Succeeded(_)) =>
+                    blocking(producer.commitTransaction())
+                  case (_, Outcome.Canceled() | Outcome.Errored(_)) =>
+                    blocking(producer.abortTransaction())
+                }
+            }.flatten
+          }

         override def metrics: F[Map[MetricName, Metric]] =
           withProducer.blocking { _.metrics().asScala.toMap }
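
The refactor above folds both code paths into a single produceTransaction,
treating offset-sending as an optional step inside the transaction:
sendOffsets.fold(produce)(f => produce.flatTap(_ => f(producer, blocking)))
runs the produce effect alone for None, and taps the offset-sending effect
after it for Some. A standalone sketch of that fold-plus-flatTap pattern,
with illustrative names:

    import cats.effect.IO
    import cats.syntax.all._

    // Run `main`; when an extra step is supplied, run it after `main`
    // succeeds, keeping `main`'s result but propagating the step's errors.
    def withOptionalStep[A](main: IO[A], extra: Option[IO[Unit]]): IO[A] =
      extra.fold(main)(step => main.flatTap(_ => step))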

@@ -150,7 +182,7 @@ object TransactionalKafkaProducer {
   )(
     implicit F: Async[F],
     mk: MkProducer[F]
-  ): Stream[F, TransactionalKafkaProducer.Metrics[F, K, V]] =
+  ): Stream[F, TransactionalKafkaProducer.WithoutOffsets[F, K, V]] =
     Stream.resource(resource(settings)(F, mk))

   def apply[F[_]]: TransactionalProducerPartiallyApplied[F] =
@@ -172,7 +204,7 @@ object TransactionalKafkaProducer {
     def resource[K, V](settings: TransactionalProducerSettings[F, K, V])(
       implicit F: Async[F],
       mk: MkProducer[F]
-    ): Resource[F, TransactionalKafkaProducer.Metrics[F, K, V]] =
+    ): Resource[F, TransactionalKafkaProducer.WithoutOffsets[F, K, V]] =
       TransactionalKafkaProducer.resource(settings)(F, mk)

     /**
@@ -188,7 +220,7 @@ object TransactionalKafkaProducer {
     def stream[K, V](settings: TransactionalProducerSettings[F, K, V])(
       implicit F: Async[F],
       mk: MkProducer[F]
-    ): Stream[F, TransactionalKafkaProducer.Metrics[F, K, V]] =
+    ): Stream[F, TransactionalKafkaProducer.WithoutOffsets[F, K, V]] =
       TransactionalKafkaProducer.stream(settings)(F, mk)

     override def toString: String =
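
For contrast with produceWithoutOffsets, the pre-existing produce path still
commits consumer offsets inside the transaction. A sketch of that usage,
assuming a CommittableOffset taken from a consumed record; the topic, key,
and value are illustrative:

    import cats.effect.IO
    import fs2.kafka._

    def produceWithOffset(
      producer: TransactionalKafkaProducer.WithoutOffsets[IO, String, String],
      offset: CommittableOffset[IO]
    ): IO[ProducerResult[Unit, String, String]] = {
      val record  = ProducerRecord("output-topic", "key", "value")
      val records = TransactionalProducerRecords.one(
        CommittableProducerRecords.one(record, offset)
      )
      // The offset is sent to the transaction and committed with the record.
      producer.produce(records)
    }

Since WithoutOffsets extends Metrics, returning it from resource and stream
widens the API without breaking source compatibility: existing call sites keep
produce and metrics, and gain produceWithoutOffsets.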