-
Notifications
You must be signed in to change notification settings - Fork 7.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ZOOKEEPER-4394: Apply only committed requests in sync with leader before NEWLEADER ACK #2152
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
cdd13ca
Fix the following issues in Learner.syncWithLeader():
AlphaCanisMajoris f95a261
Rename "packetsNotCommitted" to "packetsNotLogged" in Learner.syncWit…
AlphaCanisMajoris a5a5a1f
Remove "startupWithoutServing" and simplify the process in syncWithLe…
AlphaCanisMajoris f9fc2d9
Move self.getSyncMode(..) right after self.setCurrentEpoch(newEpoch) …
AlphaCanisMajoris File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -555,8 +555,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { | |
boolean syncSnapshot = false; | ||
readPacket(qp); | ||
Deque<Long> packetsCommitted = new ArrayDeque<>(); | ||
Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>(); | ||
Deque<Request> requestsToAck = new ArrayDeque<>(); | ||
Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>(); | ||
|
||
synchronized (zk) { | ||
if (qp.getType() == Leader.DIFF) { | ||
|
@@ -645,11 +644,11 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { | |
self.setLastSeenQuorumVerifier(qv, true); | ||
} | ||
|
||
packetsNotCommitted.add(pif); | ||
packetsNotLogged.add(pif); | ||
break; | ||
case Leader.COMMIT: | ||
case Leader.COMMITANDACTIVATE: | ||
pif = packetsNotCommitted.peekFirst(); | ||
pif = packetsNotLogged.peekFirst(); | ||
if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) { | ||
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8)); | ||
boolean majorChange = self.processReconfig( | ||
|
@@ -668,7 +667,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { | |
Long.toHexString(pif.hdr.getZxid())); | ||
} else { | ||
zk.processTxn(pif.hdr, pif.rec); | ||
packetsNotCommitted.remove(); | ||
packetsNotLogged.remove(); | ||
} | ||
} else { | ||
packetsCommitted.add(qp.getZxid()); | ||
|
@@ -710,7 +709,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { | |
// Apply to db directly if we haven't taken the snapshot | ||
zk.processTxn(packet.hdr, packet.rec); | ||
} else { | ||
packetsNotCommitted.add(packet); | ||
packetsNotLogged.add(packet); | ||
packetsCommitted.add(qp.getZxid()); | ||
} | ||
|
||
|
@@ -753,29 +752,55 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { | |
isPreZAB1_0 = false; | ||
|
||
// ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER). | ||
sock.setSoTimeout(self.tickTime * self.syncLimit); | ||
self.setSyncMode(QuorumPeer.SyncMode.NONE); | ||
zk.startupWithoutServing(); | ||
if (zk instanceof FollowerZooKeeperServer) { | ||
if (zk instanceof FollowerZooKeeperServer && !packetsCommitted.isEmpty()) { | ||
long startTime = Time.currentElapsedTime(); | ||
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; | ||
for (PacketInFlight p : packetsNotCommitted) { | ||
final Request request = fzk.appendRequest(p.hdr, p.rec, p.digest); | ||
requestsToAck.add(request); | ||
|
||
/* | ||
* @see https://github.com/apache/zookeeper/pull/1848 | ||
* Persist and process the committed txns in "packetsNotLogged" | ||
* according to "packetsCommitted", which have been committed by | ||
* the leader. For these committed proposals, there is no need to | ||
* reply ack. | ||
* | ||
* @see https://issues.apache.org/jira/browse/ZOOKEEPER-4394 | ||
* Keep the outstanding proposals in "packetsNotLogged" to avoid | ||
* NullPointerException when the follower receives COMMIT packet(s) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Given my comments above, we should not clear |
||
* right after replying NEWLEADER ack. | ||
*/ | ||
while (!packetsCommitted.isEmpty()) { | ||
long zxid = packetsCommitted.removeFirst(); | ||
pif = packetsNotLogged.peekFirst(); | ||
if (pif == null) { | ||
LOG.warn("Committing 0x{}, but got no proposal", Long.toHexString(zxid)); | ||
continue; | ||
} else if (pif.hdr.getZxid() != zxid) { | ||
LOG.warn("Committing 0x{}, but next proposal is 0x{}", | ||
Long.toHexString(zxid), Long.toHexString(pif.hdr.getZxid())); | ||
continue; | ||
} | ||
packetsNotLogged.removeFirst(); | ||
fzk.appendRequest(pif.hdr, pif.rec, pif.digest); | ||
fzk.processTxn(pif.hdr, pif.rec); | ||
} | ||
|
||
// persist the txns to disk | ||
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646 | ||
// Make sure to persist the txns to disk before replying NEWLEADER ack. | ||
fzk.getZKDatabase().commit(); | ||
LOG.info("{} txns have been persisted and it took {}ms", | ||
packetsNotCommitted.size(), Time.currentElapsedTime() - startTime); | ||
packetsNotCommitted.clear(); | ||
LOG.info("It took {}ms to persist and commit txns in packetsCommitted. " | ||
+ "{} outstanding txns left in packetsNotLogged", | ||
Time.currentElapsedTime() - startTime, packetsNotLogged.size()); | ||
} | ||
|
||
// set the current epoch after all the tnxs are persisted | ||
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4643 | ||
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4785 | ||
// Update current epoch after the committed txns are persisted | ||
self.setCurrentEpoch(newEpoch); | ||
LOG.info("Set the current epoch to {}", newEpoch); | ||
sock.setSoTimeout(self.tickTime * self.syncLimit); | ||
self.setSyncMode(QuorumPeer.SyncMode.NONE); | ||
|
||
// send NEWLEADER ack after all the tnxs are persisted | ||
// send NEWLEADER ack after the committed txns are persisted | ||
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); | ||
LOG.info("Sent NEWLEADER ack to leader with zxid {}", Long.toHexString(newLeaderZxid)); | ||
break; | ||
|
@@ -784,7 +809,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { | |
} | ||
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); | ||
writePacket(ack, true); | ||
zk.startServing(); | ||
zk.startup(); | ||
/* | ||
* Update the election vote here to ensure that all members of the | ||
* ensemble report the same vote to new servers that start up and | ||
|
@@ -796,20 +821,11 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { | |
|
||
// We need to log the stuff that came in between the snapshot and the uptodate | ||
if (zk instanceof FollowerZooKeeperServer) { | ||
// reply ACK of PROPOSAL after ACK of NEWLEADER to avoid leader shutdown due to timeout | ||
// on waiting for a quorum of followers | ||
for (final Request request : requestsToAck) { | ||
final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK, request.getHdr().getZxid(), null, null); | ||
writePacket(ackPacket, false); | ||
} | ||
writePacket(null, true); | ||
requestsToAck.clear(); | ||
|
||
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; | ||
for (PacketInFlight p : packetsNotCommitted) { | ||
for (PacketInFlight p : packetsNotLogged) { | ||
fzk.logRequest(p.hdr, p.rec, p.digest); | ||
} | ||
LOG.info("{} txns have been logged asynchronously", packetsNotCommitted.size()); | ||
LOG.info("{} txns have been logged asynchronously", packetsNotLogged.size()); | ||
|
||
for (Long zxid : packetsCommitted) { | ||
fzk.commit(zxid); | ||
|
@@ -819,7 +835,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { | |
// Similar to follower, we need to log requests between the snapshot | ||
// and UPTODATE | ||
ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk; | ||
for (PacketInFlight p : packetsNotCommitted) { | ||
for (PacketInFlight p : packetsNotLogged) { | ||
Long zxid = packetsCommitted.peekFirst(); | ||
if (p.hdr.getZxid() != zxid) { | ||
// log warning message if there is no matching commit | ||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment is somewhat misleading. The key is to log these committed txns; according to the paper, they are considered committed before the election. The only reason we touch `packetsNotCommitted` here is to make sure they are not passed to `logRequest` again in the `broadcast` phase. I think it might be better to rename `packetsNotCommitted` to `packetsNotLogged`, as @jeffrey-xiao did in #1930. "log" is a disk operation, "commit" is an agreement. What we want here should be "log committed txns agreed in election".

Coming to the implementation, new proposals could still be committed before `NEWLEADER`, since `LearnerHandler` does not issue `NEWLEADER` right after these committed txns. But that does not harm us here: we may persist more txns, but never fewer, and the new leader expects no `ack` for committed ones.