Add chunk-based consumer API #1281

Merged: 19 commits into fd4s:series/3.x from consume-chunk, Feb 16, 2024
Conversation

@L7R7 (Contributor) commented Nov 30, 2023

This adds an API to make the pattern of chunk-based consumers a first-class concept. The idea keeps popping up in the Typelevel Discord, and we've been successfully using it at $WORK for a while now, so it makes sense to add it.

General idea

The pattern helps users who implement consumers without auto-commit to write their code without doing too much work to achieve correct offset committing (no offsets may be lost, offsets must be committed only after the corresponding messages have been processed, etc.).
It achieves that by switching from processing messages in a Stream[F, CommittableConsumerRecord[F, K, V]] to processing them in chunks, via a function Chunk[ConsumerRecord[K, V]] => F[Unit]. After each chunk has been processed, the offsets of the chunk are committed.
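
In user-land terms, the pattern boils down to something like the following sketch (hedged: the name consumeChunk, the MonadThrow constraint, and the exact shape are illustrative, not necessarily the API this PR adds):

```scala
import cats.MonadThrow
import cats.syntax.all._
import fs2.Chunk
import fs2.kafka.{CommittableConsumerRecord, CommittableOffsetBatch, ConsumerRecord}

// Illustrative sketch: run the business logic on the plain records first,
// then commit all offsets of the chunk in a single batch.
def consumeChunk[F[_]: MonadThrow, K, V](
  processor: Chunk[ConsumerRecord[K, V]] => F[Unit]
)(chunk: Chunk[CommittableConsumerRecord[F, K, V]]): F[Unit] =
  processor(chunk.map(_.record)) >>
    CommittableOffsetBatch.fromFoldable(chunk.map(_.offset)).commit
```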

This has a couple of advantages (summarizing what Fabio said on the Discord):

  • Consumers can write their business logic without having to care about offsets
  • Batching several messages is very straightforward, which is often practical (e.g. batched writes to the database)
  • Concurrency and filtering can be done liberally without having to worry about incorrect commits

Open questions

  • Do we want to mirror all the methods from the KafkaConsume trait in the new trait? So far I only added one of them to show the concept; the rest could be implemented in the same fashion.
  • Instead of Unit, the processor for the chunks could use a return type of F[CommitNow], where CommitNow is an intention-revealing equivalent of Unit that also makes it clearer that no further processing should be done on the records afterwards. Do we want that?
  • Instead of only processing single chunks, the library could also offer (additional?) ways that take care of things like:
    • deduplicating messages in a chunk by key (I feel this may be too much for the library and might be confusing for some consumers, and it's also relatively easy to implement on the consumer side)
    • processing messages concurrently, with concurrency per key (this is a bit trickier to implement, so it might be a good idea to provide it in an opt-in way for those consumers who want it but don't want to implement it themselves; see the sketch after this list). If we decide to allow both the "simple" chunk-based approach and the "concurrent-by-key" approach, I'd be happy to add that after this PR is done.
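
For illustration, the concurrent-by-key variant mentioned above could look roughly like this (a hedged sketch, not part of this PR; processConcurrentlyByKey is a made-up name):

```scala
import cats.effect.Concurrent
import cats.effect.implicits._
import cats.syntax.all._
import fs2.Chunk
import fs2.kafka.ConsumerRecord

// Group the chunk by key: records sharing a key are processed sequentially
// (preserving their relative order), while distinct keys run in parallel.
def processConcurrentlyByKey[F[_]: Concurrent, K, V](
  process: ConsumerRecord[K, V] => F[Unit]
)(chunk: Chunk[ConsumerRecord[K, V]]): F[Unit] =
  chunk.toList
    .groupBy(_.key)
    .values
    .toList
    .parTraverse_(_.traverse_(process))
```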

Todos

There's still a lot of work to do, but I wanted to get some early feedback on the general concept before doing the busy work.

  • testing
  • docs
  • add the missing methods to make the API complete and consistent to the rest

I'm looking forward to receiving feedback and thoughts on this idea!

@L7R7 (Contributor, Author) commented Dec 5, 2023

After adding the missing methods similar to the ones from the KafkaConsume trait, I'm wondering whether it makes sense to simply "mirror" the signatures of the underlying methods. streamChunk is obvious, but for partitionedStreamChunk and maybe even partitionsMapStreamChunk, I'm not sure we should expose the different streams to the user, because I don't know what a good scenario would look like in which the user wants to do something with the streams after each chunk is processed and committed.
At the moment, I think it's a tradeoff between symmetry with KafkaConsume and more convenience for the user.

EDIT:

What the library could also do is something like the following:

```scala
def consumeTopic[K, V](
  consumer: KafkaConsumer[IO, K, V],
  processor: Chunk[ConsumerRecord[K, V]] => IO[Unit]
): IO[Unit] =
  consumer.partitionedStream                        // one inner stream per assigned partition
    .map(_.chunks.evalMap(consumeChunk(processor))) // process and commit chunk by chunk
    .parJoinUnbounded                               // run all partition streams concurrently
    .compile
    .drain
```

(This should be generalized over F[_] instead of IO, and defined directly on the consumer instead of taking one as an argument; I just took it from the code we use at $WORK.) Since all the client could do afterwards is .parJoinUnbounded.compile.drain, the library could already do that.

@aartigao (Contributor) commented Dec 9, 2023

Nice work! Let's start the discussion 🧵

This addition looks good because it moves the logic into the library, but it also means we're being stricter about the patterns we support. For example, our $WORK use case doesn't involve committing offsets, but we're working with Chunk for performance reasons. Does that mean the lib should support that too? Or is this a user-land decision? 🤷🏽

So far, in my experience, working with Stream directly gives you the building blocks for crafting more complex patterns, like the one you describe.

The discussion this PR brings up is, philosophically, something like: do we think there's a canonical pattern for Kafka consumption built around the Chunk[Record] => F[Unit] shape?

And to be honest, I don't know. I have mixed feelings. On one hand, looking at the code, it's so simple that adding it to the library doesn't disrupt the rest of the API; on the other hand, if it's that simple, why not have it in user-land? Otherwise it looks to me like we may want this library to become more of a framework, because we'd be losing (not actually losing, since the current API remains untouched, but you get the idea) the Stream foundations in favor of a more constrained API.

What I agree on is to properly document this use case, so the question doesn't pop up again in Discord.

I'd love to see what the original designers think about it 👀

@SystemFw (Member) commented Dec 10, 2023

My 2c:
I think it's worth adding, because it's close to universally useful when working with Kafka for OLTP rather than OLAP use cases. In this case, what we're adding is not the small amount of simple code, but the expertise that this is how you can achieve performance, flexibility and correctness.
In my extensive experience working with teams using fs2-kafka, that expertise is lacking and at best you get correct code which leaves a lot of performance on the table, and at worst you get incorrect code. The questions in Discord show a similar pattern.

However, what I would not do is add the whole set of variations mirroring the full API; I'd really just add the one method, otherwise we're back to the issue of people having to navigate which one to use. If you know enough to need a variation, you can use the low-level Stream API directly.

Asides:

  • in the one method we do add, the return type should be F[Nothing], rather than Stream.
  • you can avoid traversing the Chunk twice with map, by foldMapping it instead to a tuple of offsets and records

@L7R7 (Contributor, Author) commented Dec 11, 2023

Thank you for your comments, I really appreciate it.

I think it's a good idea to only add one, opinionated, method to give users an obvious way of applying the pattern that works well for most cases. I'll proceed in that direction and will also start adding some docs for it. The docs will also mention that this is the first place where the library commits offsets automatically, but without using auto-commit (@aartigao thank you for mentioning the auto-commit use cases, I didn't think about that initially).

@SystemFw regarding your points about the implementation details:

  • Traversing the Chunk only once totally makes sense. I couldn't find a way to use foldMap because there's no Monoid instance for CommittableOffsetBatch, but it works fine with a normal foldLeft (see the sketch after this list).
  • I don't quite understand yet how to turn a Stream[F, Unit] into a Stream[F, Nothing], or an F[Unit] into an F[Nothing]. I think you chose Nothing to indicate that this action doesn't emit anything, and on Stream that is a good way to indicate that the stream won't emit any elements (since a Stream[F, Unit] might emit several) but will just finish at some point. I'm not sure the same applies to F[Unit] vs F[Nothing]. I came up with .flatMap(_ => Stream.exec(F.unit)).compile.onlyOrError, but I'm not sure that's what you intended. Am I missing something?
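
For reference, the single-traversal idea could look roughly like this (a sketch; splitBatch is a made-up helper name, and the MonadThrow constraint is only there to satisfy the error-handling constraint on CommittableOffsetBatch.empty):

```scala
import cats.MonadThrow
import fs2.Chunk
import fs2.kafka.{CommittableConsumerRecord, CommittableOffsetBatch, ConsumerRecord}

// One pass over the chunk accumulates the offset batch and the plain
// records together, instead of mapping over the chunk twice.
def splitBatch[F[_]: MonadThrow, K, V](
  chunk: Chunk[CommittableConsumerRecord[F, K, V]]
): (CommittableOffsetBatch[F], Chunk[ConsumerRecord[K, V]]) =
  chunk.foldLeft((CommittableOffsetBatch.empty[F], Chunk.empty[ConsumerRecord[K, V]])) {
    case ((batch, records), c) =>
      (batch.updated(c.offset), records ++ Chunk.singleton(c.record))
  }
```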

@SystemFw (Member) commented
It should be an F[Nothing] (so, not a Stream, nor an IO of something else), because it represents a process that doesn't terminate, doesn't emit any results, and doesn't need to be composed as a Stream.
Just compile.drain >> F.never
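
In other words, something along these lines (a sketch with a made-up name; it assumes the consuming stream never terminates on its own):

```scala
import cats.effect.Async
import cats.syntax.all._
import fs2.Stream

// Drain the consuming stream, then suspend forever: the F[Nothing] result
// makes it impossible to accidentally sequence code "after" the consumer.
def runForever[F[_]: Async, A](consume: Stream[F, A]): F[Nothing] =
  consume.compile.drain >> Async[F].never
```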

@L7R7 (Contributor, Author) commented Dec 11, 2023

Oh, I see. But that relies on the assumption that the previous stream never terminates, right? If the stream terminated without an error, the whole action would never finish, because of the never.

@L7R7 (Contributor, Author) commented Dec 12, 2023

Should we use consumeChunk in the quick example in the docs to nudge people in the "right direction" right from the beginning?
I changed it to do so, and I think the result is quite nice, but I'm happy to hear your thoughts.

L7R7 marked this pull request as ready for review on December 14, 2023.
@L7R7 (Contributor, Author) commented Dec 14, 2023

I'd say this is ready to be reviewed. The only open questions are the one above regarding F.never, and whether I should squash the commits into a single commit (I think it should be squashed; I left it unsquashed because that might make it easier to review).

aartigao self-requested a review on February 15, 2024.
@aartigao (Contributor) left a comment

Regarding docs: without these suggestions, docs / run is failing.

Review comments (resolved) on:
  • docs/src/main/mdoc/quick-example.md (outdated)
  • docs/src/main/mdoc/consumers.md
  • docs/src/main/mdoc/quick-example.md
L7R7 and others added 4 commits on February 15, 2024 (co-authored by Alan Artigao Carreño).
@aartigao (Contributor) left a comment

I'm super grateful for this type of PR:

  1. Small number of files
  2. Single feature
  3. Well documented

Bravo @L7R7!

Enough time has passed since it was created, so I'll be merging it soon.

Congrats to everyone involved!

@L7R7 (Contributor, Author) commented Feb 16, 2024

@aartigao thank you for the review and thank you for helping me get through the last details yesterday.
I enjoyed working on that PR 😃

aartigao merged commit b4dd5a4 into fd4s:series/3.x on Feb 16, 2024. 8 checks passed.
L7R7 deleted the consume-chunk branch on February 16, 2024.