Skip to content

Commit

Permalink
ZOOKEEPER-4646: Committed txns may still be lost if followers crash a…
Browse files Browse the repository at this point in the history
…fter replying ACK of NEWLEADER but before writing txns to disk
  • Loading branch information
AlphaCanisMajoris committed Mar 28, 2023
1 parent 3eda8a3 commit 6677c70
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -111,6 +112,22 @@ public Socket getSocket() {

protected static final Logger LOG = LoggerFactory.getLogger(Learner.class);

/**
 * Synchronization aid used in the SYNC phase to guarantee that all uncommitted
 * transactions are logged before the ACK of NEWLEADER is replied.
 *
 * <p>syncWithLeader creates the latch with a count equal to the number of
 * not-yet-committed packets, hands those packets to the follower for logging, waits on
 * the latch with a timeout, and resets the field to {@code null} once the wait succeeds.
 * The ACK-sending processRequest path counts the latch down once for every ACK it queues
 * while the latch is non-null and its count is still positive.
 *
 * <p>NOTE(review): this field is not volatile, yet it is written by syncWithLeader and
 * read by processRequest; if those run on different threads, visibility and the
 * check-then-act sequence in processRequest should be confirmed.
 */
CountDownLatch newleaderLatch;

/**
 * ACKs of PROPOSALs to be replied in the SYNC phase.
 * These ACKs are generated before the ACK of NEWLEADER is replied; they are held in this
 * queue and written out later by syncWithLeader, after which the queue is cleared.
 */
private final LinkedBlockingQueue<QuorumPacket> queuedAcksInSync = new LinkedBlockingQueue<>();

/**
 * Appends an ACK of a PROPOSAL to the queue of ACKs that are held back during the SYNC
 * phase instead of being written to the leader immediately.
 *
 * @param pp the ACK packet to enqueue
 */
public void addQueuedAcksInSync(QuorumPacket pp) {
    this.queuedAcksInSync.add(pp);
}

/**
* Time to wait after connection attempt with the Leader or LearnerMaster before this
* Learner tries to connect again.
Expand Down Expand Up @@ -750,16 +767,24 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
//Anything after this needs to go to the transaction log, not applied directly in memory
isPreZAB1_0 = false;

// ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
// ZOOKEEPER-3911 & 4646: make sure to sync the uncommitted logs before committing them (ACK NEWLEADER).
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startupWithoutServing();
if (zk instanceof FollowerZooKeeperServer) {
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
fzk.logRequest(p.hdr, p.rec, p.digest);
int packetsNotCommittedCount = packetsNotCommitted.size();
if (packetsNotCommittedCount > 0) {
newleaderLatch = new CountDownLatch(packetsNotCommittedCount);
for (PacketInFlight p : packetsNotCommitted) {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
packetsNotCommitted.clear();
if (!newleaderLatch.await(self.getInitLimit() * self.getTickTime(), TimeUnit.MILLISECONDS)) {
throw new InterruptedException("Timeout while waiting for the count of NEWLEADER latch to reach zero");
}
newleaderLatch = null;
}
packetsNotCommitted.clear();
}

writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
Expand All @@ -781,6 +806,13 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {

// We need to log the stuff that came in between the snapshot and the uptodate
if (zk instanceof FollowerZooKeeperServer) {
// Reply with the queued ACKs that were generated before the ACK of NEWLEADER was replied
for (QuorumPacket p : queuedAcksInSync) {
writePacket(p, false);
}
writePacket(null, true);
queuedAcksInSync.clear();

FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
fzk.logRequest(p.hdr, p.rec, p.digest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ public void processRequest(Request si) {
try {
si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);

// ZOOKEEPER-4685: make sure to reply ACK of PROPOSAL after replying ACK of NEWLEADER.
if (learner.newleaderLatch != null && learner.newleaderLatch.getCount() > 0) {
learner.addQueuedAcksInSync(qp);
learner.newleaderLatch.countDown();
return;
}

learner.writePacket(qp, false);
} catch (IOException e) {
LOG.warn("Closing connection to leader, exception during packet send", e);
Expand Down

0 comments on commit 6677c70

Please sign in to comment.