Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rebalance doesn't work reliably fix #533

Merged
merged 1 commit into from
Feb 19, 2021
Merged

Conversation

LMnet
Copy link
Member

@LMnet LMnet commented Feb 16, 2021

This is a fix for #532

Some words about expected behavior. fs2 is a pull-based library: stream will not push the new step until the previous step is completed. So, if we have a flatMap on a stream, the next step will be executed only when the current inner stream completes. Projecting this onto our situation, if we have a code like this:

consumer.partitionsMapStream.flatMap { assignment =>
  val innerStream = handle(assignment)
  innerStream
} 

the next assignment will be received only when the innerStream finished. If innerStream is based on partition streams from the assignment, it will automatically work: on rebalance, old streams are finished, and innerStream is finished too. And this is the expected behavior.

But sometimes not all old partition streams finished. And that's why the rebalance doesn't work. I added a test case for this, which is failed on the current stable branch.

Actually, I didn't found a root cause for this issue. Mostly because the current solution for the finishing old partition streams is pretty complex, and also distributed across multiple files (KafkaConsumer and KafkaConsumerActor). I decided to try to change and simplify old partition streams completion logic. And it worked.

In my implementation I don't try to track all partition streams with partitionStreamId. I just interrupt old partition streams on rebalance. Actually, this is a lot closer to the kafka ideology: when the partition revoked, I interrupt old streams. When the partition assigned, I create new streams for each partition. It turns out that having Ref[F, Deferred[F, Unit]] is enough for this. On each rebalance I created a new Deferred and saved it in Ref, and at the same time, I completed the old Deferred. Each partition stream in a current assignment subscribed on the Deferred from the current assignment, and when it completes, streams interrupts. All logic is in the one method partitionsMapStream.

Because of a new logic for closing old partition streams functionality with tracking partitionStreamId is redundant now. So, I deleted it. I think it removes a bit of the inner complexity of a library.

@bplommer
Copy link
Member

Thanks for this - I'll take a look later today.

@LMnet
Copy link
Member Author

LMnet commented Feb 16, 2021

It's still work in progress, but the root cause is fixed

@LMnet LMnet force-pushed the 532-rebalance branch 3 times, most recently from ce4e00b to c6c0ee0 Compare February 17, 2021 03:48
@LMnet LMnet changed the title WIP: Rebalance doesn't work reliably fix Rebalance doesn't work reliably fix Feb 17, 2021
@LMnet
Copy link
Member Author

LMnet commented Feb 17, 2021

@vlovgr @bplommer it's ready for the review

@bplommer
Copy link
Member

This looks good as far as I can tell but I'd like to get @vlovgr's thoughts too.

@bplommer bplommer merged commit e82bfe6 into fd4s:series/1.x Feb 19, 2021
@LMnet LMnet deleted the 532-rebalance branch February 24, 2021 06:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

3 participants