Skip to content

Commit

Permalink
ZOOKEEPER-4394: Apply only committed requests in sync with leader bef…
Browse files Browse the repository at this point in the history
…ore NEWLEADER ACK (#2188)

Reviewers: kezhuw, anmolnar, kezhuw
Author: AlphaCanisMajoris
Closes #2152 from AlphaCanisMajoris/ZK-4643
  • Loading branch information
AlphaCanisMajoris authored Sep 20, 2024
1 parent 33184fb commit 5a2b99b
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ protected enum State {
private static volatile int maxBatchSize;

/**
* Starting size of read and write ByteArroyOuputBuffers. Default is 32 bytes.
* Starting size of read and write ByteArrayOutputBuffers. Default is 32 bytes.
* Flag not used for small transfers like connectResponses.
*/
public static final String INT_BUFFER_STARTING_SIZE_BYTES = "zookeeper.intBufferStartingSizeBytes";
Expand Down Expand Up @@ -793,19 +793,6 @@ public void startdata() throws IOException, InterruptedException {
}

public synchronized void startup() {
startupWithServerState(State.RUNNING);
}

public synchronized void startupWithoutServing() {
startupWithServerState(State.INITIAL);
}

public synchronized void startServing() {
setState(State.RUNNING);
notifyAll();
}

private void startupWithServerState(State state) {
if (sessionTracker == null) {
createSessionTracker();
}
Expand All @@ -820,7 +807,7 @@ private void startupWithServerState(State state) {

registerMetrics();

setState(state);
setState(State.RUNNING);

requestPathMetricsCollector.start();

Expand Down Expand Up @@ -1829,7 +1816,7 @@ private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader r
int error;
if (authHelper.isSaslAuthRequired()) {
LOG.warn(
"Closing client connection due to server requires client SASL authenticaiton,"
"Closing client connection due to server requires client SASL authentication,"
+ "but client SASL authentication has failed, or client is not configured with SASL "
+ "authentication.");
error = Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,11 @@ public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
* @param hdr the txn header
* @param txn the txn
* @param digest the digest of txn
* @return a request moving through a chain of RequestProcessors
*/
public Request appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException {
final Request request = buildRequestToProcess(hdr, txn, digest);
public void appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException {
final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
request.setTxnDigest(digest);
getZKDatabase().append(request);
return request;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ protected long nanoTime() {

/**
* Overridable helper method to simply call sock.connect(). This can be
* overriden in tests to fake connection success/failure for connectToLeader.
* overridden in tests to fake connection success/failure for connectToLeader.
*/
protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) throws IOException {
sock.connect(addr, timeout);
Expand Down Expand Up @@ -555,8 +555,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
boolean syncSnapshot = false;
readPacket(qp);
Deque<Long> packetsCommitted = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
Deque<Request> requestsToAck = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();

synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
Expand Down Expand Up @@ -645,11 +644,11 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
self.setLastSeenQuorumVerifier(qv, true);
}

packetsNotCommitted.add(pif);
packetsNotLogged.add(pif);
break;
case Leader.COMMIT:
case Leader.COMMITANDACTIVATE:
pif = packetsNotCommitted.peekFirst();
pif = packetsNotLogged.peekFirst();
if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
boolean majorChange = self.processReconfig(
Expand All @@ -668,7 +667,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
Long.toHexString(pif.hdr.getZxid()));
} else {
zk.processTxn(pif.hdr, pif.rec);
packetsNotCommitted.remove();
packetsNotLogged.remove();
}
} else {
packetsCommitted.add(qp.getZxid());
Expand Down Expand Up @@ -710,7 +709,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
// Apply to db directly if we haven't taken the snapshot
zk.processTxn(packet.hdr, packet.rec);
} else {
packetsNotCommitted.add(packet);
packetsNotLogged.add(packet);
packetsCommitted.add(qp.getZxid());
}

Expand Down Expand Up @@ -753,29 +752,55 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
isPreZAB1_0 = false;

// ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startupWithoutServing();
if (zk instanceof FollowerZooKeeperServer) {
if (zk instanceof FollowerZooKeeperServer && !packetsCommitted.isEmpty()) {
long startTime = Time.currentElapsedTime();
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
final Request request = fzk.appendRequest(p.hdr, p.rec, p.digest);
requestsToAck.add(request);

/*
* @see https://github.com/apache/zookeeper/pull/1848
* Persist and process the committed txns in "packetsNotLogged"
* according to "packetsCommitted", which have been committed by
* the leader. For these committed proposals, there is no need to
* reply ack.
*
* @see https://issues.apache.org/jira/browse/ZOOKEEPER-4394
* Keep the outstanding proposals in "packetsNotLogged" to avoid
* NullPointerException when the follower receives COMMIT packet(s)
* right after replying NEWLEADER ack.
*/
while (!packetsCommitted.isEmpty()) {
long zxid = packetsCommitted.removeFirst();
pif = packetsNotLogged.peekFirst();
if (pif == null) {
LOG.warn("Committing 0x{}, but got no proposal", Long.toHexString(zxid));
continue;
} else if (pif.hdr.getZxid() != zxid) {
LOG.warn("Committing 0x{}, but next proposal is 0x{}",
Long.toHexString(zxid), Long.toHexString(pif.hdr.getZxid()));
continue;
}
packetsNotLogged.removeFirst();
fzk.appendRequest(pif.hdr, pif.rec, pif.digest);
fzk.processTxn(pif.hdr, pif.rec);
}

// persist the txns to disk
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646
// Make sure to persist the txns to disk before replying NEWLEADER ack.
fzk.getZKDatabase().commit();
LOG.info("{} txns have been persisted and it took {}ms",
packetsNotCommitted.size(), Time.currentElapsedTime() - startTime);
packetsNotCommitted.clear();
LOG.info("It took {}ms to persist and commit txns in packetsCommitted. "
+ "{} outstanding txns left in packetsNotLogged",
Time.currentElapsedTime() - startTime, packetsNotLogged.size());
}

// set the current epoch after all the tnxs are persisted
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4643
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4785
// Update current epoch after the committed txns are persisted
self.setCurrentEpoch(newEpoch);
LOG.info("Set the current epoch to {}", newEpoch);
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);

// send NEWLEADER ack after all the tnxs are persisted
// send NEWLEADER ack after the committed txns are persisted
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
LOG.info("Sent NEWLEADER ack to leader with zxid {}", Long.toHexString(newLeaderZxid));
break;
Expand All @@ -784,7 +809,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
}
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
zk.startServing();
zk.startup();
/*
* Update the election vote here to ensure that all members of the
* ensemble report the same vote to new servers that start up and
Expand All @@ -796,20 +821,11 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {

// We need to log the stuff that came in between the snapshot and the uptodate
if (zk instanceof FollowerZooKeeperServer) {
// reply ACK of PROPOSAL after ACK of NEWLEADER to avoid leader shutdown due to timeout
// on waiting for a quorum of followers
for (final Request request : requestsToAck) {
final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK, request.getHdr().getZxid(), null, null);
writePacket(ackPacket, false);
}
writePacket(null, true);
requestsToAck.clear();

FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
for (PacketInFlight p : packetsNotLogged) {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
LOG.info("{} txns have been logged asynchronously", packetsNotCommitted.size());
LOG.info("{} txns have been logged asynchronously", packetsNotLogged.size());

for (Long zxid : packetsCommitted) {
fzk.commit(zxid);
Expand All @@ -819,7 +835,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
// Similar to follower, we need to log requests between the snapshot
// and UPTODATE
ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
for (PacketInFlight p : packetsNotLogged) {
Long zxid = packetsCommitted.peekFirst();
if (p.hdr.getZxid() != zxid) {
// log warning message if there is no matching commit
Expand Down
Loading

0 comments on commit 5a2b99b

Please sign in to comment.