Fix for cursor reset after topic reload #315
Conversation
cc @sschepens
```diff
@@ -240,7 +240,6 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
         }

         // Read the last entry in the ledger
-        cursorLedger = lh;
```
So, if we don't initialize `cursorLedger` at recovery time, it will initially be null, and at that time the internal stats will not return the correct value for `cursorLedgerLastEntry`. Is there any specific reason not to initialize it at recovery time?
Very good point. I removed it because it is confusing to use the same variable to store two different kinds of ledger handle (one read-write and the other read-only).

Anyway, I think it still makes sense for the internal stats. In this case we would see:

```
cursorLedger = -1
cursorLedgerLastEntry = -1
```

and that is appropriate, since we are in the `NoLedger` state.

Below is the code that returns the `-1`:
```java
public long getCursorLedger() {
    LedgerHandle lh = cursorLedger;
    return lh != null ? lh.getId() : -1;
}

public long getCursorLedgerLastEntry() {
    LedgerHandle lh = cursorLedger;
    return lh != null ? lh.getLastAddConfirmed() : -1;
}
```
```diff
-        } else {
-            internalFlushPendingMarkDeletes();
-        }
+        internalFlushPendingMarkDeletes();
```
`internalFlushPendingMarkDeletes` just clears the previous `pendingMarkDeleteOps` and does not complete the callback of each `PendingMarkDeleteEntry`, and someone could be waiting for that callback. One example is `PersistentReplicator`, which only waits in order to do a debug log, which is fine; the concern is that, since there is a callback, one can wait on it.
Actually, that is covered 😉

Take a look at `internalMarkDelete()`; there's this code there:
```java
// Trigger the final callback after having (eventually) triggered the switching-ledger
// operation. This will ensure that no race condition will happen between the next
// mark-delete and the switching operation.
if (mdEntry.callbackGroup != null) {
    // Trigger the callback for every request in the group
    for (PendingMarkDeleteEntry e : mdEntry.callbackGroup) {
        e.callback.markDeleteComplete(e.ctx);
    }
} else {
    // Only trigger the callback for the current request
    mdEntry.callback.markDeleteComplete(mdEntry.ctx);
}
```
So, a single write triggers all the callbacks.
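The grouping behavior can be sketched as follows. This is a minimal standalone illustration, not the actual Pulsar classes: the `PendingEntry` class, the `Callback` interface, and the coalescing step are simplified stand-ins for `PendingMarkDeleteEntry` and its callback.

```java
import java.util.ArrayList;
import java.util.List;

public class MarkDeleteGroupingSketch {
    // Simplified callback interface, modeled after the mark-delete callback
    interface Callback {
        void markDeleteComplete(Object ctx);
    }

    // Simplified pending entry: a position, a callback, and an optional
    // group of earlier entries that were coalesced behind this one
    static class PendingEntry {
        final long position;
        final Callback callback;
        final Object ctx;
        List<PendingEntry> callbackGroup; // set when entries are coalesced

        PendingEntry(long position, Callback callback, Object ctx) {
            this.position = position;
            this.callback = callback;
            this.ctx = ctx;
        }
    }

    // After the single ledger write for the coalesced entry succeeds,
    // every waiting caller's callback is triggered
    static void completeAfterWrite(PendingEntry mdEntry) {
        if (mdEntry.callbackGroup != null) {
            for (PendingEntry e : mdEntry.callbackGroup) {
                e.callback.markDeleteComplete(e.ctx);
            }
        } else {
            mdEntry.callback.markDeleteComplete(mdEntry.ctx);
        }
    }

    public static void main(String[] args) {
        List<Long> completed = new ArrayList<>();
        Callback cb = ctx -> completed.add((Long) ctx);

        // Three requests arrive while a write is in flight; they are
        // grouped behind the last one
        PendingEntry last = new PendingEntry(3, cb, 3L);
        last.callbackGroup = new ArrayList<>();
        last.callbackGroup.add(new PendingEntry(1, cb, 1L));
        last.callbackGroup.add(new PendingEntry(2, cb, 2L));
        last.callbackGroup.add(last);

        completeAfterWrite(last); // one write, all three callbacks fire
        System.out.println(completed);
    }
}
```

This is why a caller such as `PersistentReplicator` still gets its callback even though only one physical write happens for the whole group.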
I see.
👍
Motivation
As reported in #309, the cursor reset operation will fail after a topic reload when no new mark-delete operation was issued, since the cursor ledger was set to a read-only ledger.
Modifications
Changed the reset logic to rely on `internalAsyncMarkDelete()`, which forces the creation of a new ledger when the cursor is in the `NoLedger` state.

Fixes #309