Use IOApp.Simple in docs #908

Merged (1 commit, Mar 16, 2022)
70 changes: 29 additions & 41 deletions docs/src/main/mdoc/consumers.md
@@ -157,13 +157,9 @@ In addition, there are several settings specific to the library.
Once [`ConsumerSettings`][consumersettings] is defined, use `KafkaConsumer.stream` to create a [`KafkaConsumer`][kafkaconsumer] instance.

```scala mdoc:silent
object ConsumerExample extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
val stream =
KafkaConsumer.stream(consumerSettings)

stream.compile.drain.as(ExitCode.Success)
}
object ConsumerExample extends IOApp.Simple {
val run: IO[Unit] =
KafkaConsumer.stream(consumerSettings).compile.drain
}
```
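Note for readers: the `consumerSettings` value referenced throughout these examples is defined earlier in consumers.md and is not part of this diff. A minimal sketch of such a definition, assuming a local broker and the built-in `String` deserializers (all values here are placeholders):

```scala
import cats.effect.IO
import fs2.kafka._

// Hypothetical settings for following along with the examples; adjust the
// bootstrap servers and group id to your environment.
val consumerSettings: ConsumerSettings[IO, String, String] =
  ConsumerSettings[IO, String, String]
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group")
```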

@@ -176,14 +172,11 @@ In the example above, we simply create the consumer and then immediately shut down
We can use `subscribe` with a non-empty collection of topics, or `subscribeTo` for varargs support. There is also an option to `subscribe` using a `Regex` regular expression for the topic names, in case the exact topic names are not known up-front. When allocating a consumer in a `Stream` context, these are available as extension methods directly on the `Stream`.

```scala mdoc:silent
object ConsumerSubscribeExample extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
val stream =
KafkaConsumer.stream(consumerSettings)
.subscribeTo("topic")

stream.compile.drain.as(ExitCode.Success)
}
object ConsumerSubscribeExample extends IOApp.Simple {
val run: IO[Unit] =
KafkaConsumer.stream(consumerSettings)
.subscribeTo("topic")
.compile.drain
}
```
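The example above uses `subscribeTo`. As a complement, a minimal sketch of the `Regex` variant, assuming the `subscribe` extension method accepts a `scala.util.matching.Regex` as described above and reusing the `consumerSettings` defined earlier; the pattern is a placeholder:

```scala
import cats.effect.{IO, IOApp}
import fs2.kafka._
import scala.util.matching.Regex

object ConsumerSubscribeRegexExample extends IOApp.Simple {
  // Subscribe to every topic matching the pattern instead of naming topics up-front.
  val topicPattern: Regex = "topic-.*".r

  val run: IO[Unit] =
    KafkaConsumer.stream(consumerSettings)
      .subscribe(topicPattern)
      .compile
      .drain
}
```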

@@ -194,15 +187,12 @@ Note that only automatic partition assignment is supported. Like in the [consume
Once subscribed to at least one topic, we can use `stream` for a `Stream` of [`CommittableConsumerRecord`][committableconsumerrecord]s. Each record contains a deserialized [`ConsumerRecord`][consumerrecord], as well as a [`CommittableOffset`][committableoffset] for managing [offset commits](#offset-commits). Streams guarantee records in topic-partition order, but not ordering across partitions. This is the same ordering guarantee that Kafka provides.

```scala mdoc:silent
object ConsumerStreamExample extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
val stream =
KafkaConsumer.stream(consumerSettings)
.subscribeTo("topic")
.records

stream.compile.drain.as(ExitCode.Success)
}
object ConsumerStreamExample extends IOApp.Simple {
val run: IO[Unit] =
KafkaConsumer.stream(consumerSettings)
.subscribeTo("topic")
.records
.compile.drain
}
```

@@ -211,8 +201,8 @@ Note that this is an infinite stream, meaning it will only terminate if it's int
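Since the stream never ends on its own, a test or one-off job may want to bound it explicitly. A minimal sketch using standard fs2 combinators, reusing the `consumerSettings` from earlier; the limits are placeholders:

```scala
import cats.effect.{IO, IOApp}
import fs2.kafka._
import scala.concurrent.duration._

object BoundedConsumerExample extends IOApp.Simple {
  val run: IO[Unit] =
    KafkaConsumer.stream(consumerSettings)
      .subscribeTo("topic")
      .records
      .take(1000)                 // stop after 1000 records...
      .interruptAfter(30.seconds) // ...or after 30 seconds, whichever comes first
      .compile
      .drain
}
```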
When using `stream`, records on all assigned partitions end up in the same `Stream`. Depending on how records are processed, we might want to separate records per topic-partition. This exact functionality is provided by `partitionedStream`.

```scala mdoc:silent
object ConsumerPartitionedStreamExample extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
object ConsumerPartitionedStreamExample extends IOApp.Simple {
val run: IO[Unit] = {
def processRecord(record: ConsumerRecord[String, String]): IO[Unit] =
IO(println(s"Processing record: $record"))

@@ -228,7 +218,7 @@ object ConsumerPartitionedStreamExample extends IOApp {
}
.parJoinUnbounded

stream.compile.drain.as(ExitCode.Success)
stream.compile.drain
}
}
```
@@ -252,8 +242,8 @@ Note that partition streams for revoked partitions will be closed after the new
When processing of records is independent of each other, as is the case with `processRecord` above, it's often easier and more performant to use `stream` and `mapAsync`, as seen in the example below. Generally, it's crucial to ensure there are no data races between processing of any two records.

```scala mdoc:silent
object ConsumerMapAsyncExample extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
object ConsumerMapAsyncExample extends IOApp.Simple {
val run: IO[Unit] = {
def processRecord(record: ConsumerRecord[String, String]): IO[Unit] =
IO(println(s"Processing record: $record"))

@@ -265,7 +255,7 @@ object ConsumerMapAsyncExample extends IOApp {
processRecord(committable.record)
}

stream.compile.drain.as(ExitCode.Success)
stream.compile.drain
}
}
```
@@ -279,8 +269,8 @@ Offset commits are usually done in batches for performance reasons. We normally
We should keep the [`CommittableOffset`][committableoffset] in our `Stream` once we've finished processing the record. For at-least-once delivery, it's essential that offset commits preserve topic-partition ordering, so we have to make sure we keep offsets in the same order as we receive them. There are then several functions available for common batch committing scenarios, like `commitBatch`, `commitBatchOption`, and `commitBatchWithin`.

```scala mdoc:silent
object ConsumerCommitBatchExample extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
object ConsumerCommitBatchExample extends IOApp.Simple {
val run: IO[Unit] = {
def processRecord(record: ConsumerRecord[String, String]): IO[Unit] =
IO(println(s"Processing record: $record"))

@@ -294,7 +284,7 @@ object ConsumerCommitBatchExample extends IOApp {
}
.through(commitBatchWithin(500, 15.seconds))

stream.compile.drain.as(ExitCode.Success)
stream.compile.drain
}
}
```
@@ -312,8 +302,8 @@ If we're sure we need to commit every offset, we can `commit` individual [`Commi
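A minimal sketch of committing each offset individually via `CommittableOffset#commit`, only advisable when batching really is not an option, since per-record commits are slow; `processRecord` is a placeholder and `consumerSettings` is assumed from earlier:

```scala
import cats.effect.{IO, IOApp}
import fs2.kafka._

object ConsumerCommitEachOffsetExample extends IOApp.Simple {
  def processRecord(record: ConsumerRecord[String, String]): IO[Unit] =
    IO(println(s"Processing record: $record"))

  val run: IO[Unit] =
    KafkaConsumer.stream(consumerSettings)
      .subscribeTo("topic")
      .records
      .evalMap { committable =>
        // Commit only after the record has been processed, one offset at a time.
        processRecord(committable.record) >> committable.offset.commit
      }
      .compile
      .drain
}
```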
With fs2-kafka you can gracefully shut down a `KafkaConsumer`. Consider this example:

```scala mdoc:silent
object NoGracefulShutdownExample extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
object NoGracefulShutdownExample extends IOApp.Simple {
val run: IO[Unit] = {
def processRecord(record: CommittableConsumerRecord[IO, String, String]): IO[Unit] =
IO(println(s"Processing record: $record"))

@@ -323,9 +313,7 @@ object NoGracefulShutdownExample extends IOApp {
}.through(commitBatchWithin(100, 15.seconds)).compile.drain
}

KafkaConsumer.resource(consumerSettings).use { consumer =>
run(consumer).as(ExitCode.Success)
}
KafkaConsumer.resource(consumerSettings).use(run)
}
}
```
@@ -344,8 +332,8 @@ We could combine `stopConsuming` with the custom resource handling and implement
```scala mdoc:silent
import cats.effect.{Deferred, Ref}

object WithGracefulShutdownExample extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
object WithGracefulShutdownExample extends IOApp.Simple {
val run: IO[Unit] = {
def processRecord(record: CommittableConsumerRecord[IO, String, String]): IO[Unit] =
IO(println(s"Processing record: $record"))

@@ -383,7 +371,7 @@ object WithGracefulShutdownExample extends IOApp {
} yield ()
}).guarantee(closeConsumer) // [15]
}
} yield ExitCode.Success
} yield ()
}
}
```
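As a smaller complement to the example above, a sketch of the same idea where the shutdown trigger is just a timer standing in for a real signal; it assumes the `consumerSettings` from earlier and that the consumer instance exposes the same `records` stream as the extension methods used above. After `stopConsuming`, `records` completes gracefully, so the remaining offsets are committed before the consumer resource is released.

```scala
import cats.effect.{IO, IOApp}
import fs2.kafka._
import scala.concurrent.duration._

object StopConsumingSketch extends IOApp.Simple {
  val run: IO[Unit] =
    KafkaConsumer.resource(consumerSettings).use { consumer =>
      val consume =
        consumer.subscribeTo("topic") >>
          consumer.records
            .evalMap { committable =>
              IO(println(s"Processing record: ${committable.record}")).as(committable.offset)
            }
            .through(commitBatchWithin(100, 15.seconds))
            .compile
            .drain

      // Stop consuming after one minute; `consume` then finishes on its own,
      // flushing any outstanding offset commits first.
      consume.both(IO.sleep(1.minute) >> consumer.stopConsuming).void
    }
}
```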
161 changes: 72 additions & 89 deletions docs/src/main/mdoc/producers.md
@@ -139,13 +135,9 @@ The following settings are specific to the library.
Once [`ProducerSettings`][producersettings] is defined, use `KafkaProducer.stream` to create a [`KafkaProducer`][kafkaproducer] instance.

```scala mdoc:silent
object ProducerExample extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
val stream =
KafkaProducer.stream(producerSettings)

stream.compile.drain.as(ExitCode.Success)
}
object ProducerExample extends IOApp.Simple {
val run: IO[Unit] =
KafkaProducer.stream(producerSettings).compile.drain
}
```
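For a more self-contained illustration, a minimal sketch that produces a single record and waits for the broker acknowledgement, assuming a broker at `localhost:9092`; the topic, key and value are placeholders:

```scala
import cats.effect.{IO, IOApp}
import fs2.kafka._

object SingleRecordProducerExample extends IOApp.Simple {
  // Hypothetical settings; adjust the bootstrap servers to your environment.
  val settings: ProducerSettings[IO, String, String] =
    ProducerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092")

  val run: IO[Unit] =
    KafkaProducer.resource(settings).use { producer =>
      val record = ProducerRecord("topic", "key", "value")
      // `produce` buffers the record; `flatten` also waits for the acknowledgement.
      producer.produce(ProducerRecords.one(record)).flatten.void
    }
}
```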

@@ -164,22 +160,19 @@ val consumerSettings =
.withBootstrapServers("localhost:9092")
.withGroupId("group")

object ProduceExample extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
val stream =
KafkaConsumer.stream(consumerSettings)
.subscribeTo("topic")
.records
.map { committable =>
val key = committable.record.key
val value = committable.record.value
val record = ProducerRecord("topic", key, value)
ProducerRecords.one(record, committable.offset)
}
.through(KafkaProducer.pipe(producerSettings))

stream.compile.drain.as(ExitCode.Success)
}
object ProduceExample extends IOApp.Simple {
val run: IO[Unit] =
KafkaConsumer.stream(consumerSettings)
.subscribeTo("topic")
.records
.map { committable =>
val key = committable.record.key
val value = committable.record.value
val record = ProducerRecord("topic", key, value)
ProducerRecords.one(record, committable.offset)
}
.through(KafkaProducer.pipe(producerSettings))
.compile.drain
}
```

@@ -190,57 +183,50 @@ The `produce` function creates a `KafkaProducer` and produces records in `Produc
If we're producing in multiple places in our stream, we can create the `KafkaProducer` ourselves and pass it to the `produce` function. Each `produce` allows up to `ProducerSettings#parallelism` instances of `ProducerRecords` to be batched together.

```scala mdoc:silent
object PartitionedProduceExample extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
val stream =
KafkaProducer.stream(producerSettings)
.flatMap { producer =>
KafkaConsumer.stream(consumerSettings)
.subscribeTo("topic")
.partitionedRecords
.map { partition =>
partition
.map { committable =>
val key = committable.record.key
val value = committable.record.value
val record = ProducerRecord("topic", key, value)
ProducerRecords.one(record, committable.offset)
}
.through(KafkaProducer.pipe(producerSettings, producer))
}
.parJoinUnbounded
}

stream.compile.drain.as(ExitCode.Success)
}
object PartitionedProduceExample extends IOApp.Simple {
val run: IO[Unit] =
KafkaProducer.stream(producerSettings)
.flatMap { producer =>
KafkaConsumer.stream(consumerSettings)
.subscribeTo("topic")
.partitionedRecords
.map { partition =>
partition
.map { committable =>
val key = committable.record.key
val value = committable.record.value
val record = ProducerRecord("topic", key, value)
ProducerRecords.one(record, committable.offset)
}
.through(KafkaProducer.pipe(producerSettings, producer))
}
.parJoinUnbounded
}
.compile.drain
}
```

If we need more control over how records are produced, we can use `KafkaProducer#produce`. The function returns two nested effects, e.g. `IO[IO[...]]`: the outer effect puts the records in the producer's buffer, and the inner effect waits for the records to have been sent.

```scala mdoc:silent
object KafkaProducerProduceExample extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
val stream =
KafkaProducer.stream(producerSettings)
.flatMap { producer =>
KafkaConsumer.stream(consumerSettings)
.subscribeTo("topic")
.records
.map { committable =>
val key = committable.record.key
val value = committable.record.value
val record = ProducerRecord("topic", key, value)
ProducerRecords.one(record, committable.offset)
}
.evalMap(producer.produce)
.groupWithin(500, 15.seconds)
.evalMap(_.sequence)
}


stream.compile.drain.as(ExitCode.Success)
}
object KafkaProducerProduceExample extends IOApp.Simple {
val run: IO[Unit] =
KafkaProducer.stream(producerSettings)
.flatMap { producer =>
KafkaConsumer.stream(consumerSettings)
.subscribeTo("topic")
.records
.map { committable =>
val key = committable.record.key
val value = committable.record.value
val record = ProducerRecord("topic", key, value)
ProducerRecords.one(record, committable.offset)
}
.evalMap(producer.produce)
.groupWithin(500, 15.seconds)
.evalMap(_.sequence)
}
.compile.drain
}
```
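To make the two nested effects explicit outside of a stream, a minimal sketch, again assuming a broker at `localhost:9092` with placeholder topic, key and value:

```scala
import cats.effect.{IO, IOApp}
import fs2.kafka._

object ProduceTwoEffectsExample extends IOApp.Simple {
  val settings: ProducerSettings[IO, String, String] =
    ProducerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092")

  val run: IO[Unit] =
    KafkaProducer.resource(settings).use { producer =>
      val records = ProducerRecords.one(ProducerRecord("topic", "key", "value"))
      for {
        ack <- producer.produce(records) // outer effect: records are now in the producer's buffer
        _   <- IO.println("records buffered, awaiting acknowledgement")
        _   <- ack                       // inner effect: completes once the broker has acknowledged the send
      } yield ()
    }
}
```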

@@ -249,27 +235,24 @@ The example above puts 500 records in the producer's buffer or however many can
Sometimes we need to wait for individual `ProducerRecords` to be sent. In this case, we can `flatten` the result of `produce` to both send the record and wait for the send to complete. Note that this should generally be avoided, as it results in poor performance.

```scala mdoc:silent
object KafkaProducerProduceFlattenExample extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
val stream =
KafkaProducer.stream(producerSettings)
.flatMap { producer =>
KafkaConsumer.stream(consumerSettings)
.subscribeTo("topic")
.records
.map { committable =>
val key = committable.record.key
val value = committable.record.value
val record = ProducerRecord("topic", key, value)
ProducerRecords.one(record, committable.offset)
}
.evalMap { record =>
producer.produce(record).flatten
}
}


stream.compile.drain.as(ExitCode.Success)
object KafkaProducerProduceFlattenExample extends IOApp.Simple {
val run: IO[Unit] = {
KafkaProducer.stream(producerSettings)
.flatMap { producer =>
KafkaConsumer.stream(consumerSettings)
.subscribeTo("topic")
.records
.map { committable =>
val key = committable.record.key
val value = committable.record.value
val record = ProducerRecord("topic", key, value)
ProducerRecords.one(record, committable.offset)
}
.evalMap { record =>
producer.produce(record).flatten
}
}
.compile.drain
}
}
```
8 changes: 4 additions & 4 deletions docs/src/main/mdoc/quick-example.md
@@ -10,12 +10,12 @@ Following is an example showing how to:
- use `commitBatchWithin` to commit consumed offsets in batches.

```scala mdoc
import cats.effect.{ExitCode, IO, IOApp}
import cats.effect.{IO, IOApp}
import fs2.kafka._
import scala.concurrent.duration._

object Main extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
object Main extends IOApp.Simple {
val run: IO[Unit] = {
def processRecord(record: ConsumerRecord[String, String]): IO[(String, String)] =
IO.pure(record.key -> record.value)

@@ -44,7 +44,7 @@ object Main extends IOApp {
.map(_.passthrough)
.through(commitBatchWithin(500, 15.seconds))

stream.compile.drain.as(ExitCode.Success)
stream.compile.drain
}
}
```