
Add support for using CooperativeStickyAssignor with KafkaConsumer #844

Merged 3 commits on Feb 20, 2022

Conversation

biochimia
Contributor

KafkaConsumer.partitionsMapStream would drop all partition streams
whenever a partition revocation was advertised, but would only create
streams for newly assigned partitions. This worked fine with the default
assignor, which revokes all existing assignments before new assignments
are issued during a rebalance operation.

The CooperativeStickyAssignor attempts to keep assignments stable,
performing rebalance operations in two steps: on a first pass, a minimal
set of assignments is revoked; these are then redistributed to balance
the consumer cluster.

In order to support this assignor, the consumer needs to keep track of
the current set of assignments and, in the case of partitionsMapStream,
recreate partition streams for partitions that were kept. While the
underlying org.apache.kafka.clients.consumer.KafkaConsumer already
tracks this for us, this change follows the existing practice in
KafkaConsumer.assignmentStream of keeping track of the currently
assigned set.

Going forward, it would make sense to implement partitionedStream
independently of partitionsMapStream, so as to avoid recreating streams
for sticky partitions during a rebalance operation.

New tests were added to validate that assignmentStream and
partitionsMapStream work with the CooperativeStickyAssignor. The former
was working out of the box, as the stream kept track of the assigned set
on its own. For the latter use case, a timeout was added, as revoking
assigned partitions would make the test hang indefinitely.
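For context, opting a consumer into this assignor is done through the standard Kafka client property. A minimal sketch (the bootstrap servers and group id are placeholders):

```scala
import cats.effect.IO
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
import org.apache.kafka.clients.consumer.{ConsumerConfig, CooperativeStickyAssignor}

// Placeholder bootstrap servers and group id; the assignor is selected
// via the standard Kafka partition.assignment.strategy property.
val settings: ConsumerSettings[IO, String, String] =
  ConsumerSettings[IO, String, String]
    .withBootstrapServers("localhost:9092")
    .withGroupId("example-group")
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
    .withProperty(
      ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
      classOf[CooperativeStickyAssignor].getName
    )
```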

@LMnet
Member

LMnet commented Jan 27, 2022

Hi @biochimia! Thank you for this contribution, it's definitely a useful addition.

Implementation and tests look good to me. But could you update the scaladoc for the methods in the KafkaConsume trait? It mentions rebalance behavior, but reflects only the default behavior.

@LMnet LMnet requested review from bplommer and LMnet January 27, 2022 03:12
@bplommer
Member

I'll try to look at this at the weekend, but I'm also happy for this to be merged based on an approval from @LMnet

@sideeffffect

sideeffffect commented Jan 27, 2022

Hello guys, would it be possible to have this back-ported to 1.x series? Thank you so very much 🙇

@biochimia
Contributor Author

Hi @biochimia! Thank you for this contribution, it's definitely a useful addition.

Implementation and tests look good to me. But could you update the scaladoc for the methods in the KafkaConsume trait? It mentions rebalance behavior, but reflects only the default behavior.

Thanks for giving this a look. I updated the documentation a bit, almost minimally, but feedback is welcome.

What are your thoughts on implementing partitionedStream independently of partitionsMapStream, to avoid closing/recreating streams for partitions that are not revoked? (I could take a stab at this in a separate PR.)

@@ -540,6 +541,91 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
}
}

it("should handle rebalance with CooperativeStickyAssignor") {
Contributor Author


This is a copy of the "should handle rebalance" test, adapted to use the custom assignor, along with the resulting assert.

Comment on lines 314 to +315
  val revokedFetches = revoked intersect fetches
- val revokedNonFetches = revoked diff fetches
+ val revokedNonFetches = revoked diff revokedFetches
Contributor Author


I would appreciate a comment on this change in particular.

The change makes sense to me, logically: revokedFetches and revokedNonFetches add up to revoked. But I did not dig enough into the implications to be sure this is the intention. For instance: in which combinations of pending records and pending fetches will it make a difference, and what are the consequences?

I thought I had left this out of the PR, but while it's here we might as well discuss it.

@@ -707,6 +793,65 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
}
}

it("should stream assignment updates to listeners when using CooperativeStickyAssignor") {
Contributor Author


This is a copy of the existing "should stream assignment updates to listeners" test, adapted to use the custom assignor, along with the resulting asserts.

Comment on lines -701 to +787
- consumer2Updates.length == 2
+ consumer2Updates.length == 2 &&
Contributor Author


I think this affected scalatest's ability to produce pretty failure messages.

@@ -516,6 +516,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
}
}
  .takeWhile(_.size < 200)
+ .timeout(20.seconds)
Contributor Author


Timeout added to match the test with the custom assignor. This test was working without it, while the one for the custom assignor would hang indefinitely.

@LMnet
Member

LMnet commented Jan 28, 2022

@biochimia could you please clarify one thing for me: what will a user receive after rebalancing with the CooperativeStickyAssignor in the partitionsMapStream method:

  1. Streams only for the newly created partitions?
  2. Or streams for all partitions?

If the answer is 1, then it means that the current return type of partitionsMapStream does not perfectly fit reality. The main motivation behind partitionsMapStream was to provide full information about a rebalance to a library user. And now it looks like the Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]] type can't fully describe each assignment. It looks like we need something like this (ignore naming for now):

case class FullAssigmnentInfo[F[_], K, V](
  newlyAssignedPartitions: Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]],
  revokedPartitions: SortedSet[TopicPartition],
  retainedPartitions: SortedSet[TopicPartition],
)

If the answer is 2, we have 2 variants to deal with the streams for the retained partitions:

  1. They will be recreated. For me, it looks like unnecessary work. And we will lose all benefits of the CooperativeStickyAssignor.
  2. They will not be recreated and will be passed to the Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]] along with the streams created for newly assigned partitions. In this case, there is no way for a user to understand which streams should be started and which should be ignored (because they were already started earlier).

I believe that to retain backward compatibility we should pass to partitionsMapStream only streams for the newly assigned partitions, and not recreated streams for the retained partitions. And it looks like we need one more method like this (ignore naming one more time):

def streamDetailed: Stream[F, FullAssigmnentInfo[F, K, V]]

@biochimia
Contributor Author

@biochimia could you please clarify one thing for me: what will a user receive after rebalancing with the CooperativeStickyAssignor in the partitionsMapStream method:

  1. Streams only for the newly created partitions?
  2. Or streams for all partitions?

As it stands, they'll receive option 2: new streams for all currently assigned partitions.

If the answer is 1, then it means that the current return type of partitionsMapStream does not perfectly fit reality. The main motivation behind partitionsMapStream was to provide full information about a rebalance to a library user. And now it looks like the Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]] type can't fully describe each assignment. It looks like we need something like this (ignore naming for now):

case class FullAssigmnentInfo[F[_], K, V](
  newlyAssignedPartitions: Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]],
  revokedPartitions: SortedSet[TopicPartition],
  retainedPartitions: SortedSet[TopicPartition],
)

If the answer is 2, we have 2 variants to deal with the streams for the retained partitions:

  1. They will be recreated. For me, it looks like unnecessary work. And we will lose all benefits of the CooperativeStickyAssignor.
  2. They will not be recreated and will be passed to the Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]] along with the streams created for newly assigned partitions. In this case, there is no way for a user to understand which streams should be started and which should be ignored (because they were already started earlier).

I believe that to retain backward compatibility we should pass to partitionsMapStream only streams for the newly assigned partitions, and not recreated streams for the retained partitions. And it looks like we need one more method like this (ignore naming one more time):

def streamDetailed: Stream[F, FullAssigmnentInfo[F, K, V]]

As you mention, one motivation behind partitionsMapStream is to provide full assignment information to the users. The current implementation retains this behaviour: every new element in the stream includes the current assignment, and a new stream for each assigned partition.

While this is not optimal for the CooperativeStickyAssignor use case, it will be the least surprising behaviour for library users. They'll be able to trust that the returned map includes the full list of assignments, and they'll be able to use .parJoinUnbounded to parallelise processing of all streams, without worrying about getting duplicate streams.
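The parallel-processing pattern mentioned here could be sketched as follows (the processRecord handler is hypothetical, assumed for illustration; with a full assignment map, joining all streams never yields duplicates):

```scala
import cats.effect.IO
import fs2.Stream
import fs2.kafka.{CommittableConsumerRecord, KafkaConsumer}

// Hypothetical record handler, assumed for illustration only.
def processRecord(r: CommittableConsumerRecord[IO, String, String]): IO[Unit] =
  IO.println(s"${r.record.topic}-${r.record.partition}: ${r.record.value}")

// Each emitted map holds the full current assignment, so flattening its
// values and joining them in parallel processes every assigned partition.
def consumeAll(consumer: KafkaConsumer[IO, String, String]): Stream[IO, Unit] =
  consumer.partitionsMapStream
    .flatMap(assignment => Stream.iterable(assignment.values))
    .parJoinUnbounded
    .evalMap(processRecord)
```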

Finally, these changes have minimal impact on the default use case, while making the CooperativeStickyAssignor use case work at all to begin with. Currently, using the custom assignor leads to lost partition streams on rebalance.

Without breaking user expectations for the interface, I don't think that we can make partitionsMapStream more optimal for the CooperativeStickyAssignor use case.

My proposal here is to add an implementation of partitionedStream that does not rely on partitionsMapStream. partitionedStream could be made to emit new streams only for newly assigned partitions, without closing/recreating streams. I think this would be optimal when using CooperativeStickyAssignor.

@bplommer
Member

bplommer commented Jan 29, 2022

Hello guys, would it be possible to have this back-ported to 1.x series? Thank you so very much 🙇

The current policy seems to be that we accept pull requests against 1.x but don't actively seek to maintain feature parity with 2.x. Would this be easy to retarget for 1.x @biochimia?

In the longer term I think we should decide to either maintain feature parity in 1.x or stop adding any new features - any thoughts on this @LMnet @vlovgr? What do the Maven download figures for 1.x vs 2.x look like?

@biochimia
Contributor Author

Would this be easy to retarget for 1.x?

Looking at the files I touched (implementation and test) and the diff between the series/1.x and series/2.x branches, it looks like it should be relatively straightforward to backport the patch.

@LMnet
Member

LMnet commented Jan 31, 2022

@biochimia after your last comment I decided to test your branch locally. And it looks like even with the CooperativeStickyAssignor, after each rebalance all old partition streams are closed, and new streams are started for each partition (even for retained partitions). Is this the behavior you intended?

I think we should implement support for the CooperativeStickyAssignor differently: we should not close partition streams for retained partitions. partitionsMapStream should receive only newly assigned partitions and their streams. For the default assignor, nothing will change. But for the CooperativeStickyAssignor it means that old streams will continue to work without recreation.

This change means that an old convention for the partitionsMapStream method will be violated: it will not receive a full assignment on each rebalance in all cases. But I think this is fixable through a scaladoc change alone. Moreover, for the default use case, as I said before, nothing will change, and we could reflect this in the documentation. And nothing will be broken in terms of binary or source compatibility. So I believe this is a safe change.

This decision also means that we will not have a method with the full assignment info after each rebalance. But I already proposed a solution for this: we need one more method with the FullAssigmnentInfo. It could be done in a separate pull request; I'm not asking for it to be implemented in the current branch.

My proposal here is to add an implementation of partitionedStream that does not rely on partitionsMapStream. partitionedStream could be made to emit new streams only for newly assigned partitions, without closing/recreating streams. I think this would be optimal when using CooperativeStickyAssignor.

I don't think that this is a good idea. The current behavior of all stream methods is consistent: they all work identically, differing only in the amount of detail. I think this is an excellent thing in terms of API predictability: it doesn't matter which stream method you are using; they all have the same semantics. If different stream methods behave differently, it could be surprising for users. It may become one of those tricky minor things you have to remember to use the library.

@bplommer @vlovgr I need your opinions on this design decision because we have multiple options here.

About the 1.x backport: I have nothing against it if someone volunteers to do it. But overall the current strategy for 1.x is that maintainers don't try to maintain feature parity.

@vlovgr
Contributor

vlovgr commented Jan 31, 2022

@LMnet I agree with everything you said. 👍

@biochimia
Contributor Author

@biochimia after your last comment I decided to test your branch locally.

Thank you for taking the time to give this a test run.

And it looks like even with the CooperativeStickyAssignor, after each rebalance all old partition streams are closed, and new streams are started for each partition (even for retained partitions). Is this the behavior you intended?

Yes, this was the intended behaviour for my changes.

I think we should implement support for the CooperativeStickyAssignor differently: we should not close partition streams for retained partitions. partitionsMapStream should receive only newly assigned partitions and their streams. For the default assignor, nothing will change. But for the CooperativeStickyAssignor it means that old streams will continue to work without recreation.

This change means that an old convention for the partitionsMapStream method will be violated: it will not receive a full assignment on each rebalance in all cases. But I think this is fixable through a scaladoc change alone. Moreover, for the default use case, as I said before, nothing will change, and we could reflect this in the documentation. And nothing will be broken in terms of binary or source compatibility. So I believe this is a safe change.

My concerns with doing this are as follows (as noted, these concerns only affect the users of CooperativeStickyAssignor, there are no changes to the default use case):

  1. The assignment map no longer includes a full assignment.
  2. Consumers of partitionsMapStream will not be able to distinguish an assignment revocation from a partition assignment that is kept without looking at whether the individual streams have finished.
  3. Consumers of partitionsMapStream will not be able to rely on the behaviour that each element in partitionsMapStream, itself a map of streams, is fully processed and all streams closed before the next element is emitted. I don't know if this can lead to surprising behaviour in common code patterns.

This decision also means that we will not have a method with the full assignment info after each rebalance. But I already proposed a solution for this: we need one more method with the FullAssigmnentInfo. It could be done in a separate pull request; I'm not asking for it to be implemented in the current branch.

My proposal here is to add an implementation of partitionedStream that does not rely on partitionsMapStream. partitionedStream could be made to emit new streams only for newly assigned partitions, without closing/recreating streams. I think this would be optimal when using CooperativeStickyAssignor.

I don't think that this is a good idea. The current behavior of all stream methods is consistent: they all work identically, differing only in the amount of detail. I think this is an excellent thing in terms of API predictability: it doesn't matter which stream method you are using; they all have the same semantics. If different stream methods behave differently, it could be surprising for users. It may become one of those tricky minor things you have to remember to use the library.

@bplommer @vlovgr I need your opinions on this design decision because we have multiple options here.

As you note, this is a design choice for the library. I bring forward my concerns so they can be discussed (and even invalidated). If there's agreement that the best path forward is to not recreate streams at the cost of having an incomplete assignment map in partitionsMapStream, I'm happy to update my changes.

About the impact on current use cases using the default assignor, my only concern was an implementation detail: currently we use a single prevAssignmentFinisherRef, and we'd need to switch to independent finishers per partition. I mention this for completeness, but I'm happy for the discussion to focus on the design choices first and come back to the implementation later.

@LMnet
Member

LMnet commented Feb 1, 2022

@biochimia, all your concerns are correct. It is a trade-off, of course.

As you note, this is a design choice for the library. I bring forward my concerns so they can be discussed (and even invalidated). If there's agreement that the best path forward is to not recreate streams at the cost of having an incomplete assignment map in partitionsMapStream, I'm happy to update my changes.

I think you can go on and update your changes. I'm pretty sure @bplommer will not be against it.

About the impact on current use cases using the default assignor, my only concern was an implementation detail: currently we use a single prevAssignmentFinisherRef, and we'd need to switch to independent finishers per partition. I mention this for completeness, but I'm happy for the discussion to focus on the design choices first and come back to the implementation later.

I think you could create a single Ref with all finalizers. This will not have too much performance impact.
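The suggestion above might be sketched like this (a hypothetical Finishers helper, not the actual implementation): a single Ref maps each partition to a Deferred used to signal its stream to complete, so a cooperative rebalance can finish only the revoked partitions' streams:

```scala
import cats.effect.{Deferred, IO, Ref}
import cats.syntax.all._
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: one Ref holding a per-partition finisher.
// Completing a Deferred signals the corresponding partition stream
// to terminate; retained partitions are left untouched.
final class Finishers(ref: Ref[IO, Map[TopicPartition, Deferred[IO, Unit]]]) {

  // Register a finisher for a newly assigned partition.
  def add(tp: TopicPartition): IO[Deferred[IO, Unit]] =
    Deferred[IO, Unit].flatMap(d => ref.update(_ + (tp -> d)).as(d))

  // Complete only the finishers of revoked partitions.
  def revoke(tps: Set[TopicPartition]): IO[Unit] =
    ref
      .modify(m => (m -- tps, tps.toList.flatMap(m.get(_))))
      .flatMap(_.traverse_(_.complete(()).void))
}
```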

@biochimia
Contributor Author

I've updated the PR to reflect what we've discussed. Namely, partitionsMapStream now keeps a per-partition finisher, and will only close streams for revoked partitions. Assignment maps emitted by partitionsMapStream will only include newly assigned partitions and their streams.

Member

@LMnet LMnet left a comment


Great job, @biochimia! Thanks for your contribution!

The purpose of this change is to allow partition streams returned by
KafkaConsumer.partitionsMapStream to be individually completed, without
having to close the full assignment at once as happened before.

When using the default assignor, there should be no visible change in
behaviour, as partition assignments are revoked in full on a rebalance,
and new assignments sent out to consumers.

When using the CooperativeStickyAssignor, however, partition assignments
may be individually revoked without affecting other assignments. In this
case, partition streams for non-revoked partitions will remain active
until the point they get revoked.

Compared to behaviour observed with the default assignor, when using
partitionsMapStream with CooperativeStickyAssignor the following will be
observed:

- maps in partitionsMapStream will include entries only for newly
  assigned partitions; when partition assignments are revoked, an empty
  map will be emitted.
- partition streams for non-revoked assignments will remain active after
  a new (partial) assignment map is emitted.
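Under the behaviour described in this commit message, a user can safely start every stream from each emitted map, since retained partitions' streams are never re-emitted. A minimal sketch (the record handler is a placeholder):

```scala
import cats.effect.IO
import fs2.Stream
import fs2.kafka.KafkaConsumer

// With CooperativeStickyAssignor, each map holds only the newly assigned
// partitions; streams for retained partitions keep running and are never
// emitted again, so joining everything received here cannot duplicate work.
def run(consumer: KafkaConsumer[IO, String, String]): Stream[IO, Unit] =
  consumer.partitionsMapStream
    .flatMap(newlyAssigned => Stream.iterable(newlyAssigned.values))
    .parJoinUnbounded
    .evalMap(committable => IO.println(committable.record.value)) // placeholder handler
```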
@biochimia biochimia force-pushed the cooperative-assignor branch from 7fba4c9 to c04aba2 on February 7, 2022 08:48
@biochimia
Contributor Author

I re-pushed my changes (updated hash only) to trigger the CI workflow. There was an issue affecting GitHub Actions earlier that meant some triggers did not fire.

@sideeffffect

This PR's CI needs trigger from a maintainer 🙏

@LMnet
Member

LMnet commented Feb 9, 2022

@biochimia I tried to run CI multiple times and it fails on the fs2.kafka.KafkaConsumerSpec test "stream should consume all records with subscribing for several consumers". Could you check it, please?

@biochimia
Contributor Author

biochimia commented Feb 9, 2022

@biochimia I tried to run CI multiple times and it fails on the fs2.kafka.KafkaConsumerSpec test "stream should consume all records with subscribing for several consumers". Could you check it, please?

I can give this a look, but it seems that this test is flaky and fails on a race condition.

Edit: the unrelated PR: #859

@LMnet
Member

LMnet commented Feb 9, 2022

@biochimia ok, got it.

@bplommer can you take a look at this PR? Or maybe we could merge it and fix everything ourselves.

@biochimia
Contributor Author

I had a look at the failing test, and was able to reproduce the failure on top of vanilla series/2.x with a loop in the test and some added logging to see what's going on.

When the test fails, you see the first consumer being assigned all partitions and fetching the records before the second consumer is able to join. When the second consumer joins, and assignment changes, the underlying consumer resets offsets back to 0, and records are consumed again, thus failing the test with repeated records.

The race condition then seems to be about the first consumer being able to fetch and process all records before a rebalance is triggered by the second consumer joining.

@sideeffffect

Awesome news that this got merged! 🎉 @biochimia how much work do you think it would take to backport this to series/1.x (compatible with Cats Effect 2)?
