Skip to content

Commit

Permalink
Fix buffer size include in flight messages in backpressure calculatio…
Browse files Browse the repository at this point in the history
…n logic (#836)

* include inflight messages in backpressure logic
  • Loading branch information
rkolesnev authored Oct 21, 2024
1 parent fff10ce commit b24b5cd
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 9 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ endif::[]

=== Fixes

* fix: include inflight message count in polling backpressure logic (#836)
* fix: message loss on closing or partitions revoked (#827) fixes (#826)
* fix: unbounded retry queue growth preventing polling from being throttled and leading to OOM (#834) fixes (#832, #817)

=== Note
#836 changes how the buffer size is calculated: in-flight messages are now counted as part of the buffer size, so the behaviour of existing applications may change and the consumer may be paused sooner.

== 0.5.3.1

=== Fixes
Expand Down
4 changes: 4 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1537,9 +1537,13 @@ endif::[]

=== Fixes

* fix: include inflight message count in polling backpressure logic (#836)
* fix: message loss on closing or partitions revoked (#827) fixes (#826)
* fix: unbounded retry queue growth preventing polling from being throttled and leading to OOM (#834) fixes (#832, #817)

=== Note
#836 changes how the buffer size is calculated: in-flight messages are now counted as part of the buffer size, so the behaviour of existing applications may change and the consumer may be paused sooner.

== 0.5.3.1

=== Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ public static <K, V> ControllerEventMessage<K, V> of(WorkContainer<K, V> work) {
private final AtomicBoolean currentlyPollingWorkCompleteMailBox = new AtomicBoolean();

/**
* Indicates state of waiting while in-flight messages complete processing on shutdown.
* Used to prevent control thread interrupt due to wakeup logic on rebalances
* Indicates state of waiting while in-flight messages complete processing on shutdown. Used to prevent control
* thread interrupt due to wakeup logic on rebalances
*/
private final AtomicBoolean awaitingInflightProcessingCompletionOnShutdown = new AtomicBoolean();

Expand Down Expand Up @@ -508,7 +508,7 @@ private static Optional<Boolean> getAutoCommitEnabled(final org.apache.kafka.cli
if (consumer instanceof MockConsumer<?, ?>) {
log.debug("Detected MockConsumer class which doesn't do auto commits");
return Optional.of(false);
} else if (!(consumer instanceof KafkaConsumer<?,?>)) {
} else if (!(consumer instanceof KafkaConsumer<?, ?>)) {
log.warn("Consumer is neither a KafkaConsumer nor a MockConsumer - cannot check auto commit is disabled for consumer type: {}", consumer.getClass());
return Optional.of(false); // Probably Mockito
}
Expand Down Expand Up @@ -606,6 +606,16 @@ public void close(DrainingMode drainMode) {
log.info("Close complete.");
}

/**
 * Reports the cached count of paused partitions, as provided by the broker poll subsystem.
 * <p>
 * Intended for tests and for monitoring code that wraps the application / user code.
 *
 * @return number of paused partitions
 */
public int getPausedPartitionSize() {
    final int pausedPartitionCount = brokerPollSubsystem.getPausedPartitionSize();
    return pausedPartitionCount;
}

private void waitForClose(Duration timeout) throws TimeoutException, ExecutionException {
log.info("Waiting on closed state...");
while (!state.equals(CLOSED)) {
Expand Down Expand Up @@ -909,9 +919,13 @@ protected <R> void controlLoop(Function<PollContextInternal<K, V>, List<R>> user
private void maybeWakeupPoller() {
if (state == RUNNING) {
if (!wm.isSufficientlyLoaded() && brokerPollSubsystem.isPausedForThrottling()) {
log.debug("Found Poller paused with not enough front loaded messages, ensuring poller is awake (mail: {} vs target: {})",
wm.getNumberOfWorkQueuedInShardsAwaitingSelection(),
options.getTargetAmountOfRecordsInFlight());
if (log.isDebugEnabled()) {
long inShards = wm.getNumberOfWorkQueuedInShardsAwaitingSelection();
long outForProcessing = wm.getNumberRecordsOutForProcessing();
log.debug("Found Poller paused with not enough front loaded messages, ensuring poller is awake (in buffers: {} vs target: {}), in shards: {}, outForProcessing: {}",
inShards + outForProcessing,
options.getTargetAmountOfRecordsInFlight(), inShards, outForProcessing);
}
brokerPollSubsystem.wakeupIfPaused();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,4 +399,14 @@ public void resumePollingAndWorkRegistrationIfPaused() {
log.info("Skipping transition of broker poll system to state running. Current state is {}.", this.runState);
}
}

/**
 * Reports the cached count of paused partitions, as provided by the consumer manager.
 * <p>
 * Intended for tests and for monitoring code that wraps the application / user code.
 *
 * @return number of paused partitions
 */
public int getPausedPartitionSize() {
    final int pausedPartitionCount = consumerManager.getPausedPartitionSize();
    return pausedPartitionCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public boolean shouldThrottle() {
* should be downloaded (or pipelined in the Consumer)
*/
public boolean isSufficientlyLoaded() {
// Compares only the work queued in shards awaiting selection against the target amount of records in flight
// multiplied by the loading factor. The (long) cast applies to the target, so the product is computed as a long.
return getNumberOfWorkQueuedInShardsAwaitingSelection() > (long) options.getTargetAmountOfRecordsInFlight() * getLoadingFactor();
// Same threshold, but the left-hand side also adds the number of records currently out for processing,
// so in-flight records count towards being "sufficiently loaded".
return (getNumberOfWorkQueuedInShardsAwaitingSelection() + getNumberRecordsOutForProcessing() ) > (long) options.getTargetAmountOfRecordsInFlight() * getLoadingFactor();
}

private int getLoadingFactor() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@

/*-
* Copyright (C) 2020-2024 Confluent, Inc.
*/
package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import pl.tlinkowski.unij.api.UniSets;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

/**
 * Integration tests checking that broker polling is paused (back pressure) based on the combined number of records
 * queued in shards and records out for processing, and that polling resumes once the records have been processed.
 */
@Slf4j
public class BrokerPollerBackpressureTest extends BrokerIntegrationTest<String, String> {

    /** Upper bound for every Awaitility wait in this class. */
    private static final Duration AWAIT_TIMEOUT = Duration.ofSeconds(10);

    /** Poll interval used while waiting for the shard queue to reach the expected size. */
    private static final Duration SHARD_POLL_INTERVAL = Duration.ofMillis(200);

    /** Poll interval used for all other waits. */
    private static final Duration FAST_POLL_INTERVAL = Duration.ofMillis(20);

    /** Number of records produced before processing is unblocked. */
    private static final int INITIAL_MESSAGE_COUNT = 200;

    /** Number of records produced after polling has resumed. */
    private static final int FOLLOW_UP_MESSAGE_COUNT = 10;

    Consumer<String, String> consumer;

    ParallelConsumerOptions<String, String> pcOpts;
    ParallelEoSStreamProcessor<String, String> pc;

    @BeforeEach
    void setUp() {
        setupTopic();
        consumer = getKcu().createNewConsumer(KafkaClientUtils.GroupOption.NEW_GROUP);

        pcOpts = ParallelConsumerOptions.<String, String>builder()
                .consumer(consumer)
                .ordering(KEY)
                .maxConcurrency(10)
                .messageBufferSize(150)
                .build();

        pc = new ParallelEoSStreamProcessor<>(pcOpts);

        pc.subscribe(UniSets.of(topic));
    }

    @Test
    @SneakyThrows
    void brokerPollPausedWithEmptyShardsButHighInFlight() {
        var messageProcessingLatch = new CountDownLatch(1);
        assertThat(pc.getPausedPartitionSize()).isEqualTo(0); // should be polling initially
        getKcu().produceMessages(topic, INITIAL_MESSAGE_COUNT);
        AtomicInteger count = new AtomicInteger(0);
        startBlockedProcessing(count, messageProcessingLatch);
        awaitQueuedInShards(0); // wait until nothing is left queued in the shards
        // polling should be paused even though shards size is 0
        assertStaysPausedThenResumesAfterRelease(count, messageProcessingLatch);
    }

    @Test
    @SneakyThrows
    void brokerPollPausedWithHighNumberInShardsButLowInFlight() {
        var messageProcessingLatch = new CountDownLatch(1);
        assertThat(pc.getPausedPartitionSize()).isEqualTo(0); // should be polling initially
        getKcu().produceMessages(topic, INITIAL_MESSAGE_COUNT, 5);
        AtomicInteger count = new AtomicInteger(0);
        startBlockedProcessing(count, messageProcessingLatch);
        awaitQueuedInShards(195); // wait for all processing threads to be blocked
        // polling should be paused with shards size being above buffer size and inflight being low
        assertStaysPausedThenResumesAfterRelease(count, messageProcessingLatch);
    }

    /**
     * Registers a user function that counts each invocation and then blocks until the latch is released.
     */
    private void startBlockedProcessing(AtomicInteger count, CountDownLatch releaseLatch) {
        pc.poll((context) -> {
            try {
                count.incrementAndGet();
                releaseLatch.await();
            } catch (InterruptedException e) {
                // restore the interrupt flag so the calling thread can still observe the interruption
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Waits until the number of work items queued in shards awaiting selection equals the expected value.
     */
    private void awaitQueuedInShards(long expected) {
        await().atMost(AWAIT_TIMEOUT).pollInterval(SHARD_POLL_INTERVAL)
                .until(() -> pc.getWm().getNumberOfWorkQueuedInShardsAwaitingSelection() == expected);
    }

    /**
     * Waits until the processor reports the expected number of paused partitions.
     */
    private void awaitPausedPartitions(int expected) {
        await().atMost(AWAIT_TIMEOUT).pollInterval(FAST_POLL_INTERVAL)
                .untilAsserted(() -> assertThat(pc.getPausedPartitionSize()).isEqualTo(expected));
    }

    /**
     * Verification sequence shared by both tests: polling becomes and stays paused while processing is blocked, all
     * initial records are processed once the latch is released, polling resumes, and newly produced records are
     * consumed afterwards.
     */
    private void assertStaysPausedThenResumesAfterRelease(AtomicInteger count, CountDownLatch releaseLatch) throws Exception {
        awaitPausedPartitions(1);
        // give it a second to make sure it does not get resumed again
        ThreadUtils.sleepQuietly(1000);
        assertThat(pc.getPausedPartitionSize()).isEqualTo(1); // polling should still be paused
        releaseLatch.countDown();
        // wait for all messages to complete and verify no messages are out for processing anymore
        await().atMost(AWAIT_TIMEOUT).pollInterval(FAST_POLL_INTERVAL).untilAsserted(() -> {
            assertThat(count.get()).isEqualTo(INITIAL_MESSAGE_COUNT);
            assertThat(pc.getWm().getNumberRecordsOutForProcessing()).isEqualTo(0);
        });
        awaitPausedPartitions(0); // should resume now that messages have been processed and inflight is 0
        getKcu().produceMessages(topic, FOLLOW_UP_MESSAGE_COUNT); // send more - just to make sure PC is actually polling for msgs
        await().atMost(AWAIT_TIMEOUT).pollInterval(FAST_POLL_INTERVAL)
                .untilAsserted(() -> assertThat(count.get()).isEqualTo(INITIAL_MESSAGE_COUNT + FOLLOW_UP_MESSAGE_COUNT)); // should process the new messages
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,17 +271,29 @@ public List<NewTopic> createTopics(int numTopics) {
}

public List<String> produceMessages(String topicName, long numberToSend) throws InterruptedException, ExecutionException {
// Delegates to the three-argument form with an empty prefix.
return produceMessages(topicName, numberToSend, "");
// Delegates to the four-argument form with an empty prefix and numberToSend as the number of unique keys.
return produceMessages(topicName, numberToSend, "", numberToSend);
}

/**
 * Convenience overload: forwards to the four-argument {@code produceMessages} with an empty prefix and the given
 * number of unique keys.
 */
public List<String> produceMessages(String topicName, long numberToSend, long numberOfUniqueKeys) throws InterruptedException, ExecutionException {
    final String noPrefix = "";
    return produceMessages(topicName, numberToSend, noPrefix, numberOfUniqueKeys);
}

/**
 * Convenience overload: forwards to the four-argument {@code produceMessages}, passing {@code numberToSend} as the
 * number of unique keys.
 */
public List<String> produceMessages(String topicName, long numberToSend, String prefix) throws InterruptedException, ExecutionException {
    final long uniqueKeyCount = numberToSend;
    return produceMessages(topicName, numberToSend, prefix, uniqueKeyCount);
}

public List<String> produceMessages(String topicName, long numberToSend, String prefix, long numberOfUniqueKeys) throws InterruptedException, ExecutionException {
log.info("Producing {} messages to {}", numberToSend, topicName);
final List<String> expectedKeys = new ArrayList<>();
List<Future<RecordMetadata>> sends = new ArrayList<>();
try (Producer<String, String> kafkaProducer = createNewProducer(false)) {

var mu = new ModelUtils(new PCModuleTestEnv());
List<ProducerRecord<String, String>> recs = mu.createProducerRecords(topicName, numberToSend, prefix);
List<ProducerRecord<String, String>> recs = new ArrayList<>();
while (recs.size() < numberToSend) { //generate records in blocks of numberOfUniqueKeys
recs.addAll(mu.createProducerRecords(topicName, numberOfUniqueKeys, prefix));
}
recs = recs.subList(0, (int) numberToSend); //trim back to requested number to send

for (var record : recs) {
Future<RecordMetadata> send = kafkaProducer.send(record, (meta, exception) -> {
Expand Down

0 comments on commit b24b5cd

Please sign in to comment.