Merge pull request #901 from fd4s/remove-passthrough
Simplify ProducerRecords types by removing `passthrough`
bplommer authored Mar 15, 2022
2 parents 3756374 + 2895d88 commit 8bee895
Showing 17 changed files with 184 additions and 634 deletions.
16 changes: 7 additions & 9 deletions docs/src/main/mdoc/producers.md
@@ -130,8 +130,6 @@ The following settings are specific to the library.

- `withCloseTimeout` controls the timeout when waiting for producer shutdown. Default is 60 seconds.

-- `withParallelism` sets the max number of `ProducerRecords` to produce in the same batch when using the `produce` pipe. Default is 100.

- `withCreateProducer` changes how the underlying Java Kafka producer is created. The default merely creates a Java `KafkaProducer` instance using the set properties, but this function allows overriding the behaviour, e.g. for testing purposes; see the settings sketch below.
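For reference, a minimal settings sketch using these options (a sketch, assuming the fs2-kafka `ProducerSettings` API with implicit serializers for `String` keys and values):

```scala
import scala.concurrent.duration._
import cats.effect.IO
import fs2.kafka._

// Minimal sketch: bootstrap servers plus a custom close timeout.
val settingsSketch =
  ProducerSettings[IO, String, String]
    .withBootstrapServers("localhost:9092")
    // wait at most 30 seconds for outstanding sends on shutdown
    .withCloseTimeout(30.seconds)
```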

## Producer Creation
@@ -174,7 +172,7 @@ object ProduceExample extends IOApp {
val key = committable.record.key
val value = committable.record.value
val record = ProducerRecord("topic", key, value)
-ProducerRecords.one(record, committable.offset)
+ProducerRecords.one(record)
}
.through(KafkaProducer.pipe(producerSettings))

@@ -185,9 +183,9 @@

In the stream above, we're simply producing the records we receive back to the topic.

-The `produce` function creates a `KafkaProducer` and produces records in `ProducerRecords`. Note that `ProducerRecords` support multiple records and a passthrough value, `committable.offset`. Once all records have been produced in the `ProducerRecords`, the passthrough will be emitted.
+The `produce` function creates a `KafkaProducer` and produces records in `ProducerRecords`, which is an alias for `fs2.Chunk`. Once all records have been produced in the `ProducerRecords`, the inner effect will complete with a `ProducerResult`, which is an alias for `Chunk[(ProducerRecord[K, V], RecordMetadata)]`.
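A minimal sketch of the simplified types, assuming the aliases described above:

```scala
import cats.effect.IO
import fs2.Chunk
import fs2.kafka._

// A batch is now just a Chunk of records.
def produceBatch(
  producer: KafkaProducer[IO, String, String]
): IO[ProducerResult[String, String]] = {
  val records: ProducerRecords[String, String] =
    Chunk(
      ProducerRecord("topic", "key-1", "value-1"),
      ProducerRecord("topic", "key-2", "value-2")
    )
  // The outer IO enqueues the batch; the inner IO completes once all
  // records are acknowledged, yielding the per-record metadata.
  producer.produce(records).flatten
}
```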

-If we're producing in multiple places in our stream, we can create the `KafkaProducer` ourselves, and pass it to the `produce` function. Every `produce` allow up to `ProducerSettings#parallelism` instances of `ProducerRecords` to be batched together in the same batch.
+If we're producing in multiple places in our stream, we can create the `KafkaProducer` ourselves, and pass it to the `pipe` function.

```scala mdoc:silent
object PartitionedProduceExample extends IOApp {
@@ -204,9 +202,9 @@ object PartitionedProduceExample extends IOApp {
val key = committable.record.key
val value = committable.record.value
val record = ProducerRecord("topic", key, value)
-ProducerRecords.one(record, committable.offset)
+ProducerRecords.one(record)
}
-.through(KafkaProducer.pipe(producerSettings, producer))
+.through(KafkaProducer.pipe(producer))
}
.parJoinUnbounded
}
@@ -231,7 +229,7 @@ object KafkaProducerProduceExample extends IOApp {
val key = committable.record.key
val value = committable.record.value
val record = ProducerRecord("topic", key, value)
-ProducerRecords.one(record, committable.offset)
+ProducerRecords.one(record)
}
.evalMap(producer.produce)
.groupWithin(500, 15.seconds)
@@ -261,7 +259,7 @@ object KafkaProducerProduceFlattenExample extends IOApp {
val key = committable.record.key
val value = committable.record.value
val record = ProducerRecord("topic", key, value)
-ProducerRecords.one(record, committable.offset)
+ProducerRecords.one(record)
}
.evalMap { record =>
producer.produce(record).flatten
15 changes: 10 additions & 5 deletions docs/src/main/mdoc/quick-example.md
@@ -37,12 +37,17 @@ object Main extends IOApp {
processRecord(committable.record)
.map { case (key, value) =>
val record = ProducerRecord("topic", key, value)
-ProducerRecords.one(record, committable.offset)
+committable.offset -> ProducerRecords.one(record)
}
-}
-.through(KafkaProducer.pipe(producerSettings))
-.map(_.passthrough)
-.through(commitBatchWithin(500, 15.seconds))
+}.through { offsetsAndProducerRecords =>
+KafkaProducer.stream(producerSettings).flatMap { producer =>
+offsetsAndProducerRecords.evalMap {
+case (offset, producerRecord) =>
+producer.produce(producerRecord)
+.map(_.as(offset))
+}.parEvalMap(Int.MaxValue)(identity)
+}
+}.through(commitBatchWithin(500, 15.seconds))
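In the reworked example, each `ProducerRecords` batch stays paired with its `CommittableOffset`: the offset is re-emitted only after `produce`'s inner effect confirms delivery, and `parEvalMap(Int.MaxValue)(identity)` awaits those acknowledgements with unbounded parallelism while preserving stream order, which is what allows `commitBatchWithin` to commit offsets safely.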

stream.compile.drain.as(ExitCode.Success)
}
1 change: 0 additions & 1 deletion docs/src/main/mdoc/transactions.md
@@ -58,7 +58,6 @@ object Main extends IOApp {
}
}
.groupWithin(500, 15.seconds)
-.map(TransactionalProducerRecords(_))
.evalMap(producer.produce)
}

74 changes: 27 additions & 47 deletions modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
@@ -15,19 +15,14 @@ import scala.jdk.CollectionConverters._
import fs2.kafka.producer.MkProducer
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.{Metric, MetricName}
-import fs2.Chunk
import cats.Functor

import scala.annotation.nowarn
import scala.concurrent.Promise

/**
* [[KafkaProducer]] represents a producer of Kafka records, with the
-* ability to produce `ProducerRecord`s using [[produce]]. Records are
-* wrapped in [[ProducerRecords]] which allow an arbitrary value, that
-* is a passthrough, to be included in the result. Most often this is
-* used for keeping the [[CommittableOffset]]s, in order to commit
-* offsets, but any value can be used as passthrough value.
+* ability to produce `ProducerRecord`s using [[produce]].
*/
abstract class KafkaProducer[F[_], K, V] {

@@ -54,9 +49,9 @@ abstract class KafkaProducer[F[_], K, V] {
* have `otherAction` execute after records have been sent,
* but losing the order of produced records.
*/
-def produce[P](
-records: ProducerRecords[P, K, V]
-): F[F[ProducerResult[P, K, V]]]
+def produce(
+records: ProducerRecords[K, V]
+): F[F[ProducerResult[K, V]]]
}
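A minimal usage sketch of these semantics (`otherAction` and the other parameters are hypothetical placeholders):

```scala
import cats.effect.IO
import fs2.kafka._

// Run `otherAction` after records have been sent, but before they are
// acknowledged, then wait for the acknowledgement.
def sendThenAct(
  producer: KafkaProducer[IO, String, String],
  records: ProducerRecords[String, String],
  otherAction: IO[Unit]
): IO[ProducerResult[String, String]] =
  producer.produce(records).flatMap(ack => otherAction >> ack)
```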

object KafkaProducer {
@@ -65,47 +60,36 @@ object KafkaProducer {
extends AnyVal {

/**
-* Produce a single [[ProducerRecord]] without a passthrough value,
-* see [[KafkaProducer.produce]] for general semantics.
+* Produce a single [[ProducerRecord]], see [[KafkaProducer.produce]] for general semantics.
*/
def produceOne_(record: ProducerRecord[K, V])(implicit F: Functor[F]): F[F[RecordMetadata]] =
-produceOne(record, ()).map(_.map { res =>
-res.records.head.get._2 //Should always be present so get is ok
+produceOne(record).map(_.map { res =>
+res.head.get._2 //Should always be present so get is ok
})

/**
-* Produce a single record to the specified topic using the provided key and value
-* without a passthrough value, see [[KafkaProducer.produce]] for general semantics.
+* Produce a single record to the specified topic using the provided key and value,
+* see [[KafkaProducer.produce]] for general semantics.
*/
def produceOne_(topic: String, key: K, value: V)(implicit F: Functor[F]): F[F[RecordMetadata]] =
produceOne_(ProducerRecord(topic, key, value))

-/**
-* Produces the specified [[ProducerRecords]] without a passthrough value,
-* see [[KafkaProducer.produce]] for general semantics.
-*/
-def produce_(
-records: ProducerRecords[_, K, V]
-)(implicit F: Functor[F]): F[F[Chunk[(ProducerRecord[K, V], RecordMetadata)]]] =
-producer.produce(records).map(_.map(_.records))

/**
* Produce a single record to the specified topic using the provided key and value,
* see [[KafkaProducer.produce]] for general semantics.
*/
-def produceOne[P](
+def produceOne(
topic: String,
key: K,
-value: V,
-passthrough: P
-): F[F[ProducerResult[P, K, V]]] =
-produceOne(ProducerRecord(topic, key, value), passthrough)
+value: V
+): F[F[ProducerResult[K, V]]] =
+produceOne(ProducerRecord(topic, key, value))

/**
* Produce a single [[ProducerRecord]], see [[KafkaProducer.produce]] for general semantics.
*/
-def produceOne[P](record: ProducerRecord[K, V], passthrough: P): F[F[ProducerResult[P, K, V]]] =
-producer.produce(ProducerRecords.one(record, passthrough))
+def produceOne(record: ProducerRecord[K, V]): F[F[ProducerResult[K, V]]] =
+producer.produce(ProducerRecords.one(record))

}

@@ -145,13 +129,13 @@ object KafkaProducer {
valueSerializer: Serializer[F, V]
): KafkaProducer.Metrics[F, K, V] =
new KafkaProducer.Metrics[F, K, V] {
-override def produce[P](
-records: ProducerRecords[P, K, V]
-): F[F[ProducerResult[P, K, V]]] =
+override def produce(
+records: ProducerRecords[K, V]
+): F[F[ProducerResult[K, V]]] =
withProducer { (producer, blocking) =>
-records.records
+records
.traverse(produceRecord(keySerializer, valueSerializer, producer, blocking))
-.map(_.sequence.map(ProducerResult(_, records.passthrough)))
+.map(_.sequence)
}

override def metrics: F[Map[MetricName, Metric]] =
@@ -202,27 +186,23 @@

/**
* Creates a [[KafkaProducer]] using the provided settings and
-* produces record in batches, limiting the number of records
-* in the same batch using [[ProducerSettings#parallelism]].
+* produces records in batches.
*/
-def pipe[F[_], K, V, P](
+def pipe[F[_], K, V](
settings: ProducerSettings[F, K, V]
)(
implicit F: Async[F],
mk: MkProducer[F]
-): Pipe[F, ProducerRecords[P, K, V], ProducerResult[P, K, V]] =
-records => stream(settings).flatMap(pipe(settings, _).apply(records))
+): Pipe[F, ProducerRecords[K, V], ProducerResult[K, V]] =
+records => stream(settings).flatMap(pipe(_).apply(records))

/**
* Produces records in batches using the provided [[KafkaProducer]].
-* The number of records in the same batch is limited using the
-* [[ProducerSettings#parallelism]] setting.
*/
-def pipe[F[_]: Concurrent, K, V, P](
-settings: ProducerSettings[F, K, V],
+def pipe[F[_]: Concurrent, K, V](
producer: KafkaProducer[F, K, V]
-): Pipe[F, ProducerRecords[P, K, V], ProducerResult[P, K, V]] =
-_.evalMap(producer.produce).mapAsync(settings.parallelism)(identity)
+): Pipe[F, ProducerRecords[K, V], ProducerResult[K, V]] =
+_.evalMap(producer.produce).parEvalMap(Int.MaxValue)(identity)
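A minimal usage sketch, assuming an existing producer: the outer produce effect runs per chunk in order, while acknowledgements are awaited with effectively unbounded parallelism and results are emitted in the original order.

```scala
import cats.effect.IO
import fs2.Stream
import fs2.kafka._

// Sketch: send every incoming batch through the updated pipe.
def produceAll(
  producer: KafkaProducer[IO, String, String],
  batches: Stream[IO, ProducerRecords[String, String]]
): Stream[IO, ProducerResult[String, String]] =
  batches.through(KafkaProducer.pipe(producer))
```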

private[this] def serializeToBytes[F[_], K, V](
keySerializer: Serializer[F, K],
122 changes: 0 additions & 122 deletions modules/core/src/main/scala/fs2/kafka/ProducerRecords.scala

This file was deleted.
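Based on the updated producers.md above (which describes `ProducerRecords` as an alias for `fs2.Chunk`) and the `ProducerRecords.one` call sites in this diff, the deleted class is plausibly replaced by aliases along these lines (a hedged reconstruction; the enclosing object name here is illustrative):

```scala
import fs2.Chunk
import fs2.kafka.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata

object ProducerRecordsSketch {
  // Assumed shape of the replacement aliases.
  type ProducerRecords[K, V] = Chunk[ProducerRecord[K, V]]
  type ProducerResult[K, V] = Chunk[(ProducerRecord[K, V], RecordMetadata)]

  // `ProducerRecords.one(record)`, as used throughout this diff,
  // plausibly wraps a single record in a singleton chunk.
  def one[K, V](record: ProducerRecord[K, V]): ProducerRecords[K, V] =
    Chunk.singleton(record)
}
```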

