This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[fix] Use async back pressure to limit RIP #1621

Draft · wants to merge 3 commits into master

Conversation

@michaeljmarshall (Contributor) commented Dec 13, 2022

Motivation

This is a work in progress to make the KafkaCommandDecoder use asynchronous backpressure to limit the number of requests in progress. The current code applies synchronous backpressure by blocking a Netty event loop thread, which is an anti-pattern and can cause issues for other connections sharing that event loop.

Modifications

  • Revert Fix pulsar-io-thread block forever #946. While that solution worked, it left the underlying problem of blocking a Netty thread in place, so it needs to be removed.
  • Introduce numQueuedRequestsInProgress to track the number of requests in progress. This variable is only updated from the event loop thread, so it does not need any synchronization.
  • Disable autoRead when the channel has reached its limit, and re-enable it when the count drops back below the limit (see the sketch after this list).
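
The following is a minimal sketch of the intended approach, assuming a dedicated inbound handler; the class name, the maxQueuedRequests limit, and the completion-callback wiring are illustrative assumptions, not the actual KafkaCommandDecoder changes.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Illustrative only: counts in-flight requests on the event loop and toggles autoRead.
public class RequestBackpressureHandler extends ChannelInboundHandlerAdapter {
    private final int maxQueuedRequests;
    // Read and written only on the channel's event loop thread, so no synchronization is needed.
    private int numQueuedRequestsInProgress;

    public RequestBackpressureHandler(int maxQueuedRequests) {
        this.maxQueuedRequests = maxQueuedRequests;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        numQueuedRequestsInProgress++;
        if (numQueuedRequestsInProgress >= maxQueuedRequests) {
            // Stop reading from the socket instead of blocking the event loop.
            ctx.channel().config().setAutoRead(false);
        }
        ctx.fireChannelRead(msg);
    }

    // Called on the event loop when the asynchronous processing of a request completes.
    public void requestCompleted(ChannelHandlerContext ctx) {
        numQueuedRequestsInProgress--;
        if (numQueuedRequestsInProgress < maxQueuedRequests
                && !ctx.channel().config().isAutoRead()) {
            ctx.channel().config().setAutoRead(true);
            // Trigger a read in case data arrived while autoRead was disabled.
            ctx.read();
        }
    }
}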

Verifying this change

I'll work on adding tests tomorrow.

Documentation

  • no-need-doc

This is an internal optimization that does not need to be documented.

@github-actions bot added the no-need-doc (This PR does not need any document) label on Dec 13, 2022
@lhotari (Member) commented Dec 13, 2022

The changes make sense. Some improvement areas:

  • there's a need to consider the overall backpressure solution.
  • it could be beneficial to use a high-watermark / low-watermark scheme for this backpressure, so that the channel's autoRead state isn't toggled on every incoming request when the queue is full and requests keep arriving continuously. Backpressuring the channel has a cost, and batching the toggles with a high/low watermark should improve overall efficiency. This stackoverflow answer references a blog post that seems to support this theory. A minimal sketch of the idea follows below.

The second point isn't as important as the first: the overall backpressure solution is what needs to be considered.
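
A minimal sketch of the high/low watermark idea, reusing the counter introduced in this PR; the watermark values, class name, and method names are illustrative assumptions, not code from this PR or from Pulsar.

import io.netty.channel.ChannelHandlerContext;

// Illustrative hysteresis: pause reads above the high watermark, resume only after
// draining below the low watermark, so autoRead is not flipped on every request.
final class WatermarkBackpressure {
    private static final int HIGH_WATERMARK = 500;
    private static final int LOW_WATERMARK = 250;

    // Accessed only from the event loop thread.
    private int numQueuedRequestsInProgress;

    void onRequestQueued(ChannelHandlerContext ctx) {
        numQueuedRequestsInProgress++;
        if (numQueuedRequestsInProgress >= HIGH_WATERMARK
                && ctx.channel().config().isAutoRead()) {
            ctx.channel().config().setAutoRead(false);
        }
    }

    void onRequestCompleted(ChannelHandlerContext ctx) {
        numQueuedRequestsInProgress--;
        if (numQueuedRequestsInProgress <= LOW_WATERMARK
                && !ctx.channel().config().isAutoRead()) {
            ctx.channel().config().setAutoRead(true);
            ctx.read();
        }
    }
}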

Some parts of the existing backpressure solution can be seen in the following excerpt:

private void disableCnxAutoRead() {
    if (ctx != null && ctx.channel().config().isAutoRead()) {
        ctx.channel().config().setAutoRead(false);
        if (log.isDebugEnabled()) {
            log.debug("[{}] disable auto read", ctx.channel());
        }
    }
}

private void enableCnxAutoRead() {
    if (ctx != null && !ctx.channel().config().isAutoRead()
            && !autoReadDisabledPublishBufferLimiting) {
        // Resume reading from socket if pending-request is not reached to threshold
        ctx.channel().config().setAutoRead(true);
        // triggers channel read
        ctx.read();
        if (log.isDebugEnabled()) {
            log.debug("[{}] enable auto read", ctx.channel());
        }
    }
}

private void startSendOperationForThrottling(long msgSize) {
    final long currentPendingBytes = pendingBytes.addAndGet(msgSize);
    if (currentPendingBytes >= maxPendingBytes && !autoReadDisabledPublishBufferLimiting && maxPendingBytes > 0) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] disable auto read because currentPendingBytes({}) > maxPendingBytes({})",
                    ctx.channel(), currentPendingBytes, maxPendingBytes);
        }
        disableCnxAutoRead();
        autoReadDisabledPublishBufferLimiting = true;
        pulsarService.getBrokerService().pausedConnections(1);
    }
}

private void completeSendOperationForThrottling(long msgSize) {
    final long currentPendingBytes = pendingBytes.addAndGet(-msgSize);
    if (currentPendingBytes < resumeThresholdPendingBytes && autoReadDisabledPublishBufferLimiting) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] enable auto read because currentPendingBytes({}) < resumeThreshold({})",
                    ctx.channel(), currentPendingBytes, resumeThresholdPendingBytes);
        }
        autoReadDisabledPublishBufferLimiting = false;
        enableCnxAutoRead();
        pulsarService.getBrokerService().resumedConnections(1);
    }
}
This code was added in #488.

If multiple independent backpressure solutions operate on the same channel, they will conflict with each other.

Regarding the overall backpressure solution, it's a major gap in Pulsar and KOP that Netty's asynchronous backpressure for outbound traffic (channelWritabilityChanged events) is not used. The solution added in #488 seems to cover that somewhat, but without using Netty's built-in mechanism for avoiding overwhelmed outbound buffers.
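
For reference, a minimal sketch of reacting to Netty's outbound writability signal; this is an illustrative handler, not code from Pulsar, KOP, or this PR, and the watermark byte values are arbitrary.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.WriteBufferWaterMark;

// Illustrative only: pauses inbound reads while the outbound buffer is above its
// high watermark and resumes them once Netty reports the channel writable again.
final class OutboundBackpressureHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        // Byte thresholds at which Channel.isWritable() flips (low, high).
        ctx.channel().config().setWriteBufferWaterMark(
                new WriteBufferWaterMark(512 * 1024, 1024 * 1024));
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        boolean writable = ctx.channel().isWritable();
        // Stop reading new requests while the outbound buffer is saturated.
        ctx.channel().config().setAutoRead(writable);
        if (writable) {
            ctx.read();
        }
        ctx.fireChannelWritabilityChanged();
    }
}

Coordinating this outbound signal with the inbound request limit in one place would avoid the conflicting toggles mentioned above.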

@codecov (bot) commented Dec 13, 2022

Codecov Report

Merging #1621 (10d1d92) into master (3f3a73f) will increase coverage by 0.80%.
The diff coverage is 60.60%.

❗ Current head 10d1d92 differs from pull request most recent head 039955b. Consider uploading reports for the commit 039955b to get more accurate results

Impacted file tree graph

@@             Coverage Diff              @@
##             master    #1621      +/-   ##
============================================
+ Coverage     14.96%   15.76%   +0.80%     
- Complexity      592      612      +20     
============================================
  Files           164      164              
  Lines         12225    12247      +22     
  Branches       1120     1123       +3     
============================================
+ Hits           1829     1931     +102     
+ Misses        10242    10157      -85     
- Partials        154      159       +5     
Impacted Files Coverage Δ
...ative/pulsar/handlers/kop/KafkaRequestHandler.java 1.09% <0.00%> (-0.01%) ⬇️
...oordinator/transaction/TransactionCoordinator.java 0.00% <0.00%> (ø)
...p/coordinator/transaction/TransactionMetadata.java 0.00% <0.00%> (ø)
...coordinator/transaction/ProducerIdManagerImpl.java 54.97% <70.17%> (+54.97%) ⬆️
...ative/pulsar/handlers/kop/PendingTopicFutures.java 70.58% <0.00%> (-5.89%) ⬇️

@michaeljmarshall (Contributor, Author) commented
Thanks for your review @lhotari. I hadn't noticed that we were already using autoRead there, so I'll re-review that existing solution and also look at the low/high watermark approach.
