From 5fdcbc3a28df985d181ff687eea6a71309b713ea Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 6 Sep 2023 20:12:53 +0800 Subject: [PATCH] [improve] [broker] Improve logs to troubleshooting --- .../mledger/impl/ManagedCursorImpl.java | 26 +++++++++++-------- .../mledger/impl/NonDurableCursorImpl.java | 8 ++++-- .../service/persistent/PersistentTopic.java | 8 +++--- 3 files changed, 25 insertions(+), 17 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index a2420c1c29efb..ff8e0655d03be 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -509,7 +509,7 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) { callback.operationComplete(); } else { // Need to proceed and read the last entry in the specified ledger to find out the last position - log.info("[{}] Consumer {} meta-data recover from ledger {}", ledger.getName(), name, + log.info("[{}] Cursor {} meta-data recover from ledger {}", ledger.getName(), name, info.getCursorsLedgerId()); recoverFromLedger(info, callback); } @@ -529,16 +529,16 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac long ledgerId = info.getCursorsLedgerId(); OpenCallback openCallback = (rc, lh, ctx) -> { if (log.isInfoEnabled()) { - log.info("[{}] Opened ledger {} for consumer {}. rc={}", ledger.getName(), ledgerId, name, rc); + log.info("[{}] Opened ledger {} for cursor {}. rc={}", ledger.getName(), ledgerId, name, rc); } if (isBkErrorNotRecoverable(rc)) { - log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name, + log.error("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name, BKException.getMessage(rc)); // Rewind to oldest entry available initialize(getRollbackPosition(info), Collections.emptyMap(), Collections.emptyMap(), callback); return; } else if (rc != BKException.Code.OK) { - log.warn("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name, + log.warn("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name, BKException.getMessage(rc)); callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc))); return; @@ -548,7 +548,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac long lastEntryInLedger = lh.getLastAddConfirmed(); if (lastEntryInLedger < 0) { - log.warn("[{}] Error reading from metadata ledger {} for consumer {}: No entries in ledger", + log.warn("[{}] Error reading from metadata ledger {} for cursor {}: No entries in ledger", ledger.getName(), ledgerId, name); // Rewind to last cursor snapshot available initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); @@ -560,13 +560,13 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed()); } if (isBkErrorNotRecoverable(rc1)) { - log.error("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(), + log.error("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name, BKException.getMessage(rc1)); // Rewind to oldest entry available initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); return; } else if (rc1 != BKException.Code.OK) { - log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(), + log.warn("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name, BKException.getMessage(rc1)); callback.operationFailed(createManagedLedgerException(rc1)); @@ -2453,8 +2453,12 @@ List filterReadEntries(List entries) { @Override public synchronized String toString() { - return MoreObjects.toStringHelper(this).add("ledger", ledger.getName()).add("name", name) - .add("ackPos", markDeletePosition).add("readPos", readPosition).toString(); + return MoreObjects.toStringHelper(this) + .add("ledger", ledger.getName()) + .add("name", name) + .add("ackPos", markDeletePosition) + .add("readPos", readPosition) + .toString(); } @Override @@ -3068,7 +3072,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin if (shouldCloseLedger(lh1)) { if (log.isDebugEnabled()) { - log.debug("[{}] Need to create new metadata ledger for consumer {}", ledger.getName(), name); + log.debug("[{}] Need to create new metadata ledger for cursor {}", ledger.getName(), name); } startCreatingNewMetadataLedger(); } @@ -3153,7 +3157,7 @@ public void operationComplete(Void result, Stat stat) { @Override public void operationFailed(MetaStoreException e) { - log.warn("[{}] Failed to update consumer {}", ledger.getName(), name, e); + log.warn("[{}] Failed to update cursor metadata {}", ledger.getName(), name, e); // it means it failed to switch the newly created ledger so, it should be // deleted to prevent leak deleteLedgerAsync(lh).thenRun(() -> callback.operationFailed(e)); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index 9d2829b1707f4..51e56158cad55 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -138,8 +138,12 @@ public void rewind() { @Override public synchronized String toString() { - return MoreObjects.toStringHelper(this).add("ledger", ledger.getName()).add("ackPos", markDeletePosition) - .add("readPos", readPosition).toString(); + return MoreObjects.toStringHelper(this) + .add("ledger", ledger.getName()) + .add("cursor", getName()) + .add("ackPos", markDeletePosition) + .add("readPos", readPosition) + .toString(); } private static final Logger log = LoggerFactory.getLogger(NonDurableCursorImpl.class); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c737a73660b78..557421da96fed 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -910,8 +910,8 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St consumer.close(); } catch (BrokerServiceException e) { if (e instanceof ConsumerBusyException) { - log.warn("[{}][{}] Consumer {} {} already connected", - topic, subscriptionName, consumerId, consumerName); + log.warn("[{}][{}] Consumer {} {} already connected: {}", + topic, subscriptionName, consumerId, consumerName, e.getMessage()); } else if (e instanceof SubscriptionBusyException) { log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); } @@ -941,8 +941,8 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St decrementUsageCount(); if (ex.getCause() instanceof ConsumerBusyException) { - log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId, - consumerName); + log.warn("[{}][{}] Consumer {} {} already connected: {}", topic, subscriptionName, consumerId, + consumerName, ex.getCause().getMessage()); Consumer consumer = null; try { consumer = subscriptionFuture.isDone() ? getActiveConsumer(subscriptionFuture.get()) : null;