Skip to content

Commit

Permalink
Use Phaser instead of CountDownLatch
Browse files Browse the repository at this point in the history
  • Loading branch information
汪苏诚 committed Mar 26, 2024
1 parent 20ea61a commit c18e634
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
Expand Down Expand Up @@ -52,24 +53,23 @@ public void testAcknowledgeIndividualConcurrently() throws InterruptedException
BitSet bitSet = new BitSet(batchSize);
bitSet.set(0, batchSize);
AtomicInteger individualAcked = new AtomicInteger();
CountDownLatch startLatch = new CountDownLatch(1);
Phaser phaser = new Phaser(1);
CountDownLatch finishLatch = new CountDownLatch(batchSize);
for (int batchIndex = 0; batchIndex < batchSize; batchIndex++) {
phaser.register();
BatchMessageIdImpl messageId = new BatchMessageIdImpl(1, 0, 0, batchIndex, batchSize, bitSet);
executorService.execute(() -> {
try {
startLatch.await();
phaser.arriveAndAwaitAdvance();
if (MessageIdAdvUtils.acknowledge(messageId, true)) {
individualAcked.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
finishLatch.countDown();
}
});
}
startLatch.countDown();
phaser.arriveAndDeregister();
finishLatch.await();
assertEquals(individualAcked.get(), 1);
}
Expand Down

0 comments on commit c18e634

Please sign in to comment.