Skip to content

Commit

Permalink
[improve] [broker] Improve logs to troubleshooting
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Sep 6, 2023
1 parent 88231f7 commit 5fdcbc3
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) {
callback.operationComplete();
} else {
// Need to proceed and read the last entry in the specified ledger to find out the last position
log.info("[{}] Consumer {} meta-data recover from ledger {}", ledger.getName(), name,
log.info("[{}] Cursor {} meta-data recover from ledger {}", ledger.getName(), name,
info.getCursorsLedgerId());
recoverFromLedger(info, callback);
}
Expand All @@ -529,16 +529,16 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
long ledgerId = info.getCursorsLedgerId();
OpenCallback openCallback = (rc, lh, ctx) -> {
if (log.isInfoEnabled()) {
log.info("[{}] Opened ledger {} for consumer {}. rc={}", ledger.getName(), ledgerId, name, rc);
log.info("[{}] Opened ledger {} for cursor {}. rc={}", ledger.getName(), ledgerId, name, rc);
}
if (isBkErrorNotRecoverable(rc)) {
log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
log.error("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name,
BKException.getMessage(rc));
// Rewind to oldest entry available
initialize(getRollbackPosition(info), Collections.emptyMap(), Collections.emptyMap(), callback);
return;
} else if (rc != BKException.Code.OK) {
log.warn("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
log.warn("[{}] Error opening metadata ledger {} for cursor {}: {}", ledger.getName(), ledgerId, name,
BKException.getMessage(rc));
callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc)));
return;
Expand All @@ -548,7 +548,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
long lastEntryInLedger = lh.getLastAddConfirmed();

if (lastEntryInLedger < 0) {
log.warn("[{}] Error reading from metadata ledger {} for consumer {}: No entries in ledger",
log.warn("[{}] Error reading from metadata ledger {} for cursor {}: No entries in ledger",
ledger.getName(), ledgerId, name);
// Rewind to last cursor snapshot available
initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
Expand All @@ -560,13 +560,13 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed());
}
if (isBkErrorNotRecoverable(rc1)) {
log.error("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
log.error("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));
// Rewind to oldest entry available
initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
return;
} else if (rc1 != BKException.Code.OK) {
log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
log.warn("[{}] Error reading from metadata ledger {} for cursor {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));

callback.operationFailed(createManagedLedgerException(rc1));
Expand Down Expand Up @@ -2453,8 +2453,12 @@ List<Entry> filterReadEntries(List<Entry> entries) {

@Override
public synchronized String toString() {
return MoreObjects.toStringHelper(this).add("ledger", ledger.getName()).add("name", name)
.add("ackPos", markDeletePosition).add("readPos", readPosition).toString();
return MoreObjects.toStringHelper(this)
.add("ledger", ledger.getName())
.add("name", name)
.add("ackPos", markDeletePosition)
.add("readPos", readPosition)
.toString();
}

@Override
Expand Down Expand Up @@ -3068,7 +3072,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin

if (shouldCloseLedger(lh1)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Need to create new metadata ledger for consumer {}", ledger.getName(), name);
log.debug("[{}] Need to create new metadata ledger for cursor {}", ledger.getName(), name);
}
startCreatingNewMetadataLedger();
}
Expand Down Expand Up @@ -3153,7 +3157,7 @@ public void operationComplete(Void result, Stat stat) {

@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}] Failed to update consumer {}", ledger.getName(), name, e);
log.warn("[{}] Failed to update cursor metadata {}", ledger.getName(), name, e);
// it means it failed to switch the newly created ledger so, it should be
// deleted to prevent leak
deleteLedgerAsync(lh).thenRun(() -> callback.operationFailed(e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,12 @@ public void rewind() {

@Override
public synchronized String toString() {
return MoreObjects.toStringHelper(this).add("ledger", ledger.getName()).add("ackPos", markDeletePosition)
.add("readPos", readPosition).toString();
return MoreObjects.toStringHelper(this)
.add("ledger", ledger.getName())
.add("cursor", getName())
.add("ackPos", markDeletePosition)
.add("readPos", readPosition)
.toString();
}

private static final Logger log = LoggerFactory.getLogger(NonDurableCursorImpl.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -910,8 +910,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
consumer.close();
} catch (BrokerServiceException e) {
if (e instanceof ConsumerBusyException) {
log.warn("[{}][{}] Consumer {} {} already connected",
topic, subscriptionName, consumerId, consumerName);
log.warn("[{}][{}] Consumer {} {} already connected: {}",
topic, subscriptionName, consumerId, consumerName, e.getMessage());
} else if (e instanceof SubscriptionBusyException) {
log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage());
}
Expand Down Expand Up @@ -941,8 +941,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
decrementUsageCount();

if (ex.getCause() instanceof ConsumerBusyException) {
log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
consumerName);
log.warn("[{}][{}] Consumer {} {} already connected: {}", topic, subscriptionName, consumerId,
consumerName, ex.getCause().getMessage());
Consumer consumer = null;
try {
consumer = subscriptionFuture.isDone() ? getActiveConsumer(subscriptionFuture.get()) : null;
Expand Down

0 comments on commit 5fdcbc3

Please sign in to comment.