diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 9b6defd82417c2..a58ea58edafa7b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2793,6 +2793,8 @@ private void persistPositionMetaStore(long cursorsLedgerId, Position position, M new CursorAlreadyClosedException(name + " cursor already closed")))); return; } + log.info("[{}][{}] Persisting cursor metadata into metadata store (persistIndividualDeletedMessageRanges: {})", + ledger.getName(), name, persistIndividualDeletedMessageRanges); final Stat lastCursorLedgerStat = cursorLedgerStat; @@ -2807,7 +2809,7 @@ private void persistPositionMetaStore(long cursorsLedgerId, Position position, M info.addAllProperties(buildPropertiesMap(properties)); info.addAllCursorProperties(buildStringPropertiesMap(cursorProperties)); if (persistIndividualDeletedMessageRanges) { - info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges()); + info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges(true)); if (getConfig().isDeletionAtBatchIndexLevelEnabled()) { info.addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList()); } @@ -3153,7 +3155,7 @@ private static List buildStringPropertiesMap(Map return stringProperties; } - private List buildIndividualDeletedMessageRanges() { + private List buildIndividualDeletedMessageRanges(boolean forMetastore) { lock.writeLock().lock(); try { log.info("[{}] [{}] buildIndividualDeletedMessageRanges, numRanges {}", @@ -3188,19 +3190,28 @@ private List buildIndividualDeletedMessageRanges() { .setUpperEndpoint(upperPosition) .build(); - acksSerializedSize.addAndGet(messageRange.getSerializedSize()); + int currentSize = acksSerializedSize.addAndGet(messageRange.getSerializedSize()); rangeList.add(messageRange); + if (forMetastore && currentSize > (1024 * 1024 - 10 * 1024)) { + log.warn("[{}] [{}] buildIndividualDeletedMessageRanges, " + + "rangeListSize {} " + + "maxUnackedRangesToPersist {}, " + + "reached {} bytes that is too big for the metastore", + ledger.getName(), name, + rangeList.size(), + getConfig().getMaxUnackedRangesToPersist(), currentSize); + return false; + } + return rangeList.size() <= getConfig().getMaxUnackedRangesToPersist(); }); this.individualDeletedMessagesSerializedSize = acksSerializedSize.get(); individualDeletedMessages.resetDirtyKeys(); - log.info("[{}] [{}] buildIndividualDeletedMessageRanges, numRanges {} " - + "individualDeletedMessagesSerializedSize {} rangeListSize {} " + log.info("[{}] [{}] buildIndividualDeletedMessageRanges, rangeListSize {} " + "maxUnackedRangesToPersist {}", - ledger.getName(), name, individualDeletedMessages.size(), - individualDeletedMessagesSerializedSize, rangeList.size(), + ledger.getName(), name, rangeList.size(), getConfig().getMaxUnackedRangesToPersist()); return rangeList; @@ -3230,11 +3241,11 @@ private void scanIndividualDeletedMessageRanges( this.individualDeletedMessagesSerializedSize = acksSerializedSize.get(); individualDeletedMessages.resetDirtyKeys(); - log.info("[{}] [{}] scanIndividualDeletedMessageRanges, numRanges {} " - + "individualDeletedMessagesSerializedSize {} rangeListSize {} " + log.info("[{}] [{}] scanIndividualDeletedMessageRanges, " + + "rangeListSize {} " + "maxUnackedRangesToPersist {}", - ledger.getName(), name, individualDeletedMessages.size(), - individualDeletedMessagesSerializedSize, rangeCount.get(), + ledger.getName(), name, + rangeCount.get(), getConfig().getMaxUnackedRangesToPersist()); } finally { lock.readLock().unlock(); @@ -3387,27 +3398,30 @@ private void writeToBookKeeperLastChunk(LedgerHandle lh, PositionImpl position, Runnable onFinished) { lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> { - if (rc == BKException.Code.OK) { - if (log.isDebugEnabled()) { - log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", ledger.getName(), name, position, - lh1.getId()); - } + try { + if (rc == BKException.Code.OK) { + if (log.isDebugEnabled()) { + log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", ledger.getName(), name, + position, + lh1.getId()); + } - rolloverLedgerIfNeeded(lh1); + rolloverLedgerIfNeeded(lh1); - mbean.persistToLedger(true); - mbean.addWriteCursorLedgerSize(data.readableBytes()); - callback.operationComplete(); - onFinished.run(); - } else { - log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name, - position, lh1.getId(), BKException.getMessage(rc)); - // If we've had a write error, the ledger will be automatically closed, we need to create a new one, - // in the meantime the mark-delete will be queued. - STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger); - - // Before giving up, try to persist the position in the metadata store. - persistPositionToMetaStore(mdEntry, callback); + mbean.persistToLedger(true); + mbean.addWriteCursorLedgerSize(data.readableBytes()); + callback.operationComplete(); + } else { + log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name, + position, lh1.getId(), BKException.getMessage(rc)); + // If we've had a write error, the ledger will be automatically closed, we need to create a new one, + // in the meantime the mark-delete will be queued. + STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger); + + // Before giving up, try to persist the position in the metadata store. + persistPositionToMetaStore(mdEntry, callback); + } + } finally { onFinished.run(); } }, null); @@ -3502,6 +3516,7 @@ boolean rolloverLedgerIfNeeded(LedgerHandle lh1) { } void persistPositionToMetaStore(MarkDeleteEntry mdEntry, final VoidCallback callback) { + log.info("[{}][{}] Persisting cursor metadata into metadata store", ledger.getName(), name); final Position newPosition = mdEntry.newPosition; STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger); mbean.persistToLedger(false); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java index a9117848e58bcd..28e60cbf5b95cd 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java @@ -249,6 +249,8 @@ public void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedC String path = PREFIX + ledgerName + "/" + cursorName; byte[] content = compressCursorInfo(info); + log.info("[{}] Persisting cursor={} info with content size {} bytes to metastore", + ledgerName, cursorName, content.length); long expectedVersion; @@ -267,6 +269,7 @@ public void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedC .thenAcceptAsync(optStat -> callback.operationComplete(null, optStat), executor .chooseThread(ledgerName)) .exceptionally(ex -> { + log.error("[{}] [{}] Failed to update cursor info", ledgerName, cursorName, ex); executor.executeOrdered(ledgerName, () -> callback.operationFailed(getException(ex))); return null; @@ -525,6 +528,8 @@ private byte[] compressManagedInfo(byte[] info, byte[] metadata, int metadataSer compositeByteBuf.addComponent(true, encodeByteBuf); byte[] dataBytes = new byte[compositeByteBuf.readableBytes()]; compositeByteBuf.readBytes(dataBytes); + log.info("Compressed cursor info, info size {}, metadata size {}, compressed size: {}", + info.length, metadata.length, dataBytes.length); return dataBytes; } finally { if (metadataByteBuf != null) {