diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc
index 367ff2ca0..97d120ef4 100644
--- a/CHANGELOG.adoc
+++ b/CHANGELOG.adoc
@@ -17,9 +17,13 @@ endif::[]
 
 === Fixes
 
+* fix: include inflight message count in polling backpressure logic (#836)
 * fix: message loss on closing or partitions revoked (#827) fixes (#826)
 * fix: unbounded retry queue growth preventing polling from being throttled and leading to OOM (#834) fixes (#832, #817)
 
+=== Note
+#836 changes how the buffer size is calculated: in-flight messages are now counted as part of the buffer size, so the behaviour of existing applications may change and the consumer may be paused sooner.
+
 == 0.5.3.1
 
 === Fixes
diff --git a/README.adoc b/README.adoc
index 37ec38cb8..d8d542cc1 100644
--- a/README.adoc
+++ b/README.adoc
@@ -1537,9 +1537,13 @@ endif::[]
 
 === Fixes
 
+* fix: include inflight message count in polling backpressure logic (#836)
 * fix: message loss on closing or partitions revoked (#827) fixes (#826)
 * fix: unbounded retry queue growth preventing polling from being throttled and leading to OOM (#834) fixes (#832, #817)
 
+=== Note
+#836 changes how the buffer size is calculated: in-flight messages are now counted as part of the buffer size, so the behaviour of existing applications may change and the consumer may be paused sooner.
+
 == 0.5.3.1
 
 === Fixes
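To illustrate the note above (this sketch is not part of the diff): after #836, backpressure compares queued plus in-flight records against the buffer target, so in-flight records now consume buffer headroom that they previously did not. The option names below are taken from the new integration test in this PR; the concrete values are illustrative assumptions only.

[source,java]
----
// Hedged sketch, assuming an existing Kafka `consumer` and the static import of
// ParallelConsumerOptions.ProcessingOrder.KEY (both as used in the test added below).
ParallelConsumerOptions<String, String> opts = ParallelConsumerOptions.<String, String>builder()
        .consumer(consumer)
        .ordering(KEY)
        .maxConcurrency(10)     // up to 10 records processed in parallel
        .messageBufferSize(150) // after #836, in-flight records count against this headroom too
        .build();
----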
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
index cdad98300..2918717eb 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
@@ -184,8 +184,8 @@ public static ControllerEventMessage of(WorkContainer work) {
     private final AtomicBoolean currentlyPollingWorkCompleteMailBox = new AtomicBoolean();
 
     /**
-     * Indicates state of waiting while in-flight messages complete processing on shutdown.
-     * Used to prevent control thread interrupt due to wakeup logic on rebalances
+     * Indicates state of waiting while in-flight messages complete processing on shutdown. Used to prevent control
+     * thread interrupt due to wakeup logic on rebalances
      */
     private final AtomicBoolean awaitingInflightProcessingCompletionOnShutdown = new AtomicBoolean();
 
@@ -508,7 +508,7 @@ private static Optional<Boolean> getAutoCommitEnabled(final org.apache.kafka.cli
         if (consumer instanceof MockConsumer) {
             log.debug("Detected MockConsumer class which doesn't do auto commits");
             return Optional.of(false);
-        } else if (!(consumer instanceof KafkaConsumer)) {
+        } else if (!(consumer instanceof KafkaConsumer)) {
             log.warn("Consumer is neither a KafkaConsumer nor a MockConsumer - cannot check auto commit is disabled for consumer type: {}", consumer.getClass());
             return Optional.of(false); // Probably Mockito
         }
@@ -606,6 +606,16 @@ public void close(DrainingMode drainMode) {
         log.info("Close complete.");
     }
 
+    /**
+     * Returns a cached view of the number of paused partitions. Useful for testing, and for monitoring from wrapping
+     * application / user code.
+     *
+     * @return number of paused partitions
+     */
+    public int getPausedPartitionSize() {
+        return brokerPollSubsystem.getPausedPartitionSize();
+    }
+
     private void waitForClose(Duration timeout) throws TimeoutException, ExecutionException {
         log.info("Waiting on closed state...");
         while (!state.equals(CLOSED)) {
@@ -909,9 +919,13 @@ protected <R> void controlLoop(Function<PollContextInternal<K, V>, List<R>> user
     private void maybeWakeupPoller() {
         if (state == RUNNING) {
             if (!wm.isSufficientlyLoaded() && brokerPollSubsystem.isPausedForThrottling()) {
-                log.debug("Found Poller paused with not enough front loaded messages, ensuring poller is awake (mail: {} vs target: {})",
-                        wm.getNumberOfWorkQueuedInShardsAwaitingSelection(),
-                        options.getTargetAmountOfRecordsInFlight());
+                if (log.isDebugEnabled()) {
+                    long inShards = wm.getNumberOfWorkQueuedInShardsAwaitingSelection();
+                    long outForProcessing = wm.getNumberRecordsOutForProcessing();
+                    log.debug("Found Poller paused with not enough front loaded messages, ensuring poller is awake (in buffers: {} vs target: {}), in shards: {}, outForProcessing: {}",
+                            inShards + outForProcessing,
+                            options.getTargetAmountOfRecordsInFlight(), inShards, outForProcessing);
+                }
                 brokerPollSubsystem.wakeupIfPaused();
             }
         }
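The new public accessor makes the pause state observable from outside the processor. A minimal monitoring sketch, assuming a running processor `pc` and an SLF4J `log` (the threshold check and message are illustrative, not part of this PR):

[source,java]
----
// Hedged sketch: surface the new pause metric from wrapping application / user code.
int paused = pc.getPausedPartitionSize();
if (paused > 0) {
    // the broker poll system is throttling: queued + in-flight records exceed the buffer target
    log.info("Polling paused for throttling on {} partition(s)", paused);
}
----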
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java
index b5b828b1e..da4746c0e 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java
@@ -399,4 +399,14 @@ public void resumePollingAndWorkRegistrationIfPaused() {
             log.info("Skipping transition of broker poll system to state running. Current state is {}.", this.runState);
         }
     }
+
+    /**
+     * Returns a cached view of the number of paused partitions. Useful for testing, and for monitoring from wrapping
+     * application / user code.
+     *
+     * @return number of paused partitions
+     */
+    public int getPausedPartitionSize() {
+        return consumerManager.getPausedPartitionSize();
+    }
 }
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java
index 7babea9e4..1e0ad3932 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java
@@ -232,7 +232,7 @@ public boolean shouldThrottle() {
      * should be downloaded (or pipelined in the Consumer)
      */
     public boolean isSufficientlyLoaded() {
-        return getNumberOfWorkQueuedInShardsAwaitingSelection() > (long) options.getTargetAmountOfRecordsInFlight() * getLoadingFactor();
+        return (getNumberOfWorkQueuedInShardsAwaitingSelection() + getNumberRecordsOutForProcessing()) > (long) options.getTargetAmountOfRecordsInFlight() * getLoadingFactor();
     }
 
     private int getLoadingFactor() {
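The behavioural core of #836 is this one-line change to the `isSufficientlyLoaded()` predicate. A self-contained restatement of the new check (the method and parameter names here are illustrative; the logic mirrors the replaced line above):

[source,java]
----
// Mirrors WorkManager#isSufficientlyLoaded after #836: the poller stays paused while
// (queued + in-flight) exceeds target * loadingFactor, rather than counting queued work alone.
static boolean isSufficientlyLoaded(long queuedInShards, long outForProcessing,
                                    int targetRecordsInFlight, int loadingFactor) {
    return (queuedInShards + outForProcessing) > (long) targetRecordsInFlight * loadingFactor;
}
----

This is why a slow consumer with an empty shard queue can now keep polling paused: its in-flight records alone can exceed the target, as the first test below demonstrates.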
diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/BrokerPollerBackpressureTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/BrokerPollerBackpressureTest.java
new file mode 100644
index 000000000..982fc015c
--- /dev/null
+++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/BrokerPollerBackpressureTest.java
@@ -0,0 +1,110 @@
+
+/*-
+ * Copyright (C) 2020-2024 Confluent, Inc.
+ */
+package io.confluent.parallelconsumer.integrationTests;
+
+import io.confluent.csid.utils.ThreadUtils;
+import io.confluent.parallelconsumer.ParallelConsumerOptions;
+import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
+import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import pl.tlinkowski.unij.api.UniSets;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+@Slf4j
+public class BrokerPollerBackpressureTest extends BrokerIntegrationTest<String, String> {
+
+    Consumer<String, String> consumer;
+
+    ParallelConsumerOptions<String, String> pcOpts;
+    ParallelEoSStreamProcessor<String, String> pc;
+
+    @BeforeEach
+    void setUp() {
+        setupTopic();
+        consumer = getKcu().createNewConsumer(KafkaClientUtils.GroupOption.NEW_GROUP);
+
+        pcOpts = ParallelConsumerOptions.<String, String>builder()
+                .consumer(consumer)
+                .ordering(KEY)
+                .maxConcurrency(10)
+                .messageBufferSize(150)
+                .build();
+
+        pc = new ParallelEoSStreamProcessor<>(pcOpts);
+
+        pc.subscribe(UniSets.of(topic));
+    }
+
+    @Test
+    @SneakyThrows
+    void brokerPollPausedWithEmptyShardsButHighInFlight() {
+        var messageProcessingLatch = new CountDownLatch(1);
+        assertThat(pc.getPausedPartitionSize()).isEqualTo(0); // should be polling initially
+        getKcu().produceMessages(topic, 200);
+        AtomicInteger count = new AtomicInteger(0);
+        pc.poll((context) -> {
+            try {
+                count.incrementAndGet();
+                messageProcessingLatch.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(200)).until(() -> pc.getWm().getNumberOfWorkQueuedInShardsAwaitingSelection() == 0); // wait until every record has been taken out of the shards and is in flight
+        await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(20)).untilAsserted(() -> assertThat(pc.getPausedPartitionSize()).isEqualTo(1)); // polling should be paused even though the shard queue is empty
+        // give it a second to make sure polling does not get resumed again
+        ThreadUtils.sleepQuietly(1000);
+        assertThat(pc.getPausedPartitionSize()).isEqualTo(1); // polling should still be paused
+        messageProcessingLatch.countDown();
+        await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(20)).untilAsserted(() -> {
+            assertThat(count.get()).isEqualTo(200);
+            assertThat(pc.getWm().getNumberRecordsOutForProcessing()).isEqualTo(0);
+        }); // wait for all messages to complete, and verify no messages are out for processing anymore
+        await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(20)).untilAsserted(() -> assertThat(pc.getPausedPartitionSize()).isEqualTo(0)); // should resume now that messages have been processed and the in-flight count is 0
+        getKcu().produceMessages(topic, 10); // send 10 more - just to make sure PC is actually polling for messages
+        await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(20)).untilAsserted(() -> assertThat(count.get()).isEqualTo(210)); // should process the 10 new messages
+    }
+
+    @Test
+    @SneakyThrows
+    void brokerPollPausedWithHighNumberInShardsButLowInFlight() {
+        var messageProcessingLatch = new CountDownLatch(1);
+        assertThat(pc.getPausedPartitionSize()).isEqualTo(0); // should be polling initially
+        getKcu().produceMessages(topic, 200, 5);
+        AtomicInteger count = new AtomicInteger(0);
+        pc.poll((context) -> {
+            try {
+                count.incrementAndGet();
+                messageProcessingLatch.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(200)).until(() -> pc.getWm().getNumberOfWorkQueuedInShardsAwaitingSelection() == 195); // with 5 unique keys and KEY ordering only 5 records can be in flight, so the other 195 stay queued in shards
+        await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(20)).untilAsserted(() -> assertThat(pc.getPausedPartitionSize()).isEqualTo(1)); // polling should be paused with the shard queue above the buffer size, even though the in-flight count is low
+        // give it a second to make sure polling does not get resumed again
+        ThreadUtils.sleepQuietly(1000);
+        assertThat(pc.getPausedPartitionSize()).isEqualTo(1); // polling should still be paused
+        messageProcessingLatch.countDown();
+        await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(20)).untilAsserted(() -> {
+            assertThat(count.get()).isEqualTo(200);
+            assertThat(pc.getWm().getNumberRecordsOutForProcessing()).isEqualTo(0);
+        }); // wait for all messages to complete, and verify no messages are out for processing anymore
+        await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(20)).untilAsserted(() -> assertThat(pc.getPausedPartitionSize()).isEqualTo(0)); // should resume now that messages have been processed and the in-flight count is 0
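Why the second test waits for exactly 195 records in the shards: it produces 200 records spread over 5 unique keys (via the new `produceMessages` overload in the diff that follows), and with KEY ordering at most one record per key can be in flight at a time. The arithmetic, spelled out (variable names are illustrative):

[source,java]
----
long totalRecords = 200;
long uniqueKeys = 5;                          // produceMessages(topic, 200, 5)
long maxInFlightWithKeyOrdering = uniqueKeys; // KEY ordering: one record per key at a time
long expectedQueuedInShards = totalRecords - maxInFlightWithKeyOrdering; // 195, as asserted above
----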
+        getKcu().produceMessages(topic, 10); // send 10 more - just to make sure PC is actually polling for messages
+        await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(20)).untilAsserted(() -> assertThat(count.get()).isEqualTo(210)); // should process the 10 new messages
+    }
+}
\ No newline at end of file
diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/utils/KafkaClientUtils.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/utils/KafkaClientUtils.java
index 3d476b0df..02b318d54 100644
--- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/utils/KafkaClientUtils.java
+++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/utils/KafkaClientUtils.java
@@ -271,17 +271,29 @@ public List<NewTopic> createTopics(int numTopics) {
     }
 
     public List<String> produceMessages(String topicName, long numberToSend) throws InterruptedException, ExecutionException {
-        return produceMessages(topicName, numberToSend, "");
+        return produceMessages(topicName, numberToSend, "", numberToSend);
+    }
+
+    public List<String> produceMessages(String topicName, long numberToSend, long numberOfUniqueKeys) throws InterruptedException, ExecutionException {
+        return produceMessages(topicName, numberToSend, "", numberOfUniqueKeys);
     }
 
     public List<String> produceMessages(String topicName, long numberToSend, String prefix) throws InterruptedException, ExecutionException {
+        return produceMessages(topicName, numberToSend, prefix, numberToSend);
+    }
+
+    public List<String> produceMessages(String topicName, long numberToSend, String prefix, long numberOfUniqueKeys) throws InterruptedException, ExecutionException {
         log.info("Producing {} messages to {}", numberToSend, topicName);
         final List<String> expectedKeys = new ArrayList<>();
         List<Future<RecordMetadata>> sends = new ArrayList<>();
         try (Producer<String, String> kafkaProducer = createNewProducer(false)) {
 
             var mu = new ModelUtils(new PCModuleTestEnv());
-            List<ProducerRecord<String, String>> recs = mu.createProducerRecords(topicName, numberToSend, prefix);
+            List<ProducerRecord<String, String>> recs = new ArrayList<>();
+            while (recs.size() < numberToSend) { // generate records in blocks of numberOfUniqueKeys
+                recs.addAll(mu.createProducerRecords(topicName, numberOfUniqueKeys, prefix));
+            }
+            recs = recs.subList(0, (int) numberToSend); // trim back to the requested number to send
 
             for (var record : recs) {
                 Future<RecordMetadata> send = kafkaProducer.send(record, (meta, exception) -> {