From 774a5d42e8342ee50395cf3626b9e7af27da849e Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Fri, 10 May 2024 10:37:44 +0800 Subject: [PATCH] [fix][broker] Fix cursor should use latest ledger config (#22644) Signed-off-by: Zixuan Liu --- .../mledger/impl/ManagedCursorImpl.java | 61 +++++++++---------- .../mledger/impl/ManagedCursorMXBeanImpl.java | 3 +- .../mledger/impl/ManagedLedgerImpl.java | 8 +-- .../mledger/impl/NonDurableCursorImpl.java | 5 +- .../bookkeeper/mledger/impl/OpReadEntry.java | 3 +- .../mledger/impl/RangeSetWrapper.java | 2 +- .../mledger/impl/ReadOnlyCursorImpl.java | 5 +- .../impl/ReadOnlyManagedLedgerImpl.java | 2 +- ...edCursorIndividualDeletedMessagesTest.java | 3 +- .../mledger/impl/ManagedCursorTest.java | 7 +-- .../mledger/impl/ManagedLedgerTest.java | 2 +- .../service/BrokerBkEnsemblesTests.java | 8 +-- .../persistent/PersistentTopicTest.java | 25 ++++++++ 13 files changed, 77 insertions(+), 57 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 3671385e60f75..35000361eca68 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -119,7 +119,6 @@ public class ManagedCursorImpl implements ManagedCursor { return 0; }; protected final BookKeeper bookkeeper; - protected final ManagedLedgerConfig config; protected final ManagedLedgerImpl ledger; private final String name; @@ -299,31 +298,30 @@ public interface VoidCallback { void operationFailed(ManagedLedgerException exception); } - ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName) { + ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger, String cursorName) { this.bookkeeper = bookkeeper; this.cursorProperties = Collections.emptyMap(); - this.config = config; this.ledger = ledger; this.name = cursorName; this.individualDeletedMessages = new RangeSetWrapper<>(positionRangeConverter, positionRangeReverseConverter, this); - if (config.isDeletionAtBatchIndexLevelEnabled()) { + if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { this.batchDeletedIndexes = new ConcurrentSkipListMap<>(); } else { this.batchDeletedIndexes = null; } - this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType()); + this.digestType = BookKeeper.DigestType.fromApiDigestType(getConfig().getDigestType()); STATE_UPDATER.set(this, State.Uninitialized); PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0); PENDING_READ_OPS_UPDATER.set(this, 0); RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE); WAITING_READ_OP_UPDATER.set(this, null); - this.clock = config.getClock(); + this.clock = getConfig().getClock(); this.lastActive = this.clock.millis(); this.lastLedgerSwitchTimestamp = this.clock.millis(); - if (config.getThrottleMarkDelete() > 0.0) { - markDeleteLimiter = RateLimiter.create(config.getThrottleMarkDelete()); + if (getConfig().getThrottleMarkDelete() > 0.0) { + markDeleteLimiter = RateLimiter.create(getConfig().getThrottleMarkDelete()); } else { // Disable mark-delete rate limiter markDeleteLimiter = null; @@ -343,7 +341,7 @@ public Map getProperties() { @Override public boolean isCursorDataFullyPersistable() { - return individualDeletedMessages.size() <= config.getMaxUnackedRangesToPersist(); + return individualDeletedMessages.size() <= getConfig().getMaxUnackedRangesToPersist(); } @Override @@ -607,7 +605,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac if (positionInfo.getIndividualDeletedMessagesCount() > 0) { recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList()); } - if (config.isDeletionAtBatchIndexLevelEnabled() + if (getConfig().isDeletionAtBatchIndexLevelEnabled() && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) { recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList()); } @@ -616,7 +614,8 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac }, null); }; try { - bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), openCallback, null); + bookkeeper.asyncOpenLedger(ledgerId, digestType, getConfig().getPassword(), openCallback, + null); } catch (Throwable t) { log.error("[{}] Encountered error on opening cursor ledger {} for cursor {}", ledger.getName(), ledgerId, name, t); @@ -973,10 +972,10 @@ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, Re // Check again for new entries after the configured time, then if still no entries are available register // to be notified - if (config.getNewEntriesCheckDelayInMillis() > 0) { + if (getConfig().getNewEntriesCheckDelayInMillis() > 0) { ledger.getScheduledExecutor() .schedule(() -> checkForNewEntries(op, callback, ctx), - config.getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS); + getConfig().getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS); } else { // If there's no delay, check directly from the same thread checkForNewEntries(op, callback, ctx); @@ -1324,7 +1323,7 @@ public void operationComplete() { lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), null, null); individualDeletedMessages.clear(); - if (config.isDeletionAtBatchIndexLevelEnabled()) { + if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle); batchDeletedIndexes.clear(); long[] resetWords = newReadPosition.ackSet; @@ -1583,7 +1582,7 @@ protected long getNumberOfEntries(Range range) { lock.readLock().lock(); try { - if (config.isUnackedRangesOpenCacheSetEnabled()) { + if (getConfig().isUnackedRangesOpenCacheSetEnabled()) { int cardinality = individualDeletedMessages.cardinality( range.lowerEndpoint().ledgerId, range.lowerEndpoint().entryId, range.upperEndpoint().ledgerId, range.upperEndpoint().entryId); @@ -1963,7 +1962,7 @@ public void asyncMarkDelete(final Position position, Map propertie PositionImpl newPosition = (PositionImpl) position; - if (config.isDeletionAtBatchIndexLevelEnabled()) { + if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { if (newPosition.ackSet != null) { AtomicReference bitSetRecyclable = new AtomicReference<>(); BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(newPosition.ackSet); @@ -2146,7 +2145,7 @@ public void operationComplete() { try { individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(), mdEntry.newPosition.getEntryId()); - if (config.isDeletionAtBatchIndexLevelEnabled()) { + if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { Map subMap = batchDeletedIndexes.subMap(PositionImpl.EARLIEST, false, PositionImpl.get(mdEntry.newPosition.getLedgerId(), mdEntry.newPosition.getEntryId()), true); @@ -2284,7 +2283,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb } if (isMessageDeleted(position)) { - if (config.isDeletionAtBatchIndexLevelEnabled()) { + if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position); if (bitSetRecyclable != null) { bitSetRecyclable.recycle(); @@ -2296,7 +2295,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb continue; } if (position.ackSet == null) { - if (config.isDeletionAtBatchIndexLevelEnabled()) { + if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position); if (bitSetRecyclable != null) { bitSetRecyclable.recycle(); @@ -2313,7 +2312,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name, individualDeletedMessages); } - } else if (config.isDeletionAtBatchIndexLevelEnabled()) { + } else if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(position.ackSet); BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> givenBitSet); if (givenBitSet != bitSet) { @@ -2660,8 +2659,8 @@ public void operationFailed(MetaStoreException e) { private boolean shouldPersistUnackRangesToLedger() { return cursorLedger != null && !isCursorLedgerReadOnly - && config.getMaxUnackedRangesToPersist() > 0 - && individualDeletedMessages.size() > config.getMaxUnackedRangesToPersistInMetadataStore(); + && getConfig().getMaxUnackedRangesToPersist() > 0 + && individualDeletedMessages.size() > getConfig().getMaxUnackedRangesToPersistInMetadataStore(); } private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl position, Map properties, @@ -2686,7 +2685,7 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio info.addAllCursorProperties(buildStringPropertiesMap(cursorProperties)); if (persistIndividualDeletedMessageRanges) { info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges()); - if (config.isDeletionAtBatchIndexLevelEnabled()) { + if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { info.addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList()); } } @@ -2951,7 +2950,7 @@ public void operationFailed(ManagedLedgerException exception) { private CompletableFuture doCreateNewMetadataLedger() { CompletableFuture future = new CompletableFuture<>(); - ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> { + ledger.asyncCreateLedger(bookkeeper, getConfig(), digestType, (rc, lh, ctx) -> { if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) { future.complete(null); @@ -3056,7 +3055,7 @@ private List buildIndividualDeletedMessageRanges() { acksSerializedSize.addAndGet(messageRange.getSerializedSize()); rangeList.add(messageRange); - return rangeList.size() <= config.getMaxUnackedRangesToPersist(); + return rangeList.size() <= getConfig().getMaxUnackedRangesToPersist(); }); this.individualDeletedMessagesSerializedSize = acksSerializedSize.get(); @@ -3070,7 +3069,7 @@ private List buildIndividualDeletedMessageRanges() { private List buildBatchEntryDeletionIndexInfoList() { lock.readLock().lock(); try { - if (!config.isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes.isEmpty()) { + if (!getConfig().isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes.isEmpty()) { return Collections.emptyList(); } MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo @@ -3079,7 +3078,7 @@ private List buildBatchEntryDeletio .BatchedEntryDeletionIndexInfo.newBuilder(); List result = new ArrayList<>(); Iterator> iterator = batchDeletedIndexes.entrySet().iterator(); - while (iterator.hasNext() && result.size() < config.getMaxBatchDeletedIndexToPersist()) { + while (iterator.hasNext() && result.size() < getConfig().getMaxBatchDeletedIndexToPersist()) { Map.Entry entry = iterator.next(); nestedPositionBuilder.setLedgerId(entry.getKey().getLedgerId()); nestedPositionBuilder.setEntryId(entry.getKey().getEntryId()); @@ -3199,8 +3198,8 @@ public void operationFailed(MetaStoreException e) { boolean shouldCloseLedger(LedgerHandle lh) { long now = clock.millis(); if (ledger.getFactory().isMetadataServiceAvailable() - && (lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger() - || lastLedgerSwitchTimestamp < (now - config.getLedgerRolloverTimeout() * 1000)) + && (lh.getLastAddConfirmed() >= getConfig().getMetadataMaxEntriesPerLedger() + || lastLedgerSwitchTimestamp < (now - getConfig().getLedgerRolloverTimeout() * 1000)) && (STATE_UPDATER.get(this) != State.Closed && STATE_UPDATER.get(this) != State.Closing)) { // It's safe to modify the timestamp since this method will be only called from a callback, implying that // calls will be serialized on one single thread @@ -3556,7 +3555,7 @@ private ManagedCursorImpl cursorImpl() { @Override public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) { - if (config.isDeletionAtBatchIndexLevelEnabled()) { + if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { BitSetRecyclable bitSet = batchDeletedIndexes.get(position); return bitSet == null ? null : bitSet.toLongArray(); } else { @@ -3657,7 +3656,7 @@ public boolean isCacheReadEntry() { private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class); public ManagedLedgerConfig getConfig() { - return config; + return getManagedLedger().getConfig(); } /*** diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java index 48465e6294b0e..a183c0d61ce16 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java @@ -90,7 +90,8 @@ public long getPersistZookeeperErrors() { @Override public void addWriteCursorLedgerSize(final long size) { - writeCursorLedgerSize.add(size * ((ManagedCursorImpl) managedCursor).config.getWriteQuorumSize()); + writeCursorLedgerSize.add( + size * managedCursor.getManagedLedger().getConfig().getWriteQuorumSize()); writeCursorLedgerLogicalSize.add(size); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index b12346cadc96a..ab32806fbae84 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -575,7 +575,7 @@ public void operationComplete(List consumers, Stat s) { for (final String cursorName : consumers) { log.info("[{}] Loading cursor {}", name, cursorName); final ManagedCursorImpl cursor; - cursor = new ManagedCursorImpl(bookKeeper, config, ManagedLedgerImpl.this, cursorName); + cursor = new ManagedCursorImpl(bookKeeper, ManagedLedgerImpl.this, cursorName); cursor.recover(new VoidCallback() { @Override @@ -606,7 +606,7 @@ public void operationFailed(ManagedLedgerException exception) { log.debug("[{}] Recovering cursor {} lazily", name, cursorName); } final ManagedCursorImpl cursor; - cursor = new ManagedCursorImpl(bookKeeper, config, ManagedLedgerImpl.this, cursorName); + cursor = new ManagedCursorImpl(bookKeeper, ManagedLedgerImpl.this, cursorName); CompletableFuture cursorRecoveryFuture = new CompletableFuture<>(); uninitializedCursors.put(cursorName, cursorRecoveryFuture); @@ -988,7 +988,7 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP if (log.isDebugEnabled()) { log.debug("[{}] Creating new cursor: {}", name, cursorName); } - final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, config, this, cursorName); + final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, this, cursorName); CompletableFuture cursorFuture = new CompletableFuture<>(); uninitializedCursors.put(cursorName, cursorFuture); PositionImpl position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition(); @@ -1121,7 +1121,7 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cu return cachedCursor; } - NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, config, this, cursorName, + NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, this, cursorName, (PositionImpl) startCursorPosition, initialPosition, isReadCompacted); cursor.setActive(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index 77216ce2e4588..734eab20bc58e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -25,7 +25,6 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; -import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.slf4j.Logger; @@ -35,10 +34,10 @@ public class NonDurableCursorImpl extends ManagedCursorImpl { private final boolean readCompacted; - NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName, + NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger, String cursorName, PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition, boolean isReadCompacted) { - super(bookkeeper, config, ledger, cursorName); + super(bookkeeper, ledger, cursorName); this.readCompacted = isReadCompacted; // Compare with "latest" position marker by using only the ledger id. Since the C++ client is using 48bits to diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index a79ba3fb5e23b..534ef3d76cb0d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -111,7 +111,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { callback.readEntriesComplete(entries, ctx); recycle(); }); - } else if (cursor.config.isAutoSkipNonRecoverableData() && exception instanceof NonRecoverableLedgerException) { + } else if (cursor.getConfig().isAutoSkipNonRecoverableData() + && exception instanceof NonRecoverableLedgerException) { log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(), readPosition, exception.getMessage()); final ManagedLedgerImpl ledger = (ManagedLedgerImpl) cursor.getManagedLedger(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java index 02e43504482d8..f235ffc63ace5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java @@ -52,7 +52,7 @@ public RangeSetWrapper(LongPairConsumer rangeConverter, RangeBoundConsumer rangeBoundConsumer, ManagedCursorImpl managedCursor) { requireNonNull(managedCursor); - this.config = managedCursor.getConfig(); + this.config = managedCursor.getManagedLedger().getConfig(); this.rangeConverter = rangeConverter; this.rangeSet = config.isUnackedRangesOpenCacheSetEnabled() ? new ConcurrentOpenLongPairRangeSet<>(4096, rangeConverter) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java index 1661613f07d7d..2461bcf780e99 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java @@ -22,7 +22,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.mledger.AsyncCallbacks; -import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.ReadOnlyCursor; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound; @@ -31,9 +30,9 @@ @Slf4j public class ReadOnlyCursorImpl extends ManagedCursorImpl implements ReadOnlyCursor { - public ReadOnlyCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, + public ReadOnlyCursorImpl(BookKeeper bookkeeper, ManagedLedgerImpl ledger, PositionImpl startPosition, String cursorName) { - super(bookkeeper, config, ledger, cursorName); + super(bookkeeper, ledger, cursorName); if (startPosition.equals(PositionImpl.EARLIEST)) { readPosition = ledger.getFirstPosition().getNext(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java index 707b71c9d9f09..d844963599995 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java @@ -143,7 +143,7 @@ ReadOnlyCursor createReadOnlyCursor(PositionImpl startPosition) { } } - return new ReadOnlyCursorImpl(bookKeeper, config, this, startPosition, "read-only-cursor"); + return new ReadOnlyCursorImpl(bookKeeper, this, startPosition, "read-only-cursor"); } @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java index aa0d04783d991..864c25c6c434b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java @@ -56,8 +56,9 @@ void testRecoverIndividualDeletedMessages() throws Exception { ManagedLedgerImpl ledger = mock(ManagedLedgerImpl.class); doReturn(ledgersInfo).when(ledger).getLedgersInfo(); + doReturn(config).when(ledger).getConfig(); - ManagedCursorImpl cursor = spy(new ManagedCursorImpl(bookkeeper, config, ledger, "test-cursor")); + ManagedCursorImpl cursor = spy(new ManagedCursorImpl(bookkeeper, ledger, "test-cursor")); LongPairRangeSet deletedMessages = cursor.getIndividuallyDeletedMessagesSet(); Method recoverMethod = ManagedCursorImpl.class.getDeclaredMethod("recoverIndividualDeletedMessages", diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 5c10533e2476b..4c95454e33a92 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -3465,10 +3465,10 @@ public Object answer(InvocationOnMock invocation) { when(ml.getNextValidLedger(markDeleteLedgerId)).thenReturn(3L); when(ml.getNextValidPosition(lastPosition)).thenReturn(nextPosition); when(ml.ledgerExists(markDeleteLedgerId)).thenReturn(false); + when(ml.getConfig()).thenReturn(new ManagedLedgerConfig()); BookKeeper mockBookKeeper = mock(BookKeeper.class); - final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, new ManagedLedgerConfig(), ml, - cursorName); + final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, ml, cursorName); cursor.recover(new VoidCallback() { @Override @@ -4772,8 +4772,7 @@ public void testRecoverCursorWithTerminateManagedLedger() throws Exception { // Reopen the ledger. ledger = (ManagedLedgerImpl) factory.open(mlName, config); BookKeeper mockBookKeeper = mock(BookKeeper.class); - final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, new ManagedLedgerConfig(), ledger, - cursorName); + final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, ledger, cursorName); CompletableFuture recoverFuture = new CompletableFuture<>(); // Recover the cursor. diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index e983523c1b62e..122bada487a44 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3159,7 +3159,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { // (2) test read-timeout for: ManagedLedger.asyncReadEntry(..) AtomicReference responseException2 = new AtomicReference<>(); PositionImpl readPositionRef = PositionImpl.EARLIEST; - ManagedCursorImpl cursor = new ManagedCursorImpl(bk, config, ledger, "cursor1"); + ManagedCursorImpl cursor = new ManagedCursorImpl(bk, ledger, "cursor1"); OpReadEntry opReadEntry = OpReadEntry.create(cursor, readPositionRef, 1, new ReadEntriesCallback() { @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 42b9358911a69..82892ad353aa1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -210,10 +210,8 @@ public void testSkipCorruptDataLedger() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get(); ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger(); ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next(); - Field configField = ManagedCursorImpl.class.getDeclaredField("config"); - configField.setAccessible(true); // Create multiple data-ledger - ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor); + ManagedLedgerConfig config = ml.getConfig(); config.setMaxEntriesPerLedger(entriesPerLedger); config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); // bookkeeper client @@ -323,10 +321,8 @@ public void testTruncateCorruptDataLedger() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get(); ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger(); ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next(); - Field configField = ManagedCursorImpl.class.getDeclaredField("config"); - configField.setAccessible(true); // Create multiple data-ledger - ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor); + ManagedLedgerConfig config = ml.getConfig(); config.setMaxEntriesPerLedger(entriesPerLedger); config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); // bookkeeper client diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index d523586c2e2d3..5b750a0b9c2e5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -66,6 +66,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.PrometheusMetricsTestUtil; @@ -754,6 +755,30 @@ public void testDynamicConfigurationAutoSkipNonRecoverableData() throws Exceptio admin.topics().delete(topicName); } + @Test + public void testCursorGetConfigAfterTopicPoliciesChanged() throws Exception { + final String topicName = "persistent://prop/ns-abc/" + UUID.randomUUID(); + final String subName = "test_sub"; + + @Cleanup + Consumer subscribe = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); + PersistentSubscription subscription = persistentTopic.getSubscription(subName); + + int maxConsumers = 100; + admin.topicPolicies().setMaxConsumers(topicName, 100); + Awaitility.await().untilAsserted(() -> { + assertEquals(admin.topicPolicies().getMaxConsumers(topicName, false), maxConsumers); + }); + + ManagedCursorImpl cursor = (ManagedCursorImpl) subscription.getCursor(); + assertEquals(cursor.getConfig(), persistentTopic.getManagedLedger().getConfig()); + + subscribe.close(); + admin.topics().delete(topicName); + } + @Test public void testAddWaitingCursorsForNonDurable() throws Exception { final String ns = "prop/ns-test";