Skip to content

Commit

Permalink
fixed long rebalancing for AckMode.EXACTLY_ONCE (#371)
Browse files Browse the repository at this point in the history
Don't increment uncommitted messages in CommitBatch for AckMode.ExactlyOnce.
Increasing the number of uncommitted messages in CommitBatch causes an infinite loop during rebalancing because they never decrease.

Fixes #371.
  • Loading branch information
Alexander committed Jan 21, 2024
1 parent 9b09225 commit b25fe21
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -172,7 +172,11 @@ private void onPartitionsRevoked(Collection<TopicPartition> partitions) {
long end = maxDelayRebalance + System.currentTimeMillis();
do {
try {
log.debug("Rebalancing; waiting for {} records in pipeline", inPipeline);
log.debug(
"Rebalancing; waiting for {} records in pipeline or awaitingTransaction: {}",
inPipeline,
this.awaitingTransaction.get()
);
Thread.sleep(interval);
commitEvent.runIfRequired(true);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -374,7 +378,10 @@ public void run() {
}

if (!records.isEmpty()) {
this.commitBatch.addUncommitted(records);
// Handled separately using transactional KafkaSender
if (ackMode != AckMode.EXACTLY_ONCE) {
this.commitBatch.addUncommitted(records);
}
r = Operators.produced(REQUESTED, ConsumerEventLoop.this, 1);
log.debug("Emitting {} records, requested now {}", records.count(), r);
sink.emitNext(records, ConsumerEventLoop.this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import reactor.core.publisher.Sinks.Many;
Expand All @@ -39,6 +38,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -73,16 +73,13 @@ public void deferredCommitsWithRevoke() throws InterruptedException {
t.printStackTrace();
return null;
}).given(sink).emitError(any(), any());
ConsumerEventLoop loop = new ConsumerEventLoop<>(AckMode.MANUAL_ACK, null, opts,
scheduler, consumer, t -> false, sink, new AtomicBoolean());
Set<String> topics = new HashSet<>();
topics.add("test");
Collection<TopicPartition> partitions = new ArrayList<>();
TopicPartition tp = new TopicPartition("test", 0);
partitions.add(tp);
Map<TopicPartition, List<ConsumerRecord>> record = new HashMap<>();
record.put(tp, Collections.singletonList(
new ConsumerRecord("test", 0, 0, 0, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, null)));
record.put(tp, Collections.singletonList(new ConsumerRecord("test", 0, 0, null, null)));
ConsumerRecords records = new ConsumerRecords(record);
CountDownLatch latch = new CountDownLatch(2);
AtomicBoolean paused = new AtomicBoolean();
Expand All @@ -102,6 +99,8 @@ public void deferredCommitsWithRevoke() throws InterruptedException {
paused.set(false);
return null;
}).given(consumer).resume(any());
ConsumerEventLoop loop = new ConsumerEventLoop<>(AckMode.MANUAL_ACK, null, opts,
scheduler, consumer, t -> false, sink, new AtomicBoolean());
loop.onRequest(1);
loop.onRequest(1);
CommittableBatch batch = loop.commitEvent.commitBatch;
Expand All @@ -115,4 +114,69 @@ public void deferredCommitsWithRevoke() throws InterruptedException {
assertThat(batch.deferred).hasSize(0);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void revokePartitionsForExactlyOnce() throws InterruptedException {
AtomicBoolean isPartitionRevokeFinished = new AtomicBoolean();
ReceiverOptions opts = ReceiverOptions.create(
Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "deferredCommitsWithRevoke"))
.maxDelayRebalance(Duration.ofSeconds(30))
.addRevokeListener(p -> isPartitionRevokeFinished.set(true))
.subscription(Collections.singletonList("test"));
Consumer consumer = mock(Consumer.class);
Scheduler scheduler = KafkaSchedulers.newEvent(opts.groupId());
Many sink = mock(Many.class);
willAnswer(inv -> {
Throwable t = inv.getArgument(0);
t.printStackTrace();
return null;
}).given(sink).emitError(any(), any());
Set<String> topics = new HashSet<>();
topics.add("test");
Collection<TopicPartition> partitions = new ArrayList<>();
TopicPartition tp = new TopicPartition("test", 0);
partitions.add(tp);
Map<TopicPartition, List<ConsumerRecord>> record = new HashMap<>();
record.put(tp, Collections.singletonList(new ConsumerRecord("test", 0, 0, null, null)));
ConsumerRecords records = new ConsumerRecords(record);
CountDownLatch latch = new CountDownLatch(2);
AtomicBoolean paused = new AtomicBoolean();
willAnswer(inv -> {
Thread.sleep(10);
latch.countDown();
if (paused.get()) {
return ConsumerRecords.empty();
}
return records;
}).given(consumer).poll(any());
willAnswer(inv -> {
paused.set(true);
return null;
}).given(consumer).pause(any());
willAnswer(inv -> {
paused.set(false);
return null;
}).given(consumer).resume(any());

ConsumerEventLoop loop = new ConsumerEventLoop<>(AckMode.EXACTLY_ONCE, null, opts,
scheduler, consumer, t -> false, sink, new AtomicBoolean());

loop.onRequest(1);
loop.onRequest(1);

CommittableBatch batch = loop.commitEvent.commitBatch;
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(batch.uncommitted).hasSize(0);

loop.awaitingTransaction.set(true);
ArgumentCaptor<ConsumerRebalanceListener> rebal = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
verify(consumer).subscribe(any(Collection.class), rebal.capture());

Executors.newSingleThreadExecutor().execute(() -> rebal.getValue().onPartitionsRevoked(partitions));
await().pollDelay(Duration.ofSeconds(5)).until(() -> true);
assertThat(isPartitionRevokeFinished.get()).isFalse();
loop.awaitingTransaction.set(false);
await().atMost(Duration.ofSeconds(10)).until(isPartitionRevokeFinished::get);
}

}

0 comments on commit b25fe21

Please sign in to comment.