From fc9b34cb4612cdada0dbccc5a9fa16544486de9f Mon Sep 17 00:00:00 2001 From: sschepens Date: Thu, 9 Feb 2017 22:49:22 -0300 Subject: [PATCH] Replace usages of Atomic* for Atomic*FieldUpdater (#195) Replaced some usages of AtomicInteger, AtomicLong and AtomicReference for their Atomic*FieldUpdater counterpart on classes which get instantiated a lot, this should reduce memory usage for the broker and clients which use lots of topics and consumers. --- .../mledger/impl/ManagedCursorImpl.java | 114 ++++++++++-------- .../mledger/impl/ManagedLedgerImpl.java | 93 +++++++------- .../bookkeeper/mledger/impl/OpAddEntry.java | 6 +- .../pulsar/broker/namespace/OwnedBundle.java | 19 +-- .../yahoo/pulsar/broker/service/Consumer.java | 48 +++++--- ...sistentDispatcherSingleActiveConsumer.java | 31 ++--- .../persistent/PersistentReplicator.java | 65 +++++----- .../persistent/PersistentSubscription.java | 33 ++--- .../service/persistent/PersistentTopic.java | 31 ++--- .../impl/BrokerClientIntegrationTest.java | 90 +++++++++----- .../pulsar/client/impl/MessageIdTest.java | 46 ++++--- .../org/testng/listener/TestListener.java | 2 +- .../pulsar/client/impl/ConsumerBase.java | 6 +- .../pulsar/client/impl/ConsumerImpl.java | 56 ++++----- .../yahoo/pulsar/client/impl/HandlerBase.java | 56 ++++++--- .../client/impl/PartitionedConsumerImpl.java | 24 ++-- .../client/impl/PartitionedProducerImpl.java | 14 +-- .../pulsar/client/impl/ProducerBase.java | 1 - .../pulsar/client/impl/ProducerImpl.java | 30 ++--- .../pulsar/client/impl/PulsarClientImpl.java | 2 +- .../RoundRobinPartitionMessageRouterImpl.java | 9 +- .../GrowableArrayBlockingQueue.java | 37 +++--- 22 files changed, 466 insertions(+), 347 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index b9d014a784afd..799bf1251bf8a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -24,12 +24,9 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.*; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; @@ -82,9 +79,18 @@ public class ManagedCursorImpl implements ManagedCursor { private volatile PositionImpl markDeletePosition; private volatile PositionImpl readPosition; - protected final AtomicReference waitingReadOp = new AtomicReference(); - protected AtomicBoolean resetCursorInProgress = new AtomicBoolean(false); - private final AtomicInteger pendingReadOps = new AtomicInteger(0); + protected static final AtomicReferenceFieldUpdater WAITING_READ_OP_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOp"); + private volatile OpReadEntry waitingReadOp = null; + + private static final int FALSE = 0; + private static final int TRUE = 1; + private static final AtomicIntegerFieldUpdater RESET_CURSOR_IN_PROGRESS_UPDATER = AtomicIntegerFieldUpdater + .newUpdater(ManagedCursorImpl.class, "resetCursorInProgress"); + private volatile int resetCursorInProgress = FALSE; + private static final AtomicIntegerFieldUpdater PENDING_READ_OPS_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingReadOps"); + private volatile int pendingReadOps = 0; // This counters are used to compute the numberOfEntries and numberOfEntriesInBacklog values, without having to look // at the list of ledgers in the ml. They are initialized to (-backlog) at opening, and will be incremented each @@ -119,7 +125,9 @@ public PendingMarkDeleteEntry(PositionImpl newPosition, MarkDeleteCallback callb } private final ArrayDeque pendingMarkDeleteOps = new ArrayDeque(); - private final AtomicInteger pendingMarkDeletedSubmittedCount = new AtomicInteger(); + private static final AtomicIntegerFieldUpdater PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount"); + private volatile int pendingMarkDeletedSubmittedCount = 0; private long lastLedgerSwitchTimestamp; enum State { @@ -130,7 +138,9 @@ enum State { Closed // The managed cursor has been closed }; - private final AtomicReference state = new AtomicReference(); + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, State.class, "state"); + private volatile State state = null; public interface VoidCallback { public void operationComplete(); @@ -143,7 +153,11 @@ public interface VoidCallback { this.config = config; this.ledger = ledger; this.name = cursorName; - this.state.set(State.Uninitialized); + STATE_UPDATER.set(this, State.Uninitialized); + PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0); + PENDING_READ_OPS_UPDATER.set(this, 0); + RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE); + WAITING_READ_OP_UPDATER.set(this, null); this.lastLedgerSwitchTimestamp = System.currentTimeMillis(); if (config.getThrottleMarkDelete() > 0.0) { @@ -260,7 +274,7 @@ private void recoveredCursor(PositionImpl position) { markDeletePosition = position; readPosition = ledger.getNextValidPosition(position); - state.set(State.NoLedger); + STATE_UPDATER.set(this, State.NoLedger); } void initialize(PositionImpl position, final VoidCallback callback) { @@ -273,7 +287,7 @@ void initialize(PositionImpl position, final VoidCallback callback) { createNewMetadataLedger(new VoidCallback() { @Override public void operationComplete() { - state.set(State.Open); + STATE_UPDATER.set(ManagedCursorImpl.this, State.Open); callback.operationComplete(); } @@ -323,12 +337,12 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { public void asyncReadEntries(final int numberOfEntriesToRead, final ReadEntriesCallback callback, final Object ctx) { checkArgument(numberOfEntriesToRead > 0); - if (state.get() == State.Closed) { + if (STATE_UPDATER.get(this) == State.Closed) { callback.readEntriesFailed(new ManagedLedgerException("Cursor was already closed"), ctx); return; } - pendingReadOps.incrementAndGet(); + PENDING_READ_OPS_UPDATER.incrementAndGet(this); OpReadEntry op = OpReadEntry.create(this, PositionImpl.get(readPosition), numberOfEntriesToRead, callback, ctx); ledger.asyncReadEntries(op); } @@ -372,7 +386,7 @@ public void readEntryComplete(Entry entry, Object ctx) { public void asyncGetNthEntry(int N, IndividualDeletedEntries deletedEntries, ReadEntryCallback callback, Object ctx) { checkArgument(N > 0); - if (state.get() == State.Closed) { + if (STATE_UPDATER.get(this) == State.Closed) { callback.readEntryFailed(new ManagedLedgerException("Cursor was already closed"), ctx); return; } @@ -436,7 +450,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { @Override public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx) { checkArgument(numberOfEntriesToRead > 0); - if (state.get() == State.Closed) { + if (STATE_UPDATER.get(this) == State.Closed) { callback.readEntriesFailed(new ManagedLedgerException("Cursor was already closed"), ctx); return; } @@ -451,7 +465,7 @@ public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallbac OpReadEntry op = OpReadEntry.create(this, PositionImpl.get(readPosition), numberOfEntriesToRead, callback, ctx); - if (!waitingReadOp.compareAndSet(null, op)) { + if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) { callback.readEntriesFailed(new ManagedLedgerException("We can only have a single waiting callback"), ctx); return; @@ -488,12 +502,12 @@ public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallbac log.debug("[{}] [{}] Found more entries", ledger.getName(), name); } // Try to cancel the notification request - if (waitingReadOp.compareAndSet(op, null)) { + if (WAITING_READ_OP_UPDATER.compareAndSet(this, op, null)) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Cancelled notification and scheduled read at {}", ledger.getName(), name, op.readPosition); } - pendingReadOps.incrementAndGet(); + PENDING_READ_OPS_UPDATER.incrementAndGet(this); ledger.asyncReadEntries(op); } else { if (log.isDebugEnabled()) { @@ -510,11 +524,11 @@ public boolean cancelPendingReadRequest() { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Cancel pending read request", ledger.getName(), name); } - return waitingReadOp.getAndSet(null) != null; + return WAITING_READ_OP_UPDATER.getAndSet(this, null) != null; } public boolean hasPendingReadRequest() { - return waitingReadOp.get() != null; + return WAITING_READ_OP_UPDATER.get(this) != null; } @Override @@ -549,10 +563,10 @@ public long getNumberOfEntriesSinceFirstNotAckedMessage() { public long getNumberOfEntriesInBacklog() { if (log.isDebugEnabled()) { log.debug("[{}] Consumer {} cursor ml-entries: {} -- deleted-counter: {} other counters: mdPos {} rdPos {}", - ledger.getName(), name, ledger.entriesAddedCounter.get(), messagesConsumedCounter, + ledger.getName(), name, ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger), messagesConsumedCounter, markDeletePosition, readPosition); } - long backlog = ledger.entriesAddedCounter.get() - messagesConsumedCounter; + long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter; if (backlog < 0) { // In some case the counters get incorrect values, fall back to the precise backlog count backlog = getNumberOfEntries(Range.closed(markDeletePosition, ledger.getLastPosition())); @@ -648,7 +662,7 @@ protected void internalResetCursor(final PositionImpl newPosition, log.info("[{}] Initiate reset position to {} on cursor {}", ledger.getName(), newPosition, name); synchronized (pendingMarkDeleteOps) { - if (!resetCursorInProgress.compareAndSet(false, true)) { + if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, TRUE)) { log.error("[{}] reset requested - position [{}], previous reset in progress - cursor {}", ledger.getName(), newPosition, name); resetCursorCallback.resetFailed( @@ -692,7 +706,7 @@ public void operationComplete() { } synchronized (pendingMarkDeleteOps) { pendingMarkDeleteOps.clear(); - if (!resetCursorInProgress.compareAndSet(true, false)) { + if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) { log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}", ledger.getName(), newPosition, name); } @@ -704,7 +718,7 @@ public void operationComplete() { @Override public void operationFailed(ManagedLedgerException exception) { synchronized (pendingMarkDeleteOps) { - if (!resetCursorInProgress.compareAndSet(true, false)) { + if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) { log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}", ledger.getName(), newPosition, name); } @@ -1169,12 +1183,12 @@ public void asyncMarkDelete(final Position position, final MarkDeleteCallback ca checkNotNull(position); checkArgument(position instanceof PositionImpl); - if (state.get() == State.Closed) { + if (STATE_UPDATER.get(this) == State.Closed) { callback.markDeleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx); return; } - if (resetCursorInProgress.get()) { + if (RESET_CURSOR_IN_PROGRESS_UPDATER.get(this) == TRUE) { if (log.isDebugEnabled()) { log.debug("[{}] cursor reset in progress - ignoring mark delete on position [{}] for cursor [{}]", ledger.getName(), (PositionImpl) position, name); @@ -1214,7 +1228,7 @@ private void internalAsyncMarkDelete(final PositionImpl newPosition, final MarkD // We cannot write to the ledger during the switch, need to wait until the new metadata ledger is available synchronized (pendingMarkDeleteOps) { // The state might have changed while we were waiting on the queue mutex - switch (state.get()) { + switch (STATE_UPDATER.get(this)) { case Closed: callback.markDeleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx); return; @@ -1228,7 +1242,7 @@ private void internalAsyncMarkDelete(final PositionImpl newPosition, final MarkD break; case Open: - if (pendingReadOps.get() > 0) { + if (PENDING_READ_OPS_UPDATER.get(this) > 0) { // Wait until no read operation are pending pendingMarkDeleteOps.add(mdEntry); } else { @@ -1249,7 +1263,7 @@ void internalMarkDelete(final PendingMarkDeleteEntry mdEntry) { // The counter is used to mark all the pending mark-delete request that were submitted to BK and that are not // yet finished. While we have outstanding requests we cannot close the current ledger, so the switch to new // ledger is postponed to when the counter goes to 0. - pendingMarkDeletedSubmittedCount.incrementAndGet(); + PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.incrementAndGet(this); persistPosition(cursorLedger, mdEntry.newPosition, new VoidCallback() { @Override @@ -1361,7 +1375,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callback, Object ctx) { checkArgument(pos instanceof PositionImpl); - if (state.get() == State.Closed) { + if (STATE_UPDATER.get(this) == State.Closed) { callback.deleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx); return; } @@ -1608,7 +1622,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { @Override public void asyncClose(final AsyncCallbacks.CloseCallback callback, final Object ctx) { - State oldState = state.getAndSet(State.Closed); + State oldState = STATE_UPDATER.getAndSet(this, State.Closed); if (oldState == State.Closed) { log.info("[{}] [{}] State is already closed", ledger.getName(), name); callback.closeComplete(ctx); @@ -1662,14 +1676,14 @@ void setReadPosition(Position newReadPositionInt) { void startCreatingNewMetadataLedger() { // Change the state so that new mark-delete ops will be queued and not immediately submitted - State oldState = state.getAndSet(State.SwitchingLedger); + State oldState = STATE_UPDATER.getAndSet(this, State.SwitchingLedger); if (oldState == State.SwitchingLedger) { // Ignore double request return; } // Check if we can immediately switch to a new metadata ledger - if (pendingMarkDeletedSubmittedCount.get() == 0) { + if (PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.get(this) == 0) { createNewMetadataLedger(); } } @@ -1683,7 +1697,7 @@ public void operationComplete() { flushPendingMarkDeletes(); // Resume normal mark-delete operations - state.set(State.Open); + STATE_UPDATER.set(ManagedCursorImpl.this, State.Open); } } @@ -1698,7 +1712,7 @@ public void operationFailed(ManagedLedgerException exception) { } // At this point we don't have a ledger ready - state.set(State.NoLedger); + STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger); } } }); @@ -1706,7 +1720,7 @@ public void operationFailed(ManagedLedgerException exception) { private void flushPendingMarkDeletes() { if (!pendingMarkDeleteOps.isEmpty()) { - if (resetCursorInProgress.get()) { + if (RESET_CURSOR_IN_PROGRESS_UPDATER.get(this) == TRUE) { failPendingMarkDeletes(); } else { internalFlushPendingMarkDeletes(); @@ -1807,7 +1821,7 @@ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) { position, lh.getId(), BKException.getMessage(rc)); // If we've had a write error, the ledger will be automatically closed, we need to create a new one, // in the meantime the mark-delete will be queued. - state.compareAndSet(State.Open, State.NoLedger); + STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger); callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc))); } } @@ -1818,7 +1832,7 @@ boolean shouldCloseLedger(LedgerHandle lh) { long now = System.currentTimeMillis(); if ((lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger() || lastLedgerSwitchTimestamp < (now - config.getLedgerRolloverTimeout() * 1000)) - && state.get() != State.Closed) { + && STATE_UPDATER.get(this) != State.Closed) { // It's safe to modify the timestamp since this method will be only called from a callback, implying that // calls will be serialized on one single thread lastLedgerSwitchTimestamp = now; @@ -1871,7 +1885,7 @@ void notifyEntriesAvailable() { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Received ml notification", ledger.getName(), name); } - OpReadEntry opReadEntry = waitingReadOp.getAndSet(null); + OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER.getAndSet(this, null); if (opReadEntry != null) { if (log.isDebugEnabled()) { @@ -1881,7 +1895,7 @@ void notifyEntriesAvailable() { ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition); } - pendingReadOps.incrementAndGet(); + PENDING_READ_OPS_UPDATER.incrementAndGet(this); opReadEntry.readPosition = (PositionImpl) getReadPosition(); ledger.asyncReadEntries(opReadEntry); } else { @@ -1910,8 +1924,8 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) { } void decrementPendingMarkDeleteCount() { - if (pendingMarkDeletedSubmittedCount.decrementAndGet() == 0) { - final State state = this.state.get(); + if (PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.decrementAndGet(this) == 0) { + final State state = STATE_UPDATER.get(this); if (state == State.SwitchingLedger) { // A metadata ledger switch was pending and now we can do it since we don't have any more // outstanding mark-delete requests @@ -1921,12 +1935,12 @@ void decrementPendingMarkDeleteCount() { } void readOperationCompleted() { - if (pendingReadOps.decrementAndGet() == 0) { + if (PENDING_READ_OPS_UPDATER.decrementAndGet(this) == 0) { synchronized (pendingMarkDeleteOps) { - if (state.get() == State.Open) { + if (STATE_UPDATER.get(this) == State.Open) { // Flush the pending writes only if the state is open. flushPendingMarkDeletes(); - } else if (pendingMarkDeletedSubmittedCount.get() != 0) { + } else if (PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.get(this) != 0) { log.info( "[{}] read operation completed and cursor was closed. need to call any queued cursor close", name); @@ -1959,7 +1973,7 @@ void asyncDeleteLedger(final LedgerHandle lh) { } void asyncDeleteCursorLedger() { - state.set(State.Closed); + STATE_UPDATER.set(this, State.Closed); if (cursorLedger == null) { // No ledger was created @@ -2016,7 +2030,7 @@ private PositionImpl getRollbackPosition(ManagedCursorInfo info) { // / Expose internal values for debugging purpose public int getPendingReadOpsCount() { - return pendingReadOps.get(); + return PENDING_READ_OPS_UPDATER.get(this); } public long getMessagesConsumedCounter() { @@ -2056,7 +2070,7 @@ public long getLastLedgerSwitchTimestamp() { } public String getState() { - return state.get().toString(); + return STATE_UPDATER.get(this).toString(); } private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index de82c403ff7a2..f456b03e2475b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -31,9 +31,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.*; import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; @@ -103,10 +101,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final ManagedCursorContainer activeCursors = new ManagedCursorContainer(); // Ever increasing counter of entries added - AtomicLong entriesAddedCounter = new AtomicLong(0); + static final AtomicLongFieldUpdater ENTRIES_ADDED_COUNTER_UPDATER = + AtomicLongFieldUpdater.newUpdater(ManagedLedgerImpl.class, "entriesAddedCounter"); + private volatile long entriesAddedCounter = 0; - AtomicLong numberOfEntries = new AtomicLong(0); - AtomicLong totalSize = new AtomicLong(0); + static final AtomicLongFieldUpdater NUMBER_OF_ENTRIES_UPDATER = + AtomicLongFieldUpdater.newUpdater(ManagedLedgerImpl.class, "numberOfEntries"); + private volatile long numberOfEntries = 0; + static final AtomicLongFieldUpdater TOTAL_SIZE_UPDATER = + AtomicLongFieldUpdater.newUpdater(ManagedLedgerImpl.class, "totalSize"); + private volatile long totalSize = 0; private RateLimiter updateCursorRateLimit; @@ -160,7 +164,9 @@ enum PositionBound { startIncluded, startExcluded } - private final AtomicReference state = new AtomicReference(); + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state"); + private volatile State state = null; private final ScheduledExecutorService scheduledExecutor; private final OrderedSafeExecutor executor; @@ -185,7 +191,10 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper this.name = name; this.scheduledExecutor = scheduledExecutor; this.executor = orderedExecutor; - this.state.set(State.None); + TOTAL_SIZE_UPDATER.set(this, 0); + NUMBER_OF_ENTRIES_UPDATER.set(this, 0); + ENTRIES_ADDED_COUNTER_UPDATER.set(this, 0); + STATE_UPDATER.set(this, State.None); this.ledgersVersion = null; this.mbean = new ManagedLedgerMBeanImpl(this); this.entryCache = factory.getEntryCacheManager().getEntryCache(this); @@ -263,8 +272,8 @@ private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedg while (iterator.hasNext()) { LedgerInfo li = iterator.next(); if (li.getEntries() > 0) { - numberOfEntries.addAndGet(li.getEntries()); - totalSize.addAndGet(li.getSize()); + NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, li.getEntries()); + TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize()); } else { iterator.remove(); bookKeeper.asyncDeleteLedger(li.getLedgerId(), (rc, ctx) -> { @@ -301,7 +310,7 @@ public void operationFailed(MetaStoreException e) { } log.info("[{}] Created ledger {}", name, lh.getId()); - state.set(State.LedgerOpened); + STATE_UPDATER.set(this, State.LedgerOpened); lastLedgerCreatedTimestamp = System.currentTimeMillis(); currentLedger = lh; lastConfirmedEntry = new PositionImpl(lh.getId(), -1); @@ -435,7 +444,7 @@ public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback if (log.isDebugEnabled()) { log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state); } - final State state = this.state.get(); + final State state = STATE_UPDATER.get(this); if (state == State.Fenced) { callback.addFailed(new ManagedLedgerFencedException(), ctx); return; @@ -466,7 +475,7 @@ public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback if (log.isDebugEnabled()) { log.debug("[{}] Creating a new ledger", name); } - if (this.state.compareAndSet(State.ClosedLedger, State.CreatingLedger)) { + if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) { this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), @@ -492,7 +501,7 @@ public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback } // This entry will be the last added to current ledger addOperation.setCloseWhenDone(true); - this.state.set(State.ClosingLedger); + STATE_UPDATER.set(this, State.ClosingLedger); } addOperation.initiate(); @@ -694,7 +703,7 @@ public boolean hasActiveCursors() { @Override public long getNumberOfEntries() { - return numberOfEntries.get(); + return NUMBER_OF_ENTRIES_UPDATER.get(this); } @Override @@ -713,7 +722,7 @@ public long getNumberOfActiveEntries() { @Override public long getTotalSize() { - return totalSize.get(); + return TOTAL_SIZE_UPDATER.get(this); } @Override @@ -864,7 +873,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { @Override public synchronized void asyncClose(final CloseCallback callback, final Object ctx) { - State state = this.state.get(); + State state = STATE_UPDATER.get(this); if (state == State.Fenced) { factory.close(this); callback.closeFailed(new ManagedLedgerFencedException(), ctx); @@ -880,7 +889,7 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c log.info("[{}] Closing managed ledger", name); factory.close(this); - this.state.set(State.Closed); + STATE_UPDATER.set(this, State.Closed); LedgerHandle lh = currentLedger; if (log.isDebugEnabled()) { @@ -932,7 +941,7 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct // Empty the list of pending requests and make all of them fail clearPendingAddEntries(status); lastLedgerCreationFailureTimestamp = System.currentTimeMillis(); - state.set(State.ClosedLedger); + STATE_UPDATER.set(this, State.ClosedLedger); } else { log.info("[{}] Created new ledger {}", name, lh.getId()); ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build()); @@ -962,7 +971,7 @@ public void operationFailed(MetaStoreException e) { log.error( "[{}] Failed to udpate ledger list. z-node version mismatch. Closing managed ledger", name); - state.set(State.Fenced); + STATE_UPDATER.set(ManagedLedgerImpl.this, State.Fenced); clearPendingAddEntries(e); return; } @@ -985,7 +994,7 @@ public void operationFailed(MetaStoreException e) { synchronized (ManagedLedgerImpl.this) { lastLedgerCreationFailureTimestamp = System.currentTimeMillis(); - state.set(State.ClosedLedger); + STATE_UPDATER.set(ManagedLedgerImpl.this, State.ClosedLedger); clearPendingAddEntries(e); } } @@ -1010,7 +1019,7 @@ private void updateLedgersListAfterRollover(MetaStoreCallback callback) { } public synchronized void updateLedgersIdsComplete(Version version) { - state.set(State.LedgerOpened); + STATE_UPDATER.set(this, State.LedgerOpened); lastLedgerCreatedTimestamp = System.currentTimeMillis(); if (log.isDebugEnabled()) { @@ -1028,7 +1037,7 @@ public synchronized void updateLedgersIdsComplete(Version version) { } if (currentLedgerIsFull()) { - state.set(State.ClosingLedger); + STATE_UPDATER.set(this, State.ClosingLedger); op.setCloseWhenDone(true); op.initiate(); if (log.isDebugEnabled()) { @@ -1046,9 +1055,9 @@ public synchronized void updateLedgersIdsComplete(Version version) { // Private helpers synchronized void ledgerClosed(final LedgerHandle lh) { - final State state = this.state.get(); + final State state = STATE_UPDATER.get(this); if (state == State.ClosingLedger || state == State.LedgerOpened) { - this.state.set(State.ClosedLedger); + STATE_UPDATER.set(this, State.ClosedLedger); } else { // In case we get multiple write errors for different outstanding write request, we should close the ledger // just once @@ -1080,7 +1089,7 @@ synchronized void ledgerClosed(final LedgerHandle lh) { if (log.isDebugEnabled()) { log.debug("[{}] Creating a new ledger", name); } - this.state.set(State.CreatingLedger); + STATE_UPDATER.set(this, State.CreatingLedger); this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), @@ -1097,7 +1106,7 @@ void clearPendingAddEntries(ManagedLedgerException e) { } void asyncReadEntries(OpReadEntry opReadEntry) { - final State state = this.state.get(); + final State state = STATE_UPDATER.get(this); if (state == State.Fenced || state == State.Closed) { opReadEntry.readEntriesFailed(new ManagedLedgerFencedException(), opReadEntry.ctx); return; @@ -1353,9 +1362,9 @@ void internalTrimConsumedLedgers() { synchronized (this) { if (log.isDebugEnabled()) { log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(), - totalSize.get()); + TOTAL_SIZE_UPDATER.get(this)); } - if (state.get() == State.Closed) { + if (STATE_UPDATER.get(this) == State.Closed) { log.debug("[{}] Ignoring trimming request since the managed ledger was already closed", name); trimmerMutex.unlock(); return; @@ -1384,7 +1393,7 @@ void internalTrimConsumedLedgers() { // skip ledger if retention constraint met for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) { boolean expired = hasLedgerRetentionExpired(ls.getTimestamp()); - boolean overRetentionQuota = totalSize.get() > ((long) config.getRetentionSizeInMB()) * 1024 * 1024; + boolean overRetentionQuota = TOTAL_SIZE_UPDATER.get(this) > ((long) config.getRetentionSizeInMB()) * 1024 * 1024; if (ls.getLedgerId() == currentLedger.getId() || (!expired && !overRetentionQuota)) { if (log.isDebugEnabled()) { if (!expired) { @@ -1392,7 +1401,7 @@ void internalTrimConsumedLedgers() { } if (!overRetentionQuota) { log.debug("[{}] ledger id: {} skipped for deletion as size: {} under quota: {} MB", name, - ls.getLedgerId(), totalSize.get(), config.getRetentionSizeInMB()); + ls.getLedgerId(), TOTAL_SIZE_UPDATER.get(this), config.getRetentionSizeInMB()); } } break; @@ -1407,7 +1416,7 @@ void internalTrimConsumedLedgers() { return; } - if (state.get() == State.CreatingLedger // Give up now and schedule a new trimming + if (STATE_UPDATER.get(this) == State.CreatingLedger // Give up now and schedule a new trimming || !ledgersListMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list scheduleDeferredTrimming(); trimmerMutex.unlock(); @@ -1417,8 +1426,8 @@ void internalTrimConsumedLedgers() { // Update metadata for (LedgerInfo ls : ledgersToDelete) { ledgers.remove(ls.getLedgerId()); - numberOfEntries.addAndGet(-ls.getEntries()); - totalSize.addAndGet(-ls.getSize()); + NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries()); + TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize()); entryCache.invalidateAllEntries(ls.getLedgerId()); } @@ -1431,7 +1440,7 @@ void internalTrimConsumedLedgers() { @Override public void operationComplete(Void result, Version version) { log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(), - totalSize.get()); + TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this)); ledgersVersion = version; ledgersListMutex.unlock(); trimmerMutex.unlock(); @@ -1501,7 +1510,7 @@ public void deleteLedgerFailed(ManagedLedgerException e, Object ctx) { public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) { // Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and // ledgers - state.set(State.Fenced); + STATE_UPDATER.set(this, State.Fenced); List cursors = Lists.newArrayList(this.cursors); if (cursors.isEmpty()) { @@ -1830,7 +1839,7 @@ Pair getLastPositionAndCounter() { do { pos = lastConfirmedEntry; - count = entriesAddedCounter.get(); + count = ENTRIES_ADDED_COUNTER_UPDATER.get(this); // Ensure no entry was written while reading the two values } while (pos.compareTo(lastConfirmedEntry) != 0); @@ -1910,20 +1919,20 @@ OrderedSafeExecutor getExecutor() { * @throws ManagedLedgerException */ private void checkFenced() throws ManagedLedgerException { - if (state.get() == State.Fenced) { + if (STATE_UPDATER.get(this) == State.Fenced) { log.error("[{}] Attempted to use a fenced managed ledger", name); throw new ManagedLedgerFencedException(); } } private void checkManagedLedgerIsOpen() throws ManagedLedgerException { - if (state.get() == State.Closed) { + if (STATE_UPDATER.get(this) == State.Closed) { throw new ManagedLedgerException("ManagedLedger " + name + " has already been closed"); } } synchronized void setFenced() { - state.set(State.Fenced); + STATE_UPDATER.set(this, State.Fenced); } MetaStore getStore() { @@ -1942,7 +1951,7 @@ static interface ManagedLedgerInitializeLedgerCallback { // Expose internal values for debugging purposes public long getEntriesAddedCounter() { - return entriesAddedCounter.get(); + return ENTRIES_ADDED_COUNTER_UPDATER.get(this); } public long getCurrentLedgerEntries() { @@ -1974,7 +1983,7 @@ public PositionImpl getLastConfirmedEntry() { } public String getState() { - return state.get().toString(); + return STATE_UPDATER.get(this).toString(); } public ManagedLedgerMBeanImpl getMBean() { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 42b1ae9e80939..f9bab9b0ab66a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -137,8 +137,8 @@ public void safeRun() { OpAddEntry firstInQueue = ml.pendingAddEntries.poll(); checkArgument(this == firstInQueue); - ml.numberOfEntries.incrementAndGet(); - ml.totalSize.addAndGet(dataLength); + ManagedLedgerImpl.NUMBER_OF_ENTRIES_UPDATER.incrementAndGet(ml); + ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength); if (ml.hasActiveCursors()) { // Avoid caching entries if no cursor has been created ml.entryCache.insert(new EntryImpl(ledger.getId(), entryId, data)); @@ -148,7 +148,7 @@ public void safeRun() { data.release(); PositionImpl lastEntry = PositionImpl.get(ledger.getId(), entryId); - ml.entriesAddedCounter.incrementAndGet(); + ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(ml); ml.lastConfirmedEntry = lastEntry; if (closeWhenDone) { diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnedBundle.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnedBundle.java index ab0664fc29d8a..613dec3543837 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnedBundle.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnedBundle.java @@ -16,7 +16,7 @@ package com.yahoo.pulsar.broker.namespace; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.slf4j.Logger; @@ -35,7 +35,11 @@ public class OwnedBundle { * based on {@link #active} flag */ private final ReentrantReadWriteLock nsLock = new ReentrantReadWriteLock(); - private final AtomicBoolean isActive = new AtomicBoolean(true); + private static final int FALSE = 0; + private static final int TRUE = 1; + private static final AtomicIntegerFieldUpdater IS_ACTIVE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(OwnedBundle.class, "isActive"); + private volatile int isActive = TRUE; /** * constructor @@ -44,6 +48,7 @@ public class OwnedBundle { */ public OwnedBundle(NamespaceBundle suName) { this.bundle = suName; + IS_ACTIVE_UPDATER.set(this, TRUE); }; /** @@ -55,7 +60,7 @@ public OwnedBundle(NamespaceBundle suName) { */ public OwnedBundle(NamespaceBundle suName, boolean active) { this.bundle = suName; - this.isActive.set(active); + IS_ACTIVE_UPDATER.set(this, active ? TRUE : FALSE); } /** @@ -90,11 +95,11 @@ public void handleUnloadRequest(PulsarService pulsar) throws Exception { try { // set the flag locally s.t. no more producer/consumer to this namespace is allowed - if (!this.isActive.compareAndSet(true, false)) { + if (!IS_ACTIVE_UPDATER.compareAndSet(this, TRUE, FALSE)) { // An exception is thrown when the namespace is not in active state (i.e. another thread is // removing/have removed it) throw new IllegalStateException( - "Namespace is not active. ns:" + this.bundle + "; state:" + this.isActive.get()); + "Namespace is not active. ns:" + this.bundle + "; state:" + IS_ACTIVE_UPDATER.get(this)); } } finally { // no matter success or not, unlock @@ -137,10 +142,10 @@ public void handleUnloadRequest(PulsarService pulsar) throws Exception { * @return boolean value indicate that the namespace is active or not. */ public boolean isActive() { - return this.isActive.get(); + return IS_ACTIVE_UPDATER.get(this) == TRUE; } public void setActive(boolean active) { - isActive.set(active); + IS_ACTIVE_UPDATER.set(this, active ? TRUE : FALSE); } } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java index 267030001e9fd..0d6f74b93d2b0 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java @@ -22,7 +22,7 @@ import java.time.Instant; import java.util.Iterator; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -67,18 +67,24 @@ public class Consumer { // Represents how many messages we can safely send to the consumer without // overflowing its receiving queue. The consumer will use Flow commands to // increase its availability - private final AtomicInteger messagePermits = new AtomicInteger(0); + private static final AtomicIntegerFieldUpdater MESSAGE_PERMITS_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "messagePermits"); + private volatile int messagePermits = 0; // It starts keep tracking of messagePermits once consumer gets blocked, as consumer needs two separate counts: // messagePermits (1) before and (2) after being blocked: to dispatch only blockedPermit number of messages at the // time of redelivery - private final AtomicInteger permitsReceivedWhileConsumerBlocked = new AtomicInteger(0); + private static final AtomicIntegerFieldUpdater PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "permitsReceivedWhileConsumerBlocked"); + private volatile int permitsReceivedWhileConsumerBlocked = 0; private final ConcurrentOpenHashMap pendingAcks; private final ConsumerStats stats; private final int maxUnackedMessages; - private AtomicInteger unackedMessages = new AtomicInteger(0); + private static final AtomicIntegerFieldUpdater UNACKED_MESSAGES_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "unackedMessages"); + private volatile int unackedMessages = 0; private volatile boolean blockedConsumerOnUnackedMsgs = false; public Consumer(Subscription subscription, SubType subType, long consumerId, String consumerName, @@ -93,6 +99,9 @@ public Consumer(Subscription subscription, SubType subType, long consumerId, Str this.msgOut = new Rate(); this.msgRedeliver = new Rate(); this.appId = appId; + PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0); + MESSAGE_PERMITS_UPDATER.set(this, 0); + UNACKED_MESSAGES_UPDATER.set(this, 0); stats = new ConsumerStats(); stats.address = cnx.clientAddress().toString(); @@ -181,7 +190,7 @@ public Pair sendMessages(final List entries) { } private void incrementUnackedMessages(int ackedMessages) { - if (unackedMessages.addAndGet(ackedMessages) >= maxUnackedMessages && shouldBlockConsumerOnUnackMsgs()) { + if (UNACKED_MESSAGES_UPDATER.addAndGet(this, ackedMessages) >= maxUnackedMessages && shouldBlockConsumerOnUnackMsgs()) { blockedConsumerOnUnackedMsgs = true; } } @@ -226,7 +235,7 @@ int updatePermitsAndPendingAcks(final List entries) { permitsToReduce += batchSize; } // reduce permit and increment unackedMsg count with total number of messages in batch-msgs - int permits = messagePermits.addAndGet(-permitsToReduce); + int permits = MESSAGE_PERMITS_UPDATER.addAndGet(this, -permitsToReduce); incrementUnackedMessages(permitsToReduce); if (permits < 0) { if (log.isDebugEnabled()) { @@ -305,15 +314,15 @@ void flowPermits(int additionalNumberOfMessages) { checkArgument(additionalNumberOfMessages > 0); // block shared consumer when unacked-messages reaches limit - if (shouldBlockConsumerOnUnackMsgs() && unackedMessages.get() >= maxUnackedMessages) { + if (shouldBlockConsumerOnUnackMsgs() && UNACKED_MESSAGES_UPDATER.get(this) >= maxUnackedMessages) { blockedConsumerOnUnackedMsgs = true; } int oldPermits; if (!blockedConsumerOnUnackedMsgs) { - oldPermits = messagePermits.getAndAdd(additionalNumberOfMessages); + oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages); subscription.consumerFlow(this, additionalNumberOfMessages); } else { - oldPermits = permitsReceivedWhileConsumerBlocked.getAndAdd(additionalNumberOfMessages); + oldPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, additionalNumberOfMessages); } if (log.isDebugEnabled()) { @@ -331,15 +340,15 @@ void flowPermits(int additionalNumberOfMessages) { * Consumer whose blockedPermits needs to be dispatched */ void flowConsumerBlockedPermits(Consumer consumer) { - int additionalNumberOfPermits = consumer.permitsReceivedWhileConsumerBlocked.getAndSet(0); + int additionalNumberOfPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(consumer, 0); // add newly flow permits to actual consumer.messagePermits - consumer.messagePermits.getAndAdd(additionalNumberOfPermits); + MESSAGE_PERMITS_UPDATER.getAndAdd(consumer, additionalNumberOfPermits); // dispatch pending permits to flow more messages: it will add more permits to dispatcher and consumer subscription.consumerFlow(consumer, additionalNumberOfPermits); } public int getAvailablePermits() { - return messagePermits.get(); + return MESSAGE_PERMITS_UPDATER.get(this); } public boolean isBlocked() { @@ -367,7 +376,7 @@ public void updateRates() { public ConsumerStats getStats() { stats.availablePermits = getAvailablePermits(); - stats.unackedMessages = unackedMessages.get(); + stats.unackedMessages = UNACKED_MESSAGES_UPDATER.get(this); stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs; return stats; } @@ -433,7 +442,7 @@ private void removePendingAcks(PositionImpl position) { int totalAckedMsgs = ackOwnedConsumer.getPendingAcks().remove(position); // unblock consumer-throttling when receives half of maxUnackedMessages => consumer can start again // consuming messages - if (((ackOwnedConsumer.unackedMessages.addAndGet(-totalAckedMsgs) <= (maxUnackedMessages / 2)) + if (((UNACKED_MESSAGES_UPDATER.addAndGet(ackOwnedConsumer, -totalAckedMsgs) <= (maxUnackedMessages / 2)) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs) && ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs()) { ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false; @@ -450,7 +459,7 @@ public ConcurrentOpenHashMap getPendingAcks() { public void redeliverUnacknowledgedMessages() { // cleanup unackedMessage bucket and redeliver those unack-msgs again - unackedMessages.set(0); + UNACKED_MESSAGES_UPDATER.set(this, 0); blockedConsumerOnUnackedMsgs = false; // redeliver unacked-msgs subscription.redeliverUnacknowledgedMessages(this); @@ -479,21 +488,20 @@ public void redeliverUnacknowledgedMessages(List messageIds) { } } - unackedMessages.addAndGet(-totalRedeliveryMessages); + UNACKED_MESSAGES_UPDATER.addAndGet(this, -totalRedeliveryMessages); blockedConsumerOnUnackedMsgs = false; subscription.redeliverUnacknowledgedMessages(this, pendingPositions); msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages); int numberOfBlockedPermits = Math.min(totalRedeliveryMessages, - permitsReceivedWhileConsumerBlocked.get()); + PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.get(this)); // if permitsReceivedWhileConsumerBlocked has been accumulated then pass it to Dispatcher to flow messages if (numberOfBlockedPermits > 0) { - permitsReceivedWhileConsumerBlocked.getAndAdd(-numberOfBlockedPermits); - messagePermits.getAndAdd(numberOfBlockedPermits); + PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, -numberOfBlockedPermits); + MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits); subscription.consumerFlow(this, numberOfBlockedPermits); - } } } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 0ea9d8a68c86a..7b5bbb0470d09 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -22,7 +22,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; @@ -46,7 +46,9 @@ public final class PersistentDispatcherSingleActiveConsumer implements Dispatche private final PersistentTopic topic; private final ManagedCursor cursor; - private final AtomicReference activeConsumer = new AtomicReference(); + private static final AtomicReferenceFieldUpdater ACTIVE_CONSUMER_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PersistentDispatcherSingleActiveConsumer.class, Consumer.class, "activeConsumer"); + private volatile Consumer activeConsumer = null; private final CopyOnWriteArrayList consumers; private boolean havePendingRead = false; private CompletableFuture closeFuture = null; @@ -67,6 +69,7 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su this.partitionIndex = partitionIndex; this.subscriptionType = subscriptionType; this.readBatchSize = MaxReadBatchSize; + ACTIVE_CONSUMER_UPDATER.set(this, null); } private void pickAndScheduleActiveConsumer() { @@ -75,9 +78,9 @@ private void pickAndScheduleActiveConsumer() { consumers.sort((c1, c2) -> c1.consumerName().compareTo(c2.consumerName())); int index = partitionIndex % consumers.size(); - Consumer prevConsumer = activeConsumer.getAndSet(consumers.get(index)); + Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index)); - if (prevConsumer == activeConsumer.get()) { + if (prevConsumer == ACTIVE_CONSUMER_UPDATER.get(this)) { // Active consumer did not change. Do nothing at this point return; } @@ -90,7 +93,7 @@ private void pickAndScheduleActiveConsumer() { // let it finish and then rewind if (!havePendingRead) { cursor.rewind(); - readMoreEntries(activeConsumer.get()); + readMoreEntries(ACTIVE_CONSUMER_UPDATER.get(this)); } } @@ -115,7 +118,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE } if (consumers.isEmpty()) { - activeConsumer.set(null); + ACTIVE_CONSUMER_UPDATER.set(this, null); } if (closeFuture == null && !consumers.isEmpty()) { @@ -143,7 +146,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE */ @Override public synchronized boolean canUnsubscribe(Consumer consumer) { - return (consumers.size() == 1) && Objects.equals(consumer, activeConsumer.get()); + return (consumers.size() == 1) && Objects.equals(consumer, ACTIVE_CONSUMER_UPDATER.get(this)); } /** @@ -189,7 +192,7 @@ public synchronized void readEntriesComplete(final List entries, Object o readFailureBackoff.reduceToHalf(); - Consumer currentConsumer = activeConsumer.get(); + Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this); if (currentConsumer == null || readConsumer != currentConsumer) { // Active consumer has changed since the read request has been issued. We need to rewind the cursor and // re-issue the read request for the new consumer @@ -203,7 +206,7 @@ public synchronized void readEntriesComplete(final List entries, Object o if (future.isSuccess()) { // Schedule a new read batch operation only after the previous batch has been written to the socket synchronized (PersistentDispatcherSingleActiveConsumer.this) { - Consumer newConsumer = activeConsumer.get(); + Consumer newConsumer = ACTIVE_CONSUMER_UPDATER.get(this); if (newConsumer != null && !havePendingRead) { readMoreEntries(newConsumer); } else { @@ -222,7 +225,7 @@ public synchronized void readEntriesComplete(final List entries, Object o @Override public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { if (!havePendingRead) { - if (activeConsumer.get() == consumer) { + if (ACTIVE_CONSUMER_UPDATER.get(this) == consumer) { if (log.isDebugEnabled()) { log.debug("[{}] Trigger new read after receiving flow control message", consumer); } @@ -242,7 +245,7 @@ public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfM @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { - if (consumer != activeConsumer.get()) { + if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) { log.info("[{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend", consumer); return; @@ -320,7 +323,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj topic.getBrokerService().executor().schedule(() -> { synchronized (PersistentDispatcherSingleActiveConsumer.this) { - Consumer currentConsumer = activeConsumer.get(); + Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this); // we should retry the read if we have an active consumer and there is no pending read if (currentConsumer != null && !havePendingRead) { if (log.isDebugEnabled()) { @@ -340,7 +343,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj @Override public boolean isConsumerConnected() { - return activeConsumer.get() != null; + return ACTIVE_CONSUMER_UPDATER.get(this) != null; } @Override @@ -354,7 +357,7 @@ public SubType getType() { } public Consumer getActiveConsumer() { - return activeConsumer.get(); + return ACTIVE_CONSUMER_UPDATER.get(this); } } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java index 6e28148c48195..03256c511911b 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java @@ -19,7 +19,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; @@ -68,16 +68,16 @@ public class PersistentReplicator implements ReadEntriesCallback, DeleteCallback private final int producerQueueThreshold; - private volatile int pendingMessages = 0; - private static final AtomicIntegerFieldUpdater pendingMessagesUpdater = AtomicIntegerFieldUpdater + private static final AtomicIntegerFieldUpdater PENDING_MESSAGES_UPDATER = AtomicIntegerFieldUpdater .newUpdater(PersistentReplicator.class, "pendingMessages"); + private volatile int pendingMessages = 0; private static final int FALSE = 0; private static final int TRUE = 1; - private volatile int havePendingRead = FALSE; - private static final AtomicIntegerFieldUpdater havePendingReadUpdater = AtomicIntegerFieldUpdater + private static final AtomicIntegerFieldUpdater HAVE_PENDING_READ_UPDATER = AtomicIntegerFieldUpdater .newUpdater(PersistentReplicator.class, "havePendingRead"); + private volatile int havePendingRead = FALSE; private final Rate msgOut = new Rate(); private final Rate msgExpired = new Rate(); @@ -107,6 +107,9 @@ public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String this.client = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster); this.producer = null; this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, cursor); + HAVE_PENDING_READ_UPDATER.set(this, FALSE); + PENDING_MESSAGES_UPDATER.set(this, 0); + STATE_UPDATER.set(this, State.Stopped); producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); readBatchSize = Math.min(producerQueueSize, MaxReadBatchSize); @@ -123,12 +126,14 @@ enum State { Stopped, Starting, Started, Stopping } - private final AtomicReference state = new AtomicReference(State.Stopped); + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PersistentReplicator.class, State.class, "state"); + private volatile State state = State.Stopped; // This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer // the end result can be disconnect. public synchronized void startProducer() { - if (this.state.get() == State.Stopping) { + if (STATE_UPDATER.get(this) == State.Stopping) { long waitTimeMs = backOff.next(); if (log.isDebugEnabled()) { log.debug( @@ -139,8 +144,8 @@ public synchronized void startProducer() { brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); return; } - State state = this.state.get(); - if (!this.state.compareAndSet(State.Stopped, State.Starting)) { + State state = STATE_UPDATER.get(this); + if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) { if (state == State.Started) { // Already running if (log.isDebugEnabled()) { @@ -161,10 +166,10 @@ public synchronized void startProducer() { cursor.rewind(); cursor.cancelPendingReadRequest(); - havePendingRead = FALSE; + HAVE_PENDING_READ_UPDATER.set(this, FALSE); this.producer = (ProducerImpl) producer; - if (this.state.compareAndSet(State.Starting, State.Started)) { + if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) { log.info("[{}][{} -> {}] Created replicator producer", topicName, localCluster, remoteCluster); backOff.reset(); // activate cursor: so, entries can be cached @@ -174,13 +179,13 @@ public synchronized void startProducer() { } else { log.info( "[{}][{} -> {}] Replicator was stopped while creating the producer. Closing it. Replicator state: {}", - topicName, localCluster, remoteCluster, this.state.get()); - this.state.set(State.Stopping); + topicName, localCluster, remoteCluster, STATE_UPDATER.get(this)); + STATE_UPDATER.set(this, State.Stopping); closeProducerAsync(); return; } }).exceptionally(ex -> { - if (this.state.compareAndSet(State.Starting, State.Stopped)) { + if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) { long waitTimeMs = backOff.next(); log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName, localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0); @@ -189,7 +194,7 @@ public synchronized void startProducer() { brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); } else { log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: ", topicName, - localCluster, remoteCluster, this.state.get(), ex); + localCluster, remoteCluster, STATE_UPDATER.get(this), ex); } return null; }); @@ -198,12 +203,12 @@ public synchronized void startProducer() { private synchronized CompletableFuture closeProducerAsync() { if (producer == null) { - state.set(State.Stopped); + STATE_UPDATER.set(this, State.Stopped); return CompletableFuture.completedFuture(null); } CompletableFuture future = producer.closeAsync(); future.thenRun(() -> { - state.set(State.Stopped); + STATE_UPDATER.set(this, State.Stopped); this.producer = null; // deactivate cursor after successfully close the producer this.cursor.setInactive(); @@ -220,7 +225,7 @@ private synchronized CompletableFuture closeProducerAsync() { } private void readMoreEntries() { - int availablePermits = producerQueueSize - pendingMessages; + int availablePermits = producerQueueSize - PENDING_MESSAGES_UPDATER.get(this); if (availablePermits > 0) { int messagesToRead = Math.min(availablePermits, readBatchSize); @@ -234,7 +239,7 @@ private void readMoreEntries() { } // Schedule read - if (havePendingReadUpdater.compareAndSet(this, FALSE, TRUE)) { + if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) { if (log.isDebugEnabled()) { log.debug("[{}][{} -> {}] Schedule read of {} messages", topicName, localCluster, remoteCluster, messagesToRead); @@ -326,7 +331,7 @@ public void readEntriesComplete(List entries, Object ctx) { continue; } - if (state.get() != State.Started || isLocalMessageSkippedOnce) { + if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) { // The producer is not ready yet after having stopped/restarted. Drop the message because it will // recovered when the producer is ready if (log.isDebugEnabled()) { @@ -340,7 +345,7 @@ public void readEntriesComplete(List entries, Object ctx) { } // Increment pending messages for messages produced locally - pendingMessagesUpdater.incrementAndGet(this); + PENDING_MESSAGES_UPDATER.incrementAndGet(this); msgOut.recordEvent(headersAndPayload.readableBytes()); @@ -356,7 +361,7 @@ public void readEntriesComplete(List entries, Object ctx) { e); } - havePendingRead = FALSE; + HAVE_PENDING_READ_UPDATER.set(this, FALSE); if (atLeastOneMessageSentForReplication && !isWritable()) { // Don't read any more entries until the current pending entries are persisted @@ -398,7 +403,7 @@ public void sendComplete(Exception exception) { replicator.cursor.asyncDelete(entry.getPosition(), replicator, entry.getPosition()); entry.release(); - int pending = pendingMessagesUpdater.decrementAndGet(replicator); + int pending = PENDING_MESSAGES_UPDATER.decrementAndGet(replicator); // In general, we schedule a new batch read operation when the occupied queue size gets smaller than half // the max size, unless another read operation is already in progress. @@ -406,7 +411,7 @@ public void sendComplete(Exception exception) { // until we have emptied the whole queue, and at that point we will read a batch of 1 single message if the // producer is still not "writable". if (pending < replicator.producerQueueThreshold // - && replicator.havePendingRead == FALSE // + && HAVE_PENDING_READ_UPDATER.get(replicator) == FALSE // ) { if (pending == 0 || replicator.producer.isWritable()) { replicator.readMoreEntries(); @@ -472,9 +477,9 @@ public CompletableFuture getFuture() { @Override public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - if (this.state.get() != State.Started) { + if (STATE_UPDATER.get(this) != State.Started) { log.info("[{}][{} -> {}] Replicator was stopped while reading entries. Stop reading. Replicator state: {}", - topic, localCluster, remoteCluster, this.state.get()); + topic, localCluster, remoteCluster, STATE_UPDATER.get(this)); return; } @@ -494,7 +499,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } } - havePendingRead = FALSE; + HAVE_PENDING_READ_UPDATER.set(this, FALSE); brokerService.executor().schedule(this::readMoreEntries, waitTimeMillis, TimeUnit.MILLISECONDS); } @@ -605,14 +610,14 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) return disconnectFuture; } - if (producer != null && (state.compareAndSet(State.Starting, State.Stopping) - || state.compareAndSet(State.Started, State.Stopping))) { + if (producer != null && (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping) + || STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping))) { log.info("[{}][{} -> {}] Disconnect replicator at position {} with backlog {}", topicName, localCluster, remoteCluster, cursor.getMarkDeletedPosition(), cursor.getNumberOfEntriesInBacklog()); return closeProducerAsync(); } else { // If there's already a reconnection happening, signal to close it whenever it's ready - state.set(State.Stopped); + STATE_UPDATER.set(this, State.Stopped); } return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java index a3a35d4c7316d..49b21a99f9a6c 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java @@ -17,7 +17,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; @@ -61,7 +61,11 @@ public class PersistentSubscription implements Subscription { private final String topicName; private final String subName; - private final AtomicBoolean isFenced = new AtomicBoolean(false); + private static final int FALSE = 0; + private static final int TRUE = 1; + private static final AtomicIntegerFieldUpdater IS_FENCED_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(PersistentSubscription.class, "isFenced"); + private volatile int isFenced = FALSE; private PersistentMessageExpiryMonitor expiryMonitor; // for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold @@ -73,11 +77,12 @@ public PersistentSubscription(PersistentTopic topic, ManagedCursor cursor) { this.topicName = topic.getName(); this.subName = Codec.decode(cursor.getName()); this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, cursor); + IS_FENCED_UPDATER.set(this, FALSE); } @Override public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { - if (isFenced.get()) { + if (IS_FENCED_UPDATER.get(this) == TRUE) { log.warn("Attempting to add consumer {} on a fenced subscription", consumer); throw new SubscriptionFencedException("Subscription is fenced"); } @@ -130,10 +135,10 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE // invalid consumer remove will throw an exception // decrement usage is triggered only for valid consumer close - topic.usageCount.decrementAndGet(); + PersistentTopic.USAGE_COUNT_UPDATER.decrementAndGet(topic); if (log.isDebugEnabled()) { log.debug("[{}] [{}] [{}] Removed consumer -- count: {}", topic.getName(), subName, consumer.consumerName(), - topic.usageCount.get()); + PersistentTopic.USAGE_COUNT_UPDATER.get(topic)); } } @@ -322,7 +327,7 @@ public void findEntryComplete(Position position, Object ctx) { finalPosition = position; } - if (!isFenced.compareAndSet(false, true)) { + if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) { future.completeExceptionally(new SubscriptionBusyException("Failed to fence subscription")); return; } @@ -332,7 +337,7 @@ public void findEntryComplete(Position position, Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable); } - isFenced.set(false); + IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); future.completeExceptionally(new SubscriptionBusyException("Failed to disconnect consumers from subscription")); return; } @@ -344,7 +349,7 @@ public void resetComplete(Object ctx) { log.debug("[{}][{}] Successfully reset subscription to timestamp {}", topicName, subName, timestamp); } - isFenced.set(false); + IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); future.complete(null); } @@ -352,7 +357,7 @@ public void resetComplete(Object ctx) { public void resetFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}][{}] Failed to reset subscription to timestamp {}", topicName, subName, timestamp, exception); - isFenced.set(false); + IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); // todo - retry on InvalidCursorPositionException // or should we just ask user to retry one more time? if (exception instanceof InvalidCursorPositionException) { @@ -429,7 +434,7 @@ public CompletableFuture close() { closeFuture.completeExceptionally(new SubscriptionBusyException("Subscription has active consumers")); return closeFuture; } - isFenced.set(true); + IS_FENCED_UPDATER.set(this, TRUE); log.info("[{}][{}] Successfully fenced cursor ledger [{}]", topicName, subName, cursor); } @@ -444,7 +449,7 @@ public void closeComplete(Object ctx) { @Override public void closeFailed(ManagedLedgerException exception, Object ctx) { - isFenced.set(false); + IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); log.error("[{}][{}] Error closing cursor for subscription", topicName, subName, exception); closeFuture.completeExceptionally(new PersistenceException(exception)); @@ -464,14 +469,14 @@ public synchronized CompletableFuture disconnect() { CompletableFuture disconnectFuture = new CompletableFuture<>(); // block any further consumers on this subscription - isFenced.set(true); + IS_FENCED_UPDATER.set(this, TRUE); (dispatcher != null ? dispatcher.disconnect() : CompletableFuture.completedFuture(null)) .thenCompose(v -> close()).thenRun(() -> { log.info("[{}][{}] Successfully disconnected and closed subscription", topicName, subName); disconnectFuture.complete(null); }).exceptionally(exception -> { - isFenced.set(false); + IS_FENCED_UPDATER.set(this, FALSE); log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName, exception); @@ -497,7 +502,7 @@ public CompletableFuture delete() { // cursor close handles pending delete (ack) operations this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> deleteFuture.complete(null)) .exceptionally(exception -> { - isFenced.set(false); + IS_FENCED_UPDATER.set(this, FALSE); log.error("[{}][{}] Error deleting subscription", topicName, subName, exception); deleteFuture.completeExceptionally(exception); return null; diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java index 790be3d3a7cf9..6fc13a9b3aa88 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java @@ -26,7 +26,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -111,7 +111,9 @@ public class PersistentTopic implements Topic, AddEntryCallback { private volatile boolean isFenced; - protected AtomicLong usageCount = new AtomicLong(0); + protected static final AtomicLongFieldUpdater USAGE_COUNT_UPDATER = + AtomicLongFieldUpdater.newUpdater(PersistentTopic.class, "usageCount"); + private volatile long usageCount = 0; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -166,6 +168,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS this.replicators = new ConcurrentOpenHashMap<>(); this.isFenced = false; this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix(); + USAGE_COUNT_UPDATER.set(this, 0); for (ManagedCursor cursor : ledger.getCursors()) { if (cursor.getName().startsWith(replicatorPrefix)) { @@ -230,9 +233,9 @@ public void addProducer(Producer producer) throws BrokerServiceException { "Producer with name '" + producer.getProducerName() + "' is already connected to topic"); } - usageCount.incrementAndGet(); + USAGE_COUNT_UPDATER.incrementAndGet(this); if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(), usageCount.get()); + log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(), USAGE_COUNT_UPDATER.get(this)); } // Start replication producers if not already @@ -285,10 +288,10 @@ public void removeProducer(Producer producer) { checkArgument(producer.getTopic() == this); if (producers.remove(producer)) { // decrement usage only if this was a valid producer close - usageCount.decrementAndGet(); + USAGE_COUNT_UPDATER.decrementAndGet(this); if (log.isDebugEnabled()) { log.debug("[{}] [{}] Removed producer -- count: {}", topic, producer.getProducerName(), - usageCount.get()); + USAGE_COUNT_UPDATER.get(this)); } lastActive = System.nanoTime(); } @@ -313,10 +316,10 @@ public CompletableFuture subscribe(final ServerCnx cnx, String subscri future.completeExceptionally(new TopicFencedException("Topic is temporarily unavailable")); return future; } - usageCount.incrementAndGet(); + USAGE_COUNT_UPDATER.incrementAndGet(this); if (log.isDebugEnabled()) { log.debug("[{}] [{}] [{}] Added consumer -- count: {}", topic, subscriptionName, consumerName, - usageCount.get()); + USAGE_COUNT_UPDATER.get(this)); } } finally { lock.readLock().unlock(); @@ -341,7 +344,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) { consumer.close(); if (log.isDebugEnabled()) { log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName, - consumer.consumerName(), usageCount.get()); + consumer.consumerName(), USAGE_COUNT_UPDATER.get(PersistentTopic.this)); } future.completeExceptionally( new BrokerServiceException("Connection was closed while the opening the cursor ")); @@ -357,7 +360,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) { log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); } - usageCount.decrementAndGet(); + USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this); future.completeExceptionally(e); } } @@ -365,7 +368,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) { @Override public void openCursorFailed(ManagedLedgerException exception, Object ctx) { log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName); - usageCount.decrementAndGet(); + USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this); future.completeExceptionally(new PersistenceException(exception)); } }, null); @@ -440,7 +443,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions) { deleteFuture.completeExceptionally(new TopicFencedException("Topic is already fenced")); return deleteFuture; } - if (usageCount.get() == 0) { + if (USAGE_COUNT_UPDATER.get(this) == 0) { isFenced = true; List> futures = Lists.newArrayList(); @@ -480,7 +483,7 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { }); } else { deleteFuture.completeExceptionally( - new TopicBusyException("Topic has " + usageCount.get() + " connected producers/consumers")); + new TopicBusyException("Topic has " + USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers")); } } finally { lock.writeLock().unlock(); @@ -1033,7 +1036,7 @@ public boolean isActive() { // No local consumers and no local producers return !subscriptions.isEmpty() || hasLocalProducers(); } - return usageCount.get() != 0 || !subscriptions.isEmpty(); + return USAGE_COUNT_UPDATER.get(this) != 0 || !subscriptions.isEmpty(); } @Override diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java index a466b074a0370..7b323bdc54705 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -16,10 +16,7 @@ package com.yahoo.pulsar.client.impl; import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; @@ -80,12 +77,35 @@ public void testDisconnectClientWithoutClosingConnection() throws Exception { final String dn1 = "persistent://" + ns1 + "/my-topic"; final String dn2 = "persistent://" + ns2 + "/my-topic"; - ConsumerImpl consumer1 = spy( - (ConsumerImpl) pulsarClient.subscribe(dn1, "my-subscriber-name", new ConsumerConfiguration())); - ProducerImpl producer1 = spy((ProducerImpl) pulsarClient.createProducer(dn1, new ProducerConfiguration())); - ProducerImpl producer2 = spy((ProducerImpl) pulsarClient.createProducer(dn2, new ProducerConfiguration())); - - ClientCnx clientCnx = producer1.clientCnx.get(); + ConsumerImpl cons1 = (ConsumerImpl) pulsarClient.subscribe(dn1, "my-subscriber-name", new ConsumerConfiguration()); + ProducerImpl prod1 = (ProducerImpl) pulsarClient.createProducer(dn1, new ProducerConfiguration()); + ProducerImpl prod2 = (ProducerImpl) pulsarClient.createProducer(dn2, new ProducerConfiguration()); + ConsumerImpl consumer1 = spy(cons1); + doAnswer(invocationOnMock -> cons1.getState()).when(consumer1).getState(); + doAnswer(invocationOnMock -> cons1.getClientCnx()).when(consumer1).getClientCnx(); + doAnswer(invocationOnMock -> cons1.cnx()).when(consumer1).cnx(); + doAnswer(invocationOnMock -> { + cons1.connectionClosed((ClientCnx) invocationOnMock.getArguments()[0]); + return null; + }).when(consumer1).connectionClosed(anyObject()); + ProducerImpl producer1 = spy(prod1); + doAnswer(invocationOnMock -> prod1.getState()).when(producer1).getState(); + doAnswer(invocationOnMock -> prod1.getClientCnx()).when(producer1).getClientCnx(); + doAnswer(invocationOnMock -> prod1.cnx()).when(producer1).cnx(); + doAnswer(invocationOnMock -> { + prod1.connectionClosed((ClientCnx) invocationOnMock.getArguments()[0]); + return null; + }).when(producer1).connectionClosed(anyObject()); + ProducerImpl producer2 = spy(prod2); + doAnswer(invocationOnMock -> prod2.getState()).when(producer2).getState(); + doAnswer(invocationOnMock -> prod2.getClientCnx()).when(producer2).getClientCnx(); + doAnswer(invocationOnMock -> prod2.cnx()).when(producer2).cnx(); + doAnswer(invocationOnMock -> { + prod2.connectionClosed((ClientCnx) invocationOnMock.getArguments()[0]); + return null; + }).when(producer2).connectionClosed(anyObject()); + + ClientCnx clientCnx = producer1.getClientCnx(); Field pfield = ClientCnx.class.getDeclaredField("producers"); pfield.setAccessible(true); @@ -95,6 +115,10 @@ public void testDisconnectClientWithoutClosingConnection() throws Exception { ConcurrentLongHashMap producers = (ConcurrentLongHashMap) pfield.get(clientCnx); ConcurrentLongHashMap consumers = (ConcurrentLongHashMap) cfield.get(clientCnx); + producers.put(2, producers.get(0)); + producers.put(3, producers.get(1)); + consumers.put(1, consumers.get(0)); + producers.put(0, producer1); producers.put(1, producer2); consumers.put(0, consumer1); @@ -121,14 +145,14 @@ public void testDisconnectClientWithoutClosingConnection() throws Exception { Thread.sleep(200); // producer1 must not be able to connect again - assertTrue(producer1.clientCnx.get() == null); - assertTrue(producer1.state.get().equals(State.Connecting)); + assertTrue(prod1.getClientCnx() == null); + assertTrue(prod1.getState().equals(State.Connecting)); // consumer1 must not be able to connect again - assertTrue(consumer1.clientCnx.get() == null); - assertTrue(consumer1.state.get().equals(State.Connecting)); + assertTrue(cons1.getClientCnx() == null); + assertTrue(cons1.getState().equals(State.Connecting)); // producer2 must have live connection - assertTrue(producer2.clientCnx.get() != null); - assertTrue(producer2.state.get().equals(State.Ready)); + assertTrue(prod2.getClientCnx() != null); + assertTrue(prod2.getState().equals(State.Ready)); // unload ns-bundle2 as well @@ -138,18 +162,21 @@ public void testDisconnectClientWithoutClosingConnection() throws Exception { Thread.sleep(200); // producer1 must not be able to connect again - assertTrue(producer1.clientCnx.get() == null); - assertTrue(producer1.state.get().equals(State.Connecting)); + assertTrue(prod1.getClientCnx() == null); + assertTrue(prod1.getState().equals(State.Connecting)); // consumer1 must not be able to connect again - assertTrue(consumer1.clientCnx.get() == null); - assertTrue(consumer1.state.get().equals(State.Connecting)); + assertTrue(cons1.getClientCnx() == null); + assertTrue(cons1.getState().equals(State.Connecting)); // producer2 must not be able to connect again - assertTrue(producer2.clientCnx.get() == null); - assertTrue(producer2.state.get().equals(State.Connecting)); + assertTrue(prod2.getClientCnx() == null); + assertTrue(prod2.getState().equals(State.Connecting)); producer1.close(); producer2.close(); consumer1.close(); + prod1.close(); + prod2.close(); + cons1.close(); } @@ -171,10 +198,9 @@ public void testCloseBrokerService() throws Exception { final String dn1 = "persistent://" + ns1 + "/my-topic"; final String dn2 = "persistent://" + ns2 + "/my-topic"; - ConsumerImpl consumer1 = spy( - (ConsumerImpl) pulsarClient.subscribe(dn1, "my-subscriber-name", new ConsumerConfiguration())); - ProducerImpl producer1 = spy((ProducerImpl) pulsarClient.createProducer(dn1, new ProducerConfiguration())); - ProducerImpl producer2 = spy((ProducerImpl) pulsarClient.createProducer(dn2, new ProducerConfiguration())); + ConsumerImpl consumer1 = (ConsumerImpl) pulsarClient.subscribe(dn1, "my-subscriber-name", new ConsumerConfiguration()); + ProducerImpl producer1 = (ProducerImpl) pulsarClient.createProducer(dn1, new ProducerConfiguration()); + ProducerImpl producer2 = (ProducerImpl) pulsarClient.createProducer(dn2, new ProducerConfiguration()); //unload all other namespace pulsar.getBrokerService().close(); @@ -185,14 +211,14 @@ public void testCloseBrokerService() throws Exception { // [2] All clients must be disconnected and in connecting state // producer1 must not be able to connect again - assertTrue(producer1.clientCnx.get() == null); - assertTrue(producer1.state.get().equals(State.Connecting)); + assertTrue(producer1.getClientCnx() == null); + assertTrue(producer1.getState().equals(State.Connecting)); // consumer1 must not be able to connect again - assertTrue(consumer1.clientCnx.get() == null); - assertTrue(consumer1.state.get().equals(State.Connecting)); + assertTrue(consumer1.getClientCnx() == null); + assertTrue(consumer1.getState().equals(State.Connecting)); // producer2 must not be able to connect again - assertTrue(producer2.clientCnx.get() == null); - assertTrue(producer2.state.get().equals(State.Connecting)); + assertTrue(producer2.getClientCnx() == null); + assertTrue(producer2.getState().equals(State.Connecting)); producer1.close(); producer2.close(); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/MessageIdTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/MessageIdTest.java index 9dd119a9d42e2..0b9754fc03796 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/MessageIdTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/MessageIdTest.java @@ -17,17 +17,14 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -36,6 +33,11 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.mockito.cglib.proxy.Enhancer; +import org.mockito.cglib.proxy.MethodInterceptor; +import org.mockito.cglib.proxy.MethodProxy; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -273,10 +275,13 @@ public void testChecksumVersionComptability() throws Exception { final String topicName = "persistent://prop/use/ns-abc/topic1"; // 1. producer connect - Producer prod = pulsarClient.createProducer(topicName); - ProducerImpl producer = spy((ProducerImpl) prod); + ProducerImpl prod = (ProducerImpl) pulsarClient.createProducer(topicName); + ProducerImpl producer = spy(prod); // return higher version compare to broker : so, it forces client-producer to remove checksum from payload doReturn(producer.brokerChecksumSupportedVersion() + 1).when(producer).brokerChecksumSupportedVersion(); + doAnswer(invocationOnMock -> prod.getState()).when(producer).getState(); + doAnswer(invocationOnMock -> prod.getClientCnx()).when(producer).getClientCnx(); + doAnswer(invocationOnMock -> prod.cnx()).when(producer).cnx(); Consumer consumer = pulsarClient.subscribe(topicName, "my-sub"); @@ -289,6 +294,10 @@ public void testChecksumVersionComptability() throws Exception { // mock-value from brokerChecksumSupportedVersion ((PulsarClientImpl) pulsarClient).timer().stop(); + ClientCnx mockClientCnx = spy(new ClientCnx((PulsarClientImpl) pulsarClient)); + doReturn(producer.brokerChecksumSupportedVersion() - 1).when(mockClientCnx).getRemoteEndpointProtocolVersion(); + prod.setClientCnx(mockClientCnx); + Message msg1 = MessageBuilder.create().setContent("message-1".getBytes()).build(); CompletableFuture future1 = producer.sendAsync(msg1); @@ -298,11 +307,13 @@ public void testChecksumVersionComptability() throws Exception { // corrupt the message msg2.getData()[msg2.getData().length - 1] = '3'; // new content would be 'message-3' + prod.setClientCnx(null); + // Restart the broker to have the messages published startBroker(); // grab broker connection with mocked producer which has higher version compare to broker - producer.grabCnx(); + prod.grabCnx(); try { // it should not fail: as due to unsupported version of broker: client removes checksum and broker should @@ -310,6 +321,7 @@ public void testChecksumVersionComptability() throws Exception { future1.get(); future2.get(); } catch (Exception e) { + e.printStackTrace(); fail("Broker shouldn't verify checksum for corrupted message and it shouldn't fail"); } @@ -327,11 +339,14 @@ public void testChecksumReconnection() throws Exception { final String topicName = "persistent://prop/use/ns-abc/topic1"; // 1. producer connect - Producer prod = pulsarClient.createProducer(topicName); - ProducerImpl producer = spy((ProducerImpl) prod); + ProducerImpl prod = (ProducerImpl) pulsarClient.createProducer(topicName); + ProducerImpl producer = spy(prod); // mock: broker-doesn't support checksum (remote_version < brokerChecksumSupportedVersion) so, it forces // client-producer to perform checksum-strip from msg at reconnection doReturn(producer.brokerChecksumSupportedVersion() + 1).when(producer).brokerChecksumSupportedVersion(); + doAnswer(invocationOnMock -> prod.getState()).when(producer).getState(); + doAnswer(invocationOnMock -> prod.getClientCnx()).when(producer).getClientCnx(); + doAnswer(invocationOnMock -> prod.cnx()).when(producer).cnx(); Consumer consumer = pulsarClient.subscribe(topicName, "my-sub"); @@ -345,7 +360,7 @@ public void testChecksumReconnection() throws Exception { // set clientCnx mock to get non-checksum supported version ClientCnx mockClientCnx = spy(new ClientCnx((PulsarClientImpl) pulsarClient)); doReturn(producer.brokerChecksumSupportedVersion() - 1).when(mockClientCnx).getRemoteEndpointProtocolVersion(); - producer.clientCnx.set(mockClientCnx); + prod.setClientCnx(mockClientCnx); Message msg1 = MessageBuilder.create().setContent("message-1".getBytes()).build(); CompletableFuture future1 = producer.sendAsync(msg1); @@ -357,22 +372,23 @@ public void testChecksumReconnection() throws Exception { msg2.getData()[msg2.getData().length - 1] = '3'; // new content would be // 'message-3' // unset mock - producer.clientCnx.set(null); + prod.setClientCnx(null); // Restart the broker to have the messages published startBroker(); // grab broker connection with mocked producer which has higher version // compare to broker - producer.grabCnx(); + prod.grabCnx(); try { // it should not fail: as due to unsupported version of broker: // client removes checksum and broker should // ignore the checksum validation - future1.get(1, TimeUnit.SECONDS); - future2.get(1, TimeUnit.SECONDS); + future1.get(10, TimeUnit.SECONDS); + future2.get(10, TimeUnit.SECONDS); } catch (Exception e) { + e.printStackTrace(); fail("Broker shouldn't verify checksum for corrupted message and it shouldn't fail"); } diff --git a/pulsar-broker/src/test/java/org/testng/listener/TestListener.java b/pulsar-broker/src/test/java/org/testng/listener/TestListener.java index 800eb391239e7..df072e32dc6bf 100644 --- a/pulsar-broker/src/test/java/org/testng/listener/TestListener.java +++ b/pulsar-broker/src/test/java/org/testng/listener/TestListener.java @@ -39,7 +39,7 @@ public void onTestSuccess(ITestResult tr) { @Override public void onTestFailure(ITestResult tr) { log.error("------------ Test Failed - {} / {} -- attrs: {}", tr.getTestClass().getName(), - tr.getMethod().getMethodName(), tr.getAttributeNames()); + tr.getMethod().getMethodName(), tr.getAttributeNames(), tr.getThrowable()); } @Override diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerBase.java index de0450df10904..22691eba17978 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerBase.java @@ -77,7 +77,7 @@ public Message receive() throws PulsarClientException { "Cannot use receive() when a listener has been set"); } - switch (state.get()) { + switch (getState()) { case Ready: case Connecting: break; // Ok @@ -100,7 +100,7 @@ public CompletableFuture receiveAsync() { "Cannot use receive() when a listener has been set")); } - switch (state.get()) { + switch (getState()) { case Ready: case Connecting: break; // Ok @@ -130,7 +130,7 @@ public Message receive(int timeout, TimeUnit unit) throws PulsarClientException "Cannot use receive() when a listener has been set"); } - switch (state.get()) { + switch (getState()) { case Ready: case Connecting: break; // Ok diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java index 2b18bb8dcc71d..3debd76731565 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java @@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @@ -71,7 +71,9 @@ public class ConsumerImpl extends ConsumerBase { // Number of messages that have delivered to the application. Every once in a while, this number will be sent to the // broker to notify that we are ready to get (and store in the incoming messages queue) more messages - private final AtomicInteger availablePermits; + private static final AtomicIntegerFieldUpdater AVAILABLE_PERMITS_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(ConsumerImpl.class, "availablePermits"); + private volatile int availablePermits = 0; private long subscribeTimeout; private final int partitionIndex; @@ -99,7 +101,7 @@ public class ConsumerImpl extends ConsumerBase { ExecutorService listenerExecutor, int partitionIndex, CompletableFuture subscribeFuture) { super(client, topic, subscription, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture); this.consumerId = client.newConsumerId(); - this.availablePermits = new AtomicInteger(0); + AVAILABLE_PERMITS_UPDATER.set(this, 0); this.subscribeTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs(); this.partitionIndex = partitionIndex; this.receiverQueueRefillThreshold = conf.getReceiverQueueSize() / 2; @@ -132,13 +134,13 @@ public UnAckedMessageTracker getUnAckedMessageTracker() { @Override public CompletableFuture unsubscribeAsync() { - if (state.get() == State.Closing || state.get() == State.Closed) { + if (getState() == State.Closing || getState() == State.Closed) { return FutureUtil .failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed")); } final CompletableFuture unsubscribeFuture = new CompletableFuture<>(); if (isConnected()) { - state.set(State.Closing); + setState(State.Closing); long requestId = client.newRequestId(); ByteBuf unsubscribe = Commands.newUnsubscribe(consumerId, requestId); ClientCnx cnx = cnx(); @@ -148,11 +150,11 @@ public CompletableFuture unsubscribeAsync() { batchMessageAckTracker.clear(); unAckedMessageTracker.close(); unsubscribeFuture.complete(null); - state.set(State.Closed); + setState(State.Closed); }).exceptionally(e -> { log.error("[{}][{}] Failed to unsubscribe: {}", topic, subscription, e.getCause().getMessage()); unsubscribeFuture.completeExceptionally(e.getCause()); - state.set(State.Ready); + setState(State.Ready); return null; }); } else { @@ -381,9 +383,9 @@ public boolean isBatchingAckTrackerEmpty() { @Override protected CompletableFuture doAcknowledge(MessageId messageId, AckType ackType) { checkArgument(messageId instanceof MessageIdImpl); - if (state.get() != State.Ready && state.get() != State.Connecting) { + if (getState() != State.Ready && getState() != State.Connecting) { stats.incrementNumAcksFailed(); - return FutureUtil.failedFuture(new PulsarClientException("Consumer not ready. State: " + state.get())); + return FutureUtil.failedFuture(new PulsarClientException("Consumer not ready. State: " + getState())); } if (messageId instanceof BatchMessageIdImpl) { @@ -441,7 +443,7 @@ public void operationComplete(Future future) throws Exception { } else { stats.incrementNumAcksFailed(); ackFuture - .completeExceptionally(new PulsarClientException("Not connected to broker. State: " + state.get())); + .completeExceptionally(new PulsarClientException("Not connected to broker. State: " + getState())); } return ackFuture; @@ -449,7 +451,7 @@ public void operationComplete(Future future) throws Exception { @Override void connectionOpened(final ClientCnx cnx) { - clientCnx.set(cnx); + setClientCnx(cnx); cnx.registerConsumer(consumerId, this); log.info("[{}][{}] Subscribing to topic on cnx {}", topic, subscription, cnx.ctx().channel()); @@ -467,7 +469,7 @@ void connectionOpened(final ClientCnx cnx) { log.info("[{}][{}] Subscribed to topic on {} -- consumer: {}", topic, subscription, cnx.channel().remoteAddress(), consumerId); - availablePermits.set(0); + AVAILABLE_PERMITS_UPDATER.set(this, 0); // For zerosize queue : If the connection is reset and someone is waiting for the messages // or queue was not empty: send a flow command if (waitingOnReceiveForZeroQueueSize @@ -477,7 +479,7 @@ void connectionOpened(final ClientCnx cnx) { } else { // Consumer was closed while reconnecting, close the connection to make sure the broker // drops the consumer on its side - state.set(State.Closed); + setState(State.Closed); cnx.removeConsumer(consumerId); cnx.channel().close(); return; @@ -494,7 +496,7 @@ void connectionOpened(final ClientCnx cnx) { } }).exceptionally((e) -> { cnx.removeConsumer(consumerId); - if (state.get() == State.Closing || state.get() == State.Closed) { + if (getState() == State.Closing || getState() == State.Closed) { // Consumer was closed while reconnecting, close the connection to make sure the broker // drops the consumer on its side cnx.channel().close(); @@ -511,7 +513,7 @@ && isRetriableError((PulsarClientException) e.getCause()) if (!subscribeFuture.isDone()) { // unable to create new consumer, fail operation - state.set(State.Failed); + setState(State.Failed); subscribeFuture.completeExceptionally(e); client.cleanupConsumer(this); } else { @@ -538,14 +540,14 @@ void sendFlowPermitsToBroker(ClientCnx cnx, int numMessages) { @Override void connectionFailed(PulsarClientException exception) { if (System.currentTimeMillis() > subscribeTimeout && subscribeFuture.completeExceptionally(exception)) { - state.set(State.Failed); + setState(State.Failed); client.cleanupConsumer(this); } } @Override public CompletableFuture closeAsync() { - if (state.get() == State.Closing || state.get() == State.Closed) { + if (getState() == State.Closing || getState() == State.Closed) { batchMessageAckTracker.clear(); unAckedMessageTracker.close(); return CompletableFuture.completedFuture(null); @@ -553,7 +555,7 @@ public CompletableFuture closeAsync() { if (!isConnected()) { log.info("[{}] [{}] Closed Consumer (not connected)", topic, subscription); - state.set(State.Closed); + setState(State.Closed); batchMessageAckTracker.clear(); unAckedMessageTracker.close(); client.cleanupConsumer(this); @@ -565,7 +567,7 @@ public CompletableFuture closeAsync() { timeout.cancel(); } - state.set(State.Closing); + setState(State.Closing); long requestId = client.newRequestId(); ByteBuf cmd = Commands.newCloseConsumer(consumerId, requestId); @@ -576,7 +578,7 @@ public CompletableFuture closeAsync() { cnx.removeConsumer(consumerId); if (exception == null || !cnx.ctx().channel().isActive()) { log.info("[{}] [{}] Closed consumer", topic, subscription); - state.set(State.Closed); + setState(State.Closed); batchMessageAckTracker.clear(); unAckedMessageTracker.close(); closeFuture.complete(null); @@ -806,14 +808,14 @@ private void increaseAvailablePermits(ClientCnx currentCnx) { private void increaseAvailablePermits(ClientCnx currentCnx, int delta) { - int available = availablePermits.addAndGet(delta); + int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta); while (available >= receiverQueueRefillThreshold) { - if (availablePermits.compareAndSet(available, 0)) { + if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) { sendFlowPermitsToBroker(currentCnx, available); break; } else { - available = availablePermits.get(); + available = AVAILABLE_PERMITS_UPDATER.get(this); } } } @@ -877,7 +879,7 @@ String getHandlerName() { @Override public boolean isConnected() { - return clientCnx.get() != null && (state.get() == State.Ready); + return getClientCnx() != null && (getState() == State.Ready); } int getPartitionIndex() { @@ -886,7 +888,7 @@ int getPartitionIndex() { @Override public int getAvailablePermits() { - return availablePermits.get(); + return AVAILABLE_PERMITS_UPDATER.get(this); } @Override @@ -915,7 +917,7 @@ public void redeliverUnacknowledgedMessages() { } return; } - if (cnx == null || (state.get() == State.Connecting)) { + if (cnx == null || (getState() == State.Connecting)) { log.warn("[{}] Client Connection needs to be establised for redelivery of unacknowledged messages", this); } else { log.warn("[{}] Reconnecting the client to redeliver the messages.", this); @@ -959,7 +961,7 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { } return; } - if (cnx == null || (state.get() == State.Connecting)) { + if (cnx == null || (getState() == State.Connecting)) { log.warn("[{}] Client Connection needs to be establised for redelivery of unacknowledged messages", this); } else { log.warn("[{}] Reconnecting the client to redeliver the messages.", this); diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/HandlerBase.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/HandlerBase.java index b0b445f84f0f0..27628770b4fd7 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/HandlerBase.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/HandlerBase.java @@ -16,7 +16,7 @@ package com.yahoo.pulsar.client.impl; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,9 +26,13 @@ abstract class HandlerBase { protected final PulsarClientImpl client; protected final String topic; - protected AtomicReference state = new AtomicReference<>(); + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(HandlerBase.class, State.class, "state"); + private volatile State state = null; - protected final AtomicReference clientCnx; + private static final AtomicReferenceFieldUpdater CLIENT_CNX_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(HandlerBase.class, ClientCnx.class, "clientCnx"); + private volatile ClientCnx clientCnx = null; protected final Backoff backoff; enum State { @@ -43,20 +47,20 @@ enum State { public HandlerBase(PulsarClientImpl client, String topic) { this.client = client; this.topic = topic; - this.clientCnx = new AtomicReference<>(); this.backoff = new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS); - this.state.set(State.Uninitialized); + STATE_UPDATER.set(this, State.Uninitialized); + CLIENT_CNX_UPDATER.set(this, null); } protected void grabCnx() { - if (clientCnx.get() != null) { + if (CLIENT_CNX_UPDATER.get(this) != null) { log.warn("[{}] [{}] Client cnx already set, ignoring reconnection request", topic, getHandlerName()); return; } if (!isValidStateForReconnection()) { // Ignore connection closed when we are shutting down - log.info("[{}] [{}] Ignoring reconnection request (state: {})", topic, getHandlerName(), state); + log.info("[{}] [{}] Ignoring reconnection request (state: {})", topic, getHandlerName(), STATE_UPDATER.get(this)); return; } @@ -74,7 +78,7 @@ private Void handleConnectionError(Throwable exception) { log.warn("[{}] [{}] Error connecting to broker: {}", topic, getHandlerName(), exception.getMessage()); connectionFailed(new PulsarClientException(exception)); - State state = this.state.get(); + State state = STATE_UPDATER.get(this); if (state == State.Uninitialized || state == State.Connecting || state == State.Ready) { reconnectLater(exception); } @@ -83,15 +87,15 @@ private Void handleConnectionError(Throwable exception) { } protected void reconnectLater(Throwable exception) { - clientCnx.set(null); + CLIENT_CNX_UPDATER.set(this, null); if (!isValidStateForReconnection()) { - log.info("[{}] [{}] Ignoring reconnection request (state: {})", topic, getHandlerName(), state); + log.info("[{}] [{}] Ignoring reconnection request (state: {})", topic, getHandlerName(), STATE_UPDATER.get(this)); return; } long delayMs = backoff.next(); log.warn("[{}] [{}] Could not get connection to broker: {} -- Will try again in {} s", topic, getHandlerName(), exception.getMessage(), delayMs / 1000.0); - state.set(State.Connecting); + STATE_UPDATER.set(this, State.Connecting); client.timer().newTimeout(timeout -> { log.info("[{}] [{}] Reconnecting after connection was closed", topic, getHandlerName()); grabCnx(); @@ -99,13 +103,13 @@ protected void reconnectLater(Throwable exception) { } protected void connectionClosed(ClientCnx cnx) { - if (clientCnx.compareAndSet(cnx, null)) { + if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) { if (!isValidStateForReconnection()) { - log.info("[{}] [{}] Ignoring reconnection request (state: {})", topic, getHandlerName(), state); + log.info("[{}] [{}] Ignoring reconnection request (state: {})", topic, getHandlerName(), STATE_UPDATER.get(this)); return; } long delayMs = backoff.next(); - state.set(State.Connecting); + STATE_UPDATER.set(this, State.Connecting); log.info("[{}] [{}] Closed connection {} -- Will try again in {} s", topic, getHandlerName(), cnx.channel(), delayMs / 1000.0); client.timer().newTimeout(timeout -> { @@ -120,7 +124,7 @@ protected void resetBackoff() { } protected ClientCnx cnx() { - return clientCnx.get(); + return CLIENT_CNX_UPDATER.get(this); } protected boolean isRetriableError(PulsarClientException e) { @@ -129,12 +133,28 @@ protected boolean isRetriableError(PulsarClientException e) { // moves the state to ready if it wasn't closed protected boolean changeToReadyState() { - return (state.compareAndSet(State.Uninitialized, State.Ready) - || state.compareAndSet(State.Connecting, State.Ready)); + return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, State.Ready) + || STATE_UPDATER.compareAndSet(this, State.Connecting, State.Ready)); + } + + protected State getState() { + return STATE_UPDATER.get(this); + } + + protected void setState(State s) { + STATE_UPDATER.set(this, s); + } + + protected ClientCnx getClientCnx() { + return CLIENT_CNX_UPDATER.get(this); + } + + protected void setClientCnx(ClientCnx clientCnx) { + CLIENT_CNX_UPDATER.set(this, clientCnx); } private boolean isValidStateForReconnection() { - State state = this.state.get(); + State state = STATE_UPDATER.get(this); switch (state) { case Uninitialized: case Connecting: diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java index ec99501d9c1bb..5ce7ba662d96d 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java @@ -17,14 +17,12 @@ import static com.google.common.base.Preconditions.checkArgument; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -86,7 +84,7 @@ private void start() { consumers.add(consumer); consumer.subscribeFuture().handle((cons, subscribeException) -> { if (subscribeException != null) { - state.set(State.Failed); + setState(State.Failed); subscribeFail.compareAndSet(null, subscribeException); client.cleanupConsumer(this); } @@ -95,7 +93,7 @@ private void start() { try { // We have successfully created N consumers, so we can start receiving messages now starReceivingMessages(); - state.set(State.Ready); + setState(State.Ready); subscribeFuture().complete(PartitionedConsumerImpl.this); log.info("[{}] [{}] Created partitioned consumer", topic, subscription); return null; @@ -210,7 +208,7 @@ protected CompletableFuture internalReceiveAsync() { protected CompletableFuture doAcknowledge(MessageId messageId, AckType ackType) { checkArgument(messageId instanceof MessageIdImpl); - if (state.get() != State.Ready) { + if (getState() != State.Ready) { return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed")); } @@ -227,11 +225,11 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack @Override public CompletableFuture unsubscribeAsync() { - if (state.get() == State.Closing || state.get() == State.Closed) { + if (getState() == State.Closing || getState() == State.Closed) { return FutureUtil.failedFuture( new PulsarClientException.AlreadyClosedException("Partitioned Consumer was already closed")); } - state.set(State.Closing); + setState(State.Closing); AtomicReference unsubscribeFail = new AtomicReference(); AtomicInteger completed = new AtomicInteger(numPartitions); @@ -244,11 +242,11 @@ public CompletableFuture unsubscribeAsync() { } if (completed.decrementAndGet() == 0) { if (unsubscribeFail.get() == null) { - state.set(State.Closed); + setState(State.Closed); unsubscribeFuture.complete(null); log.info("[{}] [{}] Unsubscribed Partitioned Consumer", topic, subscription); } else { - state.set(State.Failed); + setState(State.Failed); unsubscribeFuture.completeExceptionally(unsubscribeFail.get()); log.error("[{}] [{}] Could not unsubscribe Partitioned Consumer", topic, subscription, unsubscribeFail.get().getCause()); @@ -267,10 +265,10 @@ public CompletableFuture unsubscribeAsync() { @Override public CompletableFuture closeAsync() { - if (state.get() == State.Closing || state.get() == State.Closed) { + if (getState() == State.Closing || getState() == State.Closed) { return CompletableFuture.completedFuture(null); } - state.set(State.Closing); + setState(State.Closing); AtomicReference closeFail = new AtomicReference(); AtomicInteger completed = new AtomicInteger(numPartitions); @@ -283,12 +281,12 @@ public CompletableFuture closeAsync() { } if (completed.decrementAndGet() == 0) { if (closeFail.get() == null) { - state.set(State.Closed); + setState(State.Closed); closeFuture.complete(null); log.info("[{}] [{}] Closed Partitioned Consumer", topic, subscription); client.cleanupConsumer(this); } else { - state.set(State.Failed); + setState(State.Failed); closeFuture.completeExceptionally(closeFail.get()); log.error("[{}] [{}] Could not close Partitioned Consumer", topic, subscription, closeFail.get().getCause()); diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedProducerImpl.java index df134c545ce4e..9f55fdf2d66dd 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedProducerImpl.java @@ -62,7 +62,7 @@ private void start() { producers.add(producer); producer.producerCreatedFuture().handle((prod, createException) -> { if (createException != null) { - state.set(State.Failed); + setState(State.Failed); createFail.compareAndSet(null, createException); } // we mark success if all the partitions are created @@ -72,7 +72,7 @@ private void start() { // created partitions if (completed.incrementAndGet() == numPartitions) { if (createFail.get() == null) { - state.set(State.Ready); + setState(State.Ready); producerCreatedFuture().complete(PartitionedProducerImpl.this); log.info("[{}] Created partitioned producer", topic); } else { @@ -94,7 +94,7 @@ private void start() { @Override public CompletableFuture sendAsync(Message message) { - switch (state.get()) { + switch (getState()) { case Ready: case Connecting: break; // Ok @@ -126,10 +126,10 @@ public boolean isConnected() { @Override public CompletableFuture closeAsync() { - if (state.get() == State.Closing || state.get() == State.Closed) { + if (getState() == State.Closing || getState() == State.Closed) { return CompletableFuture.completedFuture(null); } - state.set(State.Closing); + setState(State.Closing); AtomicReference closeFail = new AtomicReference(); AtomicInteger completed = new AtomicInteger(numPartitions); @@ -142,12 +142,12 @@ public CompletableFuture closeAsync() { } if (completed.decrementAndGet() == 0) { if (closeFail.get() == null) { - state.set(State.Closed); + setState(State.Closed); closeFuture.complete(null); log.info("[{}] Closed Partitioned Producer", topic); client.cleanupProducer(this); } else { - state.set(State.Failed); + setState(State.Failed); closeFuture.completeExceptionally(closeFail.get()); log.error("[{}] Could not close Partitioned Producer", topic, closeFail.get().getCause()); } diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerBase.java index 47e14f18996d1..fc68d27585aba 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerBase.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerBase.java @@ -17,7 +17,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicReference; import com.yahoo.pulsar.client.api.Message; import com.yahoo.pulsar.client.api.MessageBuilder; diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java index e5da39985aef6..2306e82587a03 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java @@ -273,8 +273,8 @@ public void sendAsync(Message message, SendCallback callback) { private ByteBuf sendMessage(long producerId, long sequenceId, int numMessages, MessageMetadata msgMetadata, ByteBuf compressedPayload) throws IOException { ChecksumType checksumType; - if (clientCnx.get() == null - || clientCnx.get().getRemoteEndpointProtocolVersion() >= brokerChecksumSupportedVersion()) { + if (getClientCnx() == null + || getClientCnx().getRemoteEndpointProtocolVersion() >= brokerChecksumSupportedVersion()) { checksumType = ChecksumType.Crc32c; } else { checksumType = ChecksumType.None; @@ -293,7 +293,7 @@ private void doBatchSendAndAdd(MessageImpl msg, SendCallback callback, ByteBuf p } private boolean isValidProducerState(SendCallback callback) { - switch (state.get()) { + switch (getState()) { case Ready: // OK case Connecting: @@ -381,14 +381,14 @@ protected WriteInEventLoopCallback newObject(Handle handle) { @Override public CompletableFuture closeAsync() { - if (state.get() == State.Closing || state.get() == State.Closed) { + if (getState() == State.Closing || getState() == State.Closed) { return CompletableFuture.completedFuture(null); } if (!isConnected()) { log.info("[{}] [{}] Closed Producer (not connected)", topic, producerName); synchronized (this) { - state.set(State.Closed); + setState(State.Closed); client.cleanupProducer(this); pendingMessages.forEach(msg -> { msg.cmd.release(); @@ -400,7 +400,7 @@ public CompletableFuture closeAsync() { return CompletableFuture.completedFuture(null); } - state.set(State.Closing); + setState(State.Closing); Timeout timeout = sendTimeout; if (timeout != null) { @@ -423,7 +423,7 @@ public CompletableFuture closeAsync() { // connection did break in the meantime. In any case, the producer is gone. synchronized (ProducerImpl.this) { log.info("[{}] [{}] Closed Producer", topic, producerName); - state.set(State.Closed); + setState(State.Closed); pendingMessages.forEach(msg -> { msg.cmd.release(); msg.recycle(); @@ -445,11 +445,11 @@ public CompletableFuture closeAsync() { @Override public boolean isConnected() { - return clientCnx.get() != null && (state.get() == State.Ready); + return getClientCnx() != null && (getState() == State.Ready); } public boolean isWritable() { - ClientCnx cnx = clientCnx.get(); + ClientCnx cnx = getClientCnx(); return cnx != null && cnx.channel().isWritable(); } @@ -677,7 +677,7 @@ protected OpSendMsg newObject(Handle handle) { void connectionOpened(final ClientCnx cnx) { // we set the cnx reference before registering the producer on the cnx, so if the cnx breaks before creating the // producer, it will try to grab a new cnx - clientCnx.set(cnx); + setClientCnx(cnx); cnx.registerProducer(producerId, this); log.info("[{}] [{}] Creating producer on cnx {}", topic, producerName, cnx.ctx().channel()); @@ -689,7 +689,7 @@ void connectionOpened(final ClientCnx cnx) { // We are now reconnected to broker and clear to send messages. Re-send all pending messages and // set the cnx pointer so that new messages will be sent immediately synchronized (ProducerImpl.this) { - if (state.get() == State.Closing || state.get() == State.Closed) { + if (getState() == State.Closing || getState() == State.Closed) { // Producer was closed while reconnecting, close the connection to make sure the broker // drops the producer on its side cnx.removeProducer(producerId); @@ -715,7 +715,7 @@ void connectionOpened(final ClientCnx cnx) { } }).exceptionally((e) -> { cnx.removeProducer(producerId); - if (state.get() == State.Closing || state.get() == State.Closed) { + if (getState() == State.Closing || getState() == State.Closed) { // Producer was closed while reconnecting, close the connection to make sure the broker // drops the producer on its side cnx.channel().close(); @@ -751,7 +751,7 @@ && isRetriableError((PulsarClientException) e.getCause()) // still within the initial timeout budget and we are dealing with a retriable error reconnectLater(e.getCause()); } else { - state.set(State.Failed); + setState(State.Failed); producerCreatedFuture.completeExceptionally(e.getCause()); client.cleanupProducer(this); } @@ -765,14 +765,14 @@ void connectionFailed(PulsarClientException exception) { if (System.currentTimeMillis() > createProducerTimeout && producerCreatedFuture.completeExceptionally(exception)) { log.info("[{}] Producer creation failed for producer {}", topic, producerId); - state.set(State.Failed); + setState(State.Failed); } } private void resendMessages(ClientCnx cnx) { cnx.ctx().channel().eventLoop().execute(() -> { synchronized (this) { - if (state.get() == State.Closing || state.get() == State.Closed) { + if (getState() == State.Closing || getState() == State.Closed) { // Producer was closed while reconnecting, close the connection to make sure the broker // drops the producer on its side cnx.channel().close(); diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PulsarClientImpl.java index 679af2d0e7224..de3ca532d216d 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PulsarClientImpl.java @@ -342,7 +342,7 @@ public void shutdown() throws PulsarClientException { protected CompletableFuture getConnection(final String topic) { DestinationName destinationName = DestinationName.get(topic); - return lookup.getBroker(destinationName).thenCompose((brokerAddress) -> cnxPool.getConnection(brokerAddress)); + return lookup.getBroker(destinationName).thenCompose(cnxPool::getConnection); } protected Timer timer() { diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java index a4c3a18b70afc..ae08ca4b0d4cb 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java @@ -15,18 +15,21 @@ */ package com.yahoo.pulsar.client.impl; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import com.yahoo.pulsar.client.api.Message; import com.yahoo.pulsar.client.api.MessageRouter; public class RoundRobinPartitionMessageRouterImpl implements MessageRouter { - private AtomicInteger partitionIndex = new AtomicInteger(); + private static final AtomicIntegerFieldUpdater PARTITION_INDEX_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(RoundRobinPartitionMessageRouterImpl.class, "partitionIndex"); + private volatile int partitionIndex = 0; private final int numPartitions; public RoundRobinPartitionMessageRouterImpl(int numPartitions) { this.numPartitions = numPartitions; + PARTITION_INDEX_UPDATER.set(this, 0); } @Override @@ -35,7 +38,7 @@ public int choosePartition(Message msg) { if (msg.hasKey()) { return ((msg.getKey().hashCode() & Integer.MAX_VALUE) % numPartitions); } - return ((partitionIndex.getAndIncrement() & Integer.MAX_VALUE) % numPartitions); + return ((PARTITION_INDEX_UPDATER.getAndIncrement(this) & Integer.MAX_VALUE) % numPartitions); } } diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/collections/GrowableArrayBlockingQueue.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/collections/GrowableArrayBlockingQueue.java index 2e4c4c79cc357..7f91447ed9af2 100644 --- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/collections/GrowableArrayBlockingQueue.java +++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/collections/GrowableArrayBlockingQueue.java @@ -22,6 +22,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -42,7 +43,9 @@ public class GrowableArrayBlockingQueue extends AbstractQueue implements B private final Condition isNotEmpty = headLock.newCondition(); private T[] data; - private final AtomicInteger size = new AtomicInteger(0); + private static final AtomicIntegerFieldUpdater SIZE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(GrowableArrayBlockingQueue.class, "size"); + private volatile int size = 0; public GrowableArrayBlockingQueue() { this(64); @@ -71,10 +74,10 @@ public T remove() { public T poll() { headLock.lock(); try { - if (size.get() > 0) { + if (SIZE_UPDATER.get(this) > 0) { T item = data[headIndex.value]; headIndex.value = (headIndex.value + 1) & (data.length - 1); - size.decrementAndGet(); + SIZE_UPDATER.decrementAndGet(this); return item; } else { return null; @@ -98,7 +101,7 @@ public T element() { public T peek() { headLock.lock(); try { - if (size.get() > 0) { + if (SIZE_UPDATER.get(this) > 0) { return data[headIndex.value]; } else { return null; @@ -122,13 +125,13 @@ public void put(T e) { boolean wasEmpty = false; try { - if (size.get() == data.length) { + if (SIZE_UPDATER.get(this) == data.length) { expandArray(); } data[tailIndex.value] = e; tailIndex.value = (tailIndex.value + 1) & (data.length - 1); - if (size.getAndIncrement() == 0) { + if (SIZE_UPDATER.getAndIncrement(this) == 0) { wasEmpty = true; } } finally { @@ -163,14 +166,14 @@ public T take() throws InterruptedException { headLock.lockInterruptibly(); try { - while (size.get() == 0) { + while (SIZE_UPDATER.get(this) == 0) { isNotEmpty.await(); } T item = data[headIndex.value]; data[headIndex.value] = null; headIndex.value = (headIndex.value + 1) & (data.length - 1); - if (size.decrementAndGet() > 0) { + if (SIZE_UPDATER.decrementAndGet(this) > 0) { // There are still entries to consume isNotEmpty.signal(); } @@ -186,7 +189,7 @@ public T poll(long timeout, TimeUnit unit) throws InterruptedException { try { long timeoutNanos = unit.toNanos(timeout); - while (size.get() == 0) { + while (SIZE_UPDATER.get(this) == 0) { if (timeoutNanos <= 0) { return null; } @@ -197,7 +200,7 @@ public T poll(long timeout, TimeUnit unit) throws InterruptedException { T item = data[headIndex.value]; data[headIndex.value] = null; headIndex.value = (headIndex.value + 1) & (data.length - 1); - if (size.decrementAndGet() > 0) { + if (SIZE_UPDATER.decrementAndGet(this) > 0) { // There are still entries to consume isNotEmpty.signal(); } @@ -223,7 +226,7 @@ public int drainTo(Collection c, int maxElements) { try { int drainedItems = 0; - int size = this.size.get(); + int size = SIZE_UPDATER.get(this); while (size > 0 && drainedItems < maxElements) { T item = data[headIndex.value]; @@ -235,7 +238,7 @@ public int drainTo(Collection c, int maxElements) { ++drainedItems; } - if (this.size.addAndGet(-drainedItems) > 0) { + if (SIZE_UPDATER.addAndGet(this, -drainedItems) > 0) { // There are still entries to consume isNotEmpty.signal(); } @@ -251,14 +254,14 @@ public void clear() { headLock.lock(); try { - int size = this.size.get(); + int size = SIZE_UPDATER.get(this); for (int i = 0; i < size; i++) { data[headIndex.value] = null; headIndex.value = (headIndex.value + 1) & (data.length - 1); } - if (this.size.addAndGet(-size) > 0) { + if (SIZE_UPDATER.addAndGet(this, -size) > 0) { // There are still entries to consume isNotEmpty.signal(); } @@ -269,7 +272,7 @@ public void clear() { @Override public int size() { - return size.get(); + return SIZE_UPDATER.get(this); } @Override @@ -286,7 +289,7 @@ public String toString() { try { int headIndex = this.headIndex.value; - int size = this.size.get(); + int size = SIZE_UPDATER.get(this); sb.append('['); @@ -315,7 +318,7 @@ private void expandArray() { headLock.lock(); try { - int size = this.size.get(); + int size = SIZE_UPDATER.get(this); int newCapacity = data.length * 2; T[] newData = (T[]) new Object[newCapacity];