KAYAK-3391 RecordStream.processingAndCommitting #846
Conversation
val offsets = c.foldLeft(Map.empty[TopicPartition, Long])(_ ++ _._1)
val nextOffsets =
  offsets.view.mapValues(o => new OffsetAndMetadata(o + 1)).toMap
println(
Should this println be removed, or replaced with proper logging?
Definitely removed 😄 this is some experimental code to try out Stream.groupWithin. Will clean up before merge.
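For context, Stream.groupWithin is fs2's operator for size- and time-based batching: it emits a chunk once the count limit is reached, or once the timeout elapses with at least one element buffered, whichever comes first. A minimal sketch, assuming a records stream of some Record type:

import scala.concurrent.duration._
import cats.effect.IO
import fs2.{Chunk, Stream}

// Assumed: records is a stream of consumed Kafka records.
def batch[Record](records: Stream[IO, Record]): Stream[IO, Chunk[Record]] =
  // Emits a chunk when 1000 records accumulate, or when 60 seconds
  // pass with at least one record buffered, whichever comes first.
  records.groupWithin(1000, 60.seconds)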
maxRecordCount: Long = 1000L,
maxElapsedTime: FiniteDuration = 60.seconds,
I don't think default args in an anonymous implementation will do anything, except maybe get out of sync with the trait's defaults?
Yeah, these default args everywhere are a bit weird. Removed them from the 2 impls in 43345d5.
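For anyone wondering why those defaults were suspect: in Scala, default arguments compile to synthetic getter methods that participate in overriding, so a default declared on an implementation silently shadows the one documented on the trait. A minimal illustration, with a simplified trait standing in for the real abstraction:

import scala.concurrent.duration._

object DefaultArgsDemo extends App {
  trait Example {
    def processingAndCommitting(
        maxRecordCount: Long = 1000L,
        maxElapsedTime: FiniteDuration = 60.seconds
    ): Unit
  }

  val impl: Example = new Example {
    // These defaults override the trait's synthetic default getters,
    // so a caller reading the trait's signature gets different values.
    override def processingAndCommitting(
        maxRecordCount: Long = 500L,
        maxElapsedTime: FiniteDuration = 30.seconds
    ): Unit =
      println((maxRecordCount, maxElapsedTime))
  }

  impl.processingAndCommitting() // prints (500,30 seconds), not (1000,60 seconds)
}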
Local testing in 2 real services went well, so I'm going to merge this, cut a release, and continue testing. Can definitely make any needed followup changes in future PRs.
KAYAK-3391

Refactors ConsumerOps.processingAndCommitting to be very generic. Also adds a batched implementation. Adds processingAndCommitting to the RecordStream abstraction, and implements it for both chunked (i.e. one-at-a-time) and batched record streams.

Note that batched processingAndCommitting simply processes the entire batch; if that succeeds, it updates its state to include the successfully processed batch's offsets. Only offsets for records in successfully processed batches will be committed. It does not try to do anything fancy, like handling partially succeeding (and failing) batches, or updating state with individual records' offsets as they are processed. If a batch fails processing, none of the offsets in that batch will be committed. This will lead to more reprocessing after startup, but hey, this is at-least-once and your processing is idempotent, right?
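To make the commit semantics concrete, here is a rough sketch of batch-level committing, assuming fs2 and the plain Kafka client types. The shape of the batches (offsets paired with values) and the process and commit functions are illustrative assumptions, not the PR's actual API, and the real implementation commits periodically by record count and elapsed time rather than after every batch:

import cats.effect.{IO, Ref}
import fs2.{Chunk, Stream}
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

def batchedProcessingAndCommitting[A, B](
    batches: Stream[IO, Chunk[(Map[TopicPartition, Long], A)]],
    process: Chunk[A] => IO[B],
    commit: Map[TopicPartition, OffsetAndMetadata] => IO[Unit]
): Stream[IO, B] =
  Stream.eval(Ref.of[IO, Map[TopicPartition, Long]](Map.empty)).flatMap { state =>
    batches.evalMap { batch =>
      for {
        // If process fails, none of this batch's offsets are recorded,
        // so none of them will ever be committed.
        b <- process(batch.map(_._2))
        offsets = batch.foldLeft(Map.empty[TopicPartition, Long])(_ ++ _._1)
        next <- state.updateAndGet(_ ++ offsets)
        // Commit the next offset to read for each partition.
        _ <- commit(next.view.mapValues(o => new OffsetAndMetadata(o + 1)).toMap)
      } yield b
    }
  }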