Skip to content

Commit

Permalink
[cleanup][broker] Follow up on apache#16968 to restore some behavior …
Browse files Browse the repository at this point in the history
…in PersistentDispatcherMultipleConsumers class (apache#17018)

(cherry picked from commit abff91f)
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
mattisonchao authored and nodece committed Sep 10, 2024
1 parent 433b22b commit 6176720
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected volatile PositionImpl minReplayedPosition = null;
protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;
protected volatile boolean sendInProgress;
protected boolean sendInProgress;
protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
TOTAL_AVAILABLE_PERMITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
Expand Down Expand Up @@ -255,8 +255,8 @@ public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfM
* We should not call readMoreEntries() recursively in the same thread as there is a risk of StackOverflowError.
*
*/
public void readMoreEntiresAsync() {
topic.getBrokerService().executor().execute(() -> readMoreEntries());
public void readMoreEntriesAsync() {
topic.getBrokerService().executor().execute(this::readMoreEntries);
}

public synchronized void readMoreEntries() {
Expand Down Expand Up @@ -303,7 +303,7 @@ public synchronized void readMoreEntries() {
// next entries as readCompletedEntries-callback was never called
if ((messagesToReplayFiltered.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
readMoreEntiresAsync();
readMoreEntriesAsync();
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
Expand Down Expand Up @@ -892,7 +892,7 @@ public void addUnAckedMessages(int numberOfMessages) {
if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE
&& BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", name);
readMoreEntiresAsync();
readMoreEntriesAsync();
}

int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);
Expand All @@ -915,7 +915,7 @@ public void addUnAckedMessages(int numberOfMessages) {
// unblock dispatcher if it acks back enough messages
if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked", name);
topic.getBrokerService().executor().execute(() -> readMoreEntries());
readMoreEntriesAsync();
}
}
// increment broker-level count
Expand Down

0 comments on commit 6176720

Please sign in to comment.