
Add support for using CooperativeStickyAssignor with KafkaConsumer #844

Merged (3 commits, Feb 20, 2022)
96 changes: 62 additions & 34 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
@@ -211,18 +211,18 @@ object KafkaConsumer {

def enqueueAssignment(
streamId: StreamId,
assigned: SortedSet[TopicPartition],
partitionsMapQueue: PartitionsMapQueue,
assignmentRevoked: F[Unit]
assigned: Map[TopicPartition, Deferred[F, Unit]],
partitionsMapQueue: PartitionsMapQueue
): F[Unit] = {
val assignment: F[PartitionsMap] = if (assigned.isEmpty) {
F.pure(Map.empty)
} else {
assigned.toVector
.traverse { partition =>
createPartitionStream(streamId, partition, assignmentRevoked).map { stream =>
partition -> stream
}
.traverse {
case (partition, finisher) =>
createPartitionStream(streamId, partition, finisher.get).map { stream =>
partition -> stream
}
}
.map(_.toMap)
}
@@ -239,68 +239,96 @@ object KafkaConsumer {

def onRebalance(
streamId: StreamId,
prevAssignmentFinisherRef: Ref[F, Deferred[F, Unit]],
assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]],
partitionsMapQueue: PartitionsMapQueue
): OnRebalance[F] =
OnRebalance(
onRevoked = _ => {
onRevoked = revoked => {
for {
newFinisher <- Deferred[F, Unit]
prevAssignmentFinisher <- prevAssignmentFinisherRef.getAndSet(newFinisher)
_ <- prevAssignmentFinisher.complete(())
finishers <- assignmentRef.modify(_.partition(entry => !revoked.contains(entry._1)))
_ <- finishers.toVector
.traverse {
case (_, finisher) =>
finisher.complete(())
}
} yield ()
},
onAssigned = assigned => {
prevAssignmentFinisherRef.get.flatMap { prevAssignmentFinisher =>
enqueueAssignment(
onAssigned = assignedPartitions => {
for {
assignment <- assignedPartitions.toVector
.traverse { partition =>
Deferred[F, Unit].map(partition -> _)
}
.map(_.toMap)
_ <- assignmentRef.update(_ ++ assignment)
_ <- enqueueAssignment(
streamId = streamId,
assigned = assigned,
partitionsMapQueue = partitionsMapQueue,
assignmentRevoked = prevAssignmentFinisher.get
assigned = assignment,
partitionsMapQueue = partitionsMapQueue
)
}
} yield ()
}
)

def requestAssignment(
streamId: StreamId,
prevAssignmentFinisherRef: Ref[F, Deferred[F, Unit]],
assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]],
partitionsMapQueue: PartitionsMapQueue
): F[SortedSet[TopicPartition]] =
): F[Map[TopicPartition, Deferred[F, Unit]]] =
Deferred[F, Either[Throwable, SortedSet[TopicPartition]]].flatMap { deferred =>
val request =
Request.Assignment[F](
deferred.complete(_).void,
Some(onRebalance(streamId, prevAssignmentFinisherRef, partitionsMapQueue))
Some(
onRebalance(
streamId,
assignmentRef,
partitionsMapQueue
)
)
)
val assignment = requests.offer(request) >> deferred.get.rethrow
F.race(awaitTermination.attempt, assignment).map {
case Left(_) => SortedSet.empty[TopicPartition]
case Right(assigned) => assigned
F.race(awaitTermination.attempt, assignment).flatMap {
case Left(_) =>
F.pure(Map.empty)

case Right(assigned) =>
assigned.toVector
.traverse { partition =>
Deferred[F, Unit].map(partition -> _)
}
.map(_.toMap)
}
}

def initialEnqueue(
streamId: StreamId,
partitionsMapQueue: PartitionsMapQueue,
prevAssignmentFinisherRef: Ref[F, Deferred[F, Unit]]
assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]],
partitionsMapQueue: PartitionsMapQueue
): F[Unit] =
for {
prevAssignmentFinisher <- prevAssignmentFinisherRef.get
assigned <- requestAssignment(streamId, prevAssignmentFinisherRef, partitionsMapQueue)
assignmentRevoked = prevAssignmentFinisher.get
_ <- enqueueAssignment(streamId, assigned, partitionsMapQueue, assignmentRevoked)
assigned <- requestAssignment(
streamId,
assignmentRef,
partitionsMapQueue
)
_ <- enqueueAssignment(streamId, assigned, partitionsMapQueue)
} yield ()

Stream.eval(stopConsumingDeferred.tryGet).flatMap {
case None =>
for {
partitionsMapQueue <- Stream.eval(Queue.unbounded[F, Option[PartitionsMap]])
streamId <- Stream.eval(streamIdRef.modify(n => (n + 1, n)))
prevAssignmentFinisher <- Stream.eval(Deferred[F, Unit])
prevAssignmentFinisherRef <- Stream.eval(Ref[F].of(prevAssignmentFinisher))
assignmentRef <- Stream.eval(Ref[F].of(Map.empty[TopicPartition, Deferred[F, Unit]]))
_ <- Stream
.eval(initialEnqueue(streamId, partitionsMapQueue, prevAssignmentFinisherRef))
.eval(
initialEnqueue(
streamId,
assignmentRef,
partitionsMapQueue
)
)
out <- Stream
.fromQueueNoneTerminated(partitionsMapQueue)
.interruptWhen(awaitTermination.attempt)
27 changes: 16 additions & 11 deletions modules/core/src/main/scala/fs2/kafka/consumer/KafkaConsume.scala
@@ -54,21 +54,26 @@ trait KafkaConsume[F[_], K, V] {
def partitionedStream: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]]

/**
* `Stream` where each element contains a current assignment. The current
* assignment is the `Map`, where keys is a `TopicPartition`, and values are
* streams with records for a particular `TopicPartition`.<br>
* `Stream` where each element contains an assignment. Each assignment is a
* `Map`, where keys are `TopicPartition`s, and values are record streams for
* the `TopicPartition`.<br>
* <br>
* New assignments will be received on each rebalance. On rebalance,
* Kafka revoke all previously assigned partitions, and after that assigned
* new partitions all at once. `partitionsMapStream` reflects this process
* in a streaming manner.<br>
* With the default assignor, previous partition assignments are revoked at
* once, and a new set of partitions is assigned to a consumer on each
* rebalance. In this case, each returned map contains the full partition
* assignment for the consumer. `partitionsMapStream` reflects the assignment
* process in a streaming manner.<br>
* <br>
* Note, that partition streams for revoked partitions will
* be closed after the new assignment comes.<br>
* This may not be the case when a custom assignor is configured in the
* consumer. When using the `CooperativeStickyAssignor`, for instance,
* partition assignments may be revoked individually. In this case, each
* element in the stream will contain only streams for newly assigned
* partitions. Streams returned previously will remain active until the
* assignment is revoked.<br>
* <br>
* This is the most generic `Stream` method. If you don't need such control,
* consider using `partitionedStream` or `stream` methods.
* They are both based on a `partitionsMapStream`.
* consider using `partitionedStream` or `stream` methods. They are both
* based on a `partitionsMapStream`.
*
* @note you have to first use `subscribe` to subscribe the consumer
* before using this `Stream`. If you forgot to subscribe, there
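As a point of reference for the scaladoc change above, here is a minimal sketch (not part of this PR) of a consumer that opts into the cooperative assignor and consumes `partitionsMapStream`; the bootstrap servers, group id, and topic name are placeholder assumptions:

import cats.effect.{IO, IOApp}
import fs2.Stream
import fs2.kafka._
import org.apache.kafka.clients.consumer.{ConsumerConfig, CooperativeStickyAssignor}

object CooperativeConsumerExample extends IOApp.Simple {

  // Hypothetical settings: bootstrap servers, group id and topic are placeholders.
  val settings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092")
      .withGroupId("example-group")
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withProperties(
        ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG ->
          classOf[CooperativeStickyAssignor].getName
      )

  def run: IO[Unit] =
    KafkaConsumer
      .stream(settings)
      .subscribeTo("example-topic")
      .flatMap(_.partitionsMapStream)
      // With the cooperative assignor, each map holds only the newly assigned
      // partitions; streams from earlier maps keep running until their
      // partitions are revoked, so all of them are joined concurrently below.
      .flatMap(assignment => Stream.emits(assignment.values.toList))
      .parJoinUnbounded
      .evalMap(committable => IO.println(committable.record.value))
      .compile
      .drain
}

The `parJoinUnbounded` keeps pulling new assignment maps while previously returned partition streams continue to run, which matches the cooperative-rebalancing behaviour described in the updated scaladoc.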
@@ -312,7 +312,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
val records = withRebalancing.records.keySetStrict

val revokedFetches = revoked intersect fetches
val revokedNonFetches = revoked diff fetches
val revokedNonFetches = revoked diff revokedFetches
Comment on lines 314 to +315 (PR author):
I would appreciate a comment on this change in particular.

The change makes sense to me, logically: revokedFetches and revokedNonFetches add up to revoked. But I did not dig enough into the implications to be sure this is the intention. For instance: in which combinations of pending records and pending fetches will it make a difference, and what is the consequence?

I thought I had left this out of the PR, but while it's here we might as well discuss it.
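For illustration only, a tiny sketch with hypothetical sets showing the relationship described above: `revokedFetches` and `revokedNonFetches` are disjoint and together add up to `revoked`:

// Hypothetical stand-ins for the actor's partition sets.
val revoked = Set("topic-0", "topic-1", "topic-2")
val fetches = Set("topic-1", "topic-3")

val revokedFetches    = revoked intersect fetches    // Set("topic-1")
val revokedNonFetches = revoked diff revokedFetches  // Set("topic-0", "topic-2")

// The two pieces partition `revoked`: disjoint, and their union is `revoked`.
assert((revokedFetches union revokedNonFetches) == revoked)
assert((revokedFetches intersect revokedNonFetches).isEmpty)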


val withRecords = records intersect revokedFetches
val withoutRecords = revokedFetches diff records
146 changes: 144 additions & 2 deletions modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
@@ -9,7 +9,7 @@ import cats.effect.unsafe.implicits.global
import fs2.Stream
import fs2.concurrent.SignallingRef
import fs2.kafka.internal.converters.collection._
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
import org.apache.kafka.clients.consumer.{ConsumerConfig, CooperativeStickyAssignor, NoOffsetForPartitionException}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import org.scalatest.Assertion
@@ -516,6 +516,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
}
}
.takeWhile(_.size < 200)
.timeout(20.seconds)
Comment (PR author):
Timeout added to match the test with the custom assignor. This test was working without it, while the one for the custom assignor would hang indefinitely.

.compile
.drain
.guarantee(stopSignal.set(true))
@@ -540,6 +541,88 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
}
}

it("should handle rebalance with CooperativeStickyAssignor") {
Comment (PR author):
This is a copy of the "should handle rebalance" test, adapted to use the custom assignor, along with the resulting assert.

withTopic { topic =>
createCustomTopic(topic, partitions = 3)
val produced1 = (0 until 100).map(n => s"key-$n" -> s"value->$n")
val produced2 = (100 until 200).map(n => s"key-$n" -> s"value->$n")
val producedTotal = produced1.size.toLong + produced2.size.toLong

def startConsumer(
consumedQueue: Queue[IO, CommittableConsumerRecord[IO, String, String]],
stopSignal: SignallingRef[IO, Boolean]
): IO[Fiber[IO, Throwable, Vector[Set[Int]]]] =
Ref[IO]
.of(Vector.empty[Set[Int]])
.flatMap { assignedPartitionsRef =>
KafkaConsumer
.stream(
consumerSettings[IO]
.withProperties(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[CooperativeStickyAssignor].getName
)
)
.subscribeTo(topic)
.flatMap(_.partitionsMapStream)
.filter(_.nonEmpty)
.evalMap { assignment =>
assignedPartitionsRef.update(_ :+ assignment.keySet.map(_.partition())).as {
Stream
.emits(assignment.map {
case (_, stream) =>
stream.evalMap(consumedQueue.offer)
}.toList)
.covary[IO]
}
}
.flatten
.parJoinUnbounded
.interruptWhen(stopSignal)
.compile
.drain >> assignedPartitionsRef.get
}
.start

(for {
stopSignal <- SignallingRef[IO, Boolean](false)
queue <- Queue.unbounded[IO, CommittableConsumerRecord[IO, String, String]]
ref <- Ref.of[IO, Map[String, Int]](Map.empty)
fiber1 <- startConsumer(queue, stopSignal)
_ <- IO.sleep(5.seconds)
_ <- IO(publishToKafka(topic, produced1))
fiber2 <- startConsumer(queue, stopSignal)
_ <- IO.sleep(5.seconds)
_ <- IO(publishToKafka(topic, produced2))
_ <- Stream
.fromQueueUnterminated(queue)
.evalMap { committable =>
ref.modify { counts =>
val key = committable.record.key
val newCounts = counts.updated(key, counts.getOrElse(key, 0) + 1)
(newCounts, newCounts)
}
}
.takeWhile(_.size < 200)
.timeout(20.seconds)
.compile
.drain
.guarantee(stopSignal.set(true))
consumer1assignments <- fiber1.joinWithNever
consumer2assignments <- fiber2.joinWithNever
keys <- ref.get
} yield {
assert {
keys.size.toLong == producedTotal &&
keys.values.sum == 236 &&
consumer1assignments.size == 1 &&
consumer1assignments(0) == Set(0, 1, 2) &&
consumer2assignments.size == 1 &&
consumer2assignments(0) == Set(2)
}
}).unsafeRunSync()
}
}

it("should close all old streams on rebalance") {
withTopic { topic =>
val numPartitions = 3
@@ -698,7 +781,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
consumer1Updates(2).isEmpty &&
consumer1Updates(3).size < 3 &&
// Startup assignments (zero), initial assignments (< 3)
consumer2Updates.length == 2
consumer2Updates.length == 2 &&
Comment on lines -701 to +784 (PR author):
I think this affected scalatest's ability to produce pretty failure messages.

consumer2Updates.head.isEmpty &&
consumer2Updates(1).size < 3 &&
(consumer1Updates(3) ++ consumer2Updates(1)) == consumer1Updates(1)
Expand All @@ -707,6 +790,65 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
}
}

it("should stream assignment updates to listeners when using CooperativeStickyAssignor") {
Comment (PR author):
This is a copy of the existing "should stream assignment updates to listeners" test, adapted to use the custom assignor, along with the resulting asserts.

withTopic { topic =>
createCustomTopic(topic, partitions = 3)

val consumer =
for {
queue <- Stream.eval(Queue.unbounded[IO, Option[SortedSet[TopicPartition]]])
_ <- KafkaConsumer
.stream(
consumerSettings[IO]
.withProperties(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[CooperativeStickyAssignor].getName
)

)
.subscribeTo(topic)
.evalMap { consumer =>
consumer.assignmentStream
.concurrently(consumer.records)
.evalMap(as => queue.offer(Some(as)))
.compile
.drain
.start
.void
}
} yield {
queue
}

(for {
queue1 <- consumer
_ <- Stream.eval(IO.sleep(5.seconds))
queue2 <- consumer
_ <- Stream.eval(IO.sleep(5.seconds))
_ <- Stream.eval(queue1.offer(None))
_ <- Stream.eval(queue2.offer(None))
consumer1Updates <- Stream.eval(
Stream.fromQueueNoneTerminated(queue1).compile.toList
)
consumer2Updates <- Stream.eval(
Stream.fromQueueNoneTerminated(queue2).compile.toList
)
_ <- Stream.eval(IO(assert {
// Startup assignment (zero), initial assignment (all partitions),
// minimal revocation when 2nd joins (keep two)
consumer1Updates.length == 3 &&
consumer1Updates.head.isEmpty &&
consumer1Updates(1).size == 3 &&
consumer1Updates(2).size == 2 &&
// Startup assignments (zero), initial assignments (one)
consumer2Updates.length == 2 &&
consumer2Updates.head.isEmpty &&
consumer2Updates(1).size == 1 &&
(consumer1Updates(2) ++ consumer2Updates(1)) == consumer1Updates(1)
}))
} yield ()).compile.drain.unsafeRunSync()
}
}

it("begin from the current assignments") {
withTopic { topic =>
createCustomTopic(topic, partitions = 3)