Skip to content

Commit

Permalink
ZOOKEEPER-4785: Txn loss due to race condition in Learner.syncWithLea…
Browse files Browse the repository at this point in the history
…der() during DIFF sync (apache#2111)

Author: Li Wang <liwang@apple.com>

Co-authored-by: liwang <liwang@apple.com>
  • Loading branch information
2 people authored and AlphaCanisMajoris committed Mar 28, 2024
1 parent 324194f commit 9b9a5ec
Show file tree
Hide file tree
Showing 5 changed files with 390 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -702,14 +702,14 @@ private void sendNotifications() {
qv.toString().getBytes(UTF_8));

LOG.debug(
"Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient),"
+ " {} (myid), 0x{} (n.peerEpoch) ",
"Sending Notification: {} (n.leader), 0x{} (n.peerEpoch), 0x{} (n.zxid), 0x{} (n.round), {} (recipient),"
+ " {} (myid) ",
proposedLeader,
Long.toHexString(proposedZxid),
Long.toHexString(proposedEpoch),
Long.toHexString(logicalclock.get()),
sid,
self.getMyId(),
Long.toHexString(proposedEpoch));
self.getMyId());

sendqueue.offer(notmsg);
}
Expand All @@ -722,12 +722,13 @@ private void sendNotifications() {
*/
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug(
"id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}",
"id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}, epoch: 0x{}, proposed epoch: 0x{}",
newId,
curId,
Long.toHexString(newZxid),
Long.toHexString(curZxid));

Long.toHexString(curZxid),
Long.toHexString(newEpoch),
Long.toHexString(curEpoch));
if (self.getQuorumVerifier().getWeight(newId) == 0) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,7 @@ protected void setupRequestProcessors() {
LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<>();

public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
request.setTxnDigest(digest);
if ((request.zxid & 0xffffffffL) != 0) {
pendingTxns.add(request);
}
final Request request = buildRequestToProcess(hdr, txn, digest);
syncProcessor.processRequest(request);
}

Expand All @@ -101,6 +97,19 @@ public Request logRequestBeforeAckNewleader(TxnHeader hdr, Record txn, TxnDigest
return request;
}

/**
* Build a request for the txn and append it to the transaction log
* @param hdr the txn header
* @param txn the txn
* @param digest the digest of txn
* @return a request moving through a chain of RequestProcessors
*/
public Request appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException {
final Request request = buildRequestToProcess(hdr, txn, digest);
getZKDatabase().append(request);
return request;
}

/**
* When a COMMIT message is received, eventually this method is called,
* which matches up the zxid from the COMMIT with (hopefully) the head of
Expand Down Expand Up @@ -194,4 +203,19 @@ protected void unregisterMetrics() {

}

/**
* Build a request for the txn
* @param hdr the txn header
* @param txn the txn
* @param digest the digest of txn
* @return a request moving through a chain of RequestProcessors
*/
private Request buildRequestToProcess(final TxnHeader hdr, final Record txn, final TxnDigest digest) {
final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
request.setTxnDigest(digest);
if ((request.zxid & 0xffffffffL) != 0) {
pendingTxns.add(request);
}
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,8 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
readPacket(qp);
Deque<Long> packetsCommitted = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
Deque<Request> requestsToBeReplied = new ArrayDeque<>();
Deque<Request> requestsToAck = new ArrayDeque<>();

synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
Expand Down Expand Up @@ -746,28 +747,37 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
zk.takeSnapshot(syncSnapshot);
}


writeToTxnLog = true;
//Anything after this needs to go to the transaction log, not applied directly in memory
isPreZAB1_0 = false;

// ZOOKEEPER-3911 & 4646: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
// ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startupWithoutServing();
if (zk instanceof FollowerZooKeeperServer) {
long startTime = Time.currentElapsedTime();
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
requestsToBeReplied.add(fzk.logRequestBeforeAckNewleader(p.hdr, p.rec, p.digest));
final Request request = fzk.appendRequest(p.hdr, p.rec, p.digest);
requestsToAck.add(request);
}
packetsNotCommitted.clear();
// persist the transaction logs

// persist the txns to disk
fzk.getZKDatabase().commit();
LOG.info("{} txns have been persisted and it took {}ms",
packetsNotCommitted.size(), Time.currentElapsedTime() - startTime);
packetsNotCommitted.clear();
}

// ZOOKEEPER-4643: make sure to update currentEpoch only after the transaction logs are synced
// set the current epoch after all the tnxs are persisted
self.setCurrentEpoch(newEpoch);
LOG.info("Set the current epoch to {}", newEpoch);

// send NEWLEADER ack after all the tnxs are persisted
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
LOG.info("Sent NEWLEADER ack to leader with zxid {}", Long.toHexString(newLeaderZxid));
break;
}
}
Expand All @@ -786,23 +796,25 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {

// We need to log the stuff that came in between the snapshot and the uptodate
if (zk instanceof FollowerZooKeeperServer) {
// Reply queued ACKs that are generated before replying ACK of NEWLEADER
// ZOOKEEPER-4685: make sure to reply ACK of PROPOSAL after replying ACK of NEWLEADER.
for (Request si : requestsToBeReplied) {
QuorumPacket p = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
writePacket(p, false);
// reply ACK of PROPOSAL after ACK of NEWLEADER to avoid leader shutdown due to timeout
// on waiting for a quorum of followers
for (final Request request : requestsToAck) {
final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK, request.getHdr().getZxid(), null, null);
writePacket(ackPacket, false);
}
requestsToBeReplied.clear();
writePacket(null, true);
requestsToAck.clear();

FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
LOG.info("{} txns have been logged asynchronously", packetsNotCommitted.size());

for (Long zxid : packetsCommitted) {
fzk.commit(zxid);
}
LOG.info("{} txns have been committed", packetsCommitted.size());
} else if (zk instanceof ObserverZooKeeperServer) {
// Similar to follower, we need to log requests between the snapshot
// and UPTODATE
Expand Down
Loading

0 comments on commit 9b9a5ec

Please sign in to comment.