feat: drop use of FetchRequests #1369

Open · wants to merge 3 commits into base: series/3.x
Conversation

biochimia
Contributor

Record streams are streamlined by dropping the `FetchRequest` middleman.
Instead, queues of records are lazily created by `KafkaConsumerActor`,
one for each assigned partition.
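
As a rough illustration of the lazy per-partition queues, here is a minimal sketch using cats-effect (`PartitionState` and `queueFor` are hypothetical names for illustration, not the PR's actual code):

```scala
import cats.effect.{IO, Ref}
import cats.effect.std.Queue
import org.apache.kafka.common.TopicPartition

// Hypothetical state: one bounded queue per assigned partition,
// created on first use and cached in a Ref-held map.
final case class PartitionState[A](queues: Map[TopicPartition, Queue[IO, A]])

def queueFor[A](
  state: Ref[IO, PartitionState[A]],
  partition: TopicPartition
): IO[Queue[IO, A]] =
  state.get.map(_.queues.get(partition)).flatMap {
    case Some(queue) => IO.pure(queue)
    case None =>
      Queue.bounded[IO, A](capacity = 1).flatMap { queue =>
        state.modify { s =>
          // Re-check inside modify: another fiber may have created it first.
          s.queues.get(partition) match {
            case Some(existing) => (s, existing)
            case None           => (s.copy(queues = s.queues.updated(partition, queue)), queue)
          }
        }
      }
  }
```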

Enqueueing records uses `Queue.tryOffer`, to avoid blocking the polling
fiber. `KafkaConsumerActor` maintains a separate `spillover` map to hold
records when queues are full. This acts as back-pressure and pauses
polling for "slow" partitions.
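
A minimal sketch of the non-blocking enqueue path (the helper name `enqueueOrSpill` is hypothetical):

```scala
import cats.effect.IO
import cats.effect.std.Queue
import fs2.Chunk

// Hypothetical enqueue from the poll loop: never block the polling
// fiber. A rejected chunk is returned so it can be parked as spillover,
// which doubles as the signal to pause polling that partition.
def enqueueOrSpill[A](
  queue: Queue[IO, Chunk[A]],
  chunk: Chunk[A]
): IO[Option[Chunk[A]]] =
  queue.tryOffer(chunk).map {
    case true  => None        // accepted: keep polling this partition
    case false => Some(chunk) // full: spill over and pause the partition
  }
```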

`KafkaConsumerActor.poll` takes a more active role in managing assigned
partitions. Namely, it signals the revocation of any partitions that are
not assigned, and drops those from the internal state.
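
Sketched over a plain map of per-partition queues (the `reconcile` helper is hypothetical), the per-poll reconciliation could look like:

```scala
import cats.effect.{IO, Ref}
import org.apache.kafka.common.TopicPartition

// Hypothetical reconciliation: drop internal state for partitions the
// consumer no longer owns; the returned set can drive stream shutdown.
def reconcile[Q](
  state: Ref[IO, Map[TopicPartition, Q]],
  assigned: Set[TopicPartition]
): IO[Set[TopicPartition]] =
  state.modify { queues =>
    val revoked = queues.keySet.diff(assigned)
    (queues -- revoked, revoked)
  }
```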

While the `ConsumerRebalanceListener` interface is meant to handle this
state when the consumer faces rebalance operations, explicitly handling
assignments in `poll` caters to manually assigned partitions. In
addition, it ensures `KafkaConsumerActor`'s internal state stays
consistent in the face of race conditions between rebalance operations
and the setup of record streams in `KafkaConsumer`.

The newly introduced `partitionState` (maintaining per-partition queues
and spillover records) bears some resemblance to the former `fetches`
and `records` fields, but differs in some important ways:

- registration of fetch requests involved a query of the current
  consumer assignment, which forced synchronization with the polling
  thread via the use of `withConsumer.blocking`.
- fetch requests followed the lifecycle of record chunks and needed to
  be continuously re-added to the state before new data would be polled
  from a partition. Now, the current assignment forms the basis for
  fetching data from a partition, with spillover records acting as a
  back-pressure signal (see the sketch after this list).
- `records` acted as a holding area, but mainly existed to bridge a
  race between a new assignment, the start of a new stream, and the
  subsequent registration of fetch requests. `records` was not generally
  used as a spillover area for incoming data.
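
To make the new lifecycle concrete, here is a minimal sketch of the spillover drain that a poll pass might perform (names and shape are hypothetical; partitions still holding spillover afterwards are the ones to keep paused):

```scala
import cats.effect.IO
import cats.effect.std.Queue
import cats.syntax.all._
import fs2.Chunk
import org.apache.kafka.common.TopicPartition

// Hypothetical drain step: retry spilled chunks before polling for new
// data. Entries accepted by their queue are removed; the keys of the
// returned map are the partitions that must remain paused.
def drainSpillover[A](
  queues: Map[TopicPartition, Queue[IO, Chunk[A]]],
  spillover: Map[TopicPartition, Chunk[A]]
): IO[Map[TopicPartition, Chunk[A]]] =
  spillover.toList
    .traverseFilter { case (tp, chunk) =>
      queues.get(tp) match {
        case Some(queue) =>
          queue.tryOffer(chunk).map { accepted =>
            if (accepted) None else Some(tp -> chunk)
          }
        case None =>
          IO.pure(None) // partition was revoked; drop its spillover
      }
    }
    .map(_.toMap)
```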

With multiple streams hooked up to a single consumer, an inherent race
in the old registration of fetch requests meant that each chunk of
records could be added to all, or only a subset, of the listening
streams. With the new approach, multiple streams compete to take
elements from the queue, ensuring each chunk of records goes to exactly
one stream. (While I do not expect that such use of multiple streams was
a feature, the potential behavior change is noted.)
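
A small self-contained demo of the single-delivery property of `Queue.take` under competing consumers (illustrative only):

```scala
import cats.effect.{IO, IOApp}
import cats.effect.std.Queue
import cats.syntax.all._

// Each queued element is handed to exactly one of the competing takers,
// so no element is ever delivered to more than one stream.
object SingleDelivery extends IOApp.Simple {
  def run: IO[Unit] =
    Queue.bounded[IO, Int](10).flatMap { queue =>
      val produce = (1 to 10).toList.traverse_(queue.offer)
      def consume(name: String): IO[Unit] =
        (1 to 5).toList.traverse_ { _ =>
          queue.take.flatMap(n => IO.println(s"stream $name took $n"))
        }
      produce *> (consume("a"), consume("b")).parTupled.void
    }
}
```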

An internal `StreamId` was previously used to match `FetchRequest`s to
the owning stream. This is no longer used and the ID is dropped.

`KafkaConsumer` previously kept track of its partition assignment, along
with `Deferred` instances used to interrupt partition streams. This is now
handled by `KafkaConsumerActor`, which relies solely on the underlying
consumer to keep track of its current assignment.

Overall, this change should reduce some overhead and latency between the
polling thread and the record streams. It does this by removing layers
through which records must pass, as well as reducing synchronization
between polling and consuming fibers.