Skip to content

Commit

Permalink
Actually ensure TXNs are flushed to persistent storage before ACK dur…
Browse files Browse the repository at this point in the history
…ing DIFF sync
  • Loading branch information
jonmv committed Oct 3, 2022
1 parent 4aab1fd commit abaf8af
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -706,19 +706,6 @@ public void startdata() throws IOException, InterruptedException {
}

public synchronized void startup() {
startupWithServerState(State.RUNNING);
}

public synchronized void startupWithoutServing() {
startupWithServerState(State.INITIAL);
}

public synchronized void startServing() {
setState(State.RUNNING);
notifyAll();
}

private void startupWithServerState(State state) {
if (sessionTracker == null) {
createSessionTracker();
}
Expand All @@ -733,7 +720,7 @@ private void startupWithServerState(State state) {

registerMetrics();

setState(state);
setState(State.RUNNING);

requestPathMetricsCollector.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
Deque<Long> packetsCommitted = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
long lastPreemptivelyLoggedZxid = -1;
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
Expand Down Expand Up @@ -755,15 +756,12 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
isPreZAB1_0 = false;

// ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startupWithoutServing();
if (zk instanceof FollowerZooKeeperServer) {
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotLogged) {
fzk.logRequest(p.hdr, p.rec, p.digest);
zk.getZKDatabase().append(new Request(-1, -1, p.hdr.getType(), p.hdr, p.rec, p.hdr.getZxid()));
lastPreemptivelyLoggedZxid = p.hdr.getZxid();
}
packetsNotLogged.clear();
zk.getZKDatabase().commit(); // Really ensure transactions are written to persistent storage before ACK'ing.
}

writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
Expand All @@ -773,7 +771,10 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
}
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
zk.startServing();
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startup();

/*
* Update the election vote here to ensure that all members of the
* ensemble report the same vote to new servers that start up and
Expand All @@ -787,7 +788,11 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
if (zk instanceof FollowerZooKeeperServer) {
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotLogged) {
fzk.logRequest(p.hdr, p.rec, p.digest);
if (p.hdr.getZxid() <= lastPreemptivelyLoggedZxid) {
fzk.getZKDatabase().processTxn(p.hdr, p.rec, p.digest);
} else {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
}
for (Long zxid : packetsCommitted) {
fzk.commit(zxid);
Expand Down

0 comments on commit abaf8af

Please sign in to comment.