From 9b9a5ec40582d44283c0949e3f4cc78844c6684e Mon Sep 17 00:00:00 2001 From: li4wang <68786536+li4wang@users.noreply.github.com> Date: Thu, 25 Jan 2024 22:22:32 -0800 Subject: [PATCH] ZOOKEEPER-4785: Txn loss due to race condition in Learner.syncWithLeader() during DIFF sync (#2111) Author: Li Wang Co-authored-by: liwang --- .../server/quorum/FastLeaderElection.java | 15 +- .../quorum/FollowerZooKeeperServer.java | 34 +- .../zookeeper/server/quorum/Learner.java | 38 ++- .../zookeeper/server/quorum/DIFFSyncTest.java | 316 ++++++++++++++++++ .../server/quorum/QuorumPeerTestBase.java | 12 + 5 files changed, 390 insertions(+), 25 deletions(-) create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java index f4a5f9882c7..83555a61644 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java @@ -702,14 +702,14 @@ private void sendNotifications() { qv.toString().getBytes(UTF_8)); LOG.debug( - "Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient)," - + " {} (myid), 0x{} (n.peerEpoch) ", + "Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.peerEpoch), 0x{} (n.round), {} (recipient)," + + " {} (myid) ", proposedLeader, Long.toHexString(proposedZxid), + Long.toHexString(proposedEpoch), Long.toHexString(logicalclock.get()), sid, - self.getMyId(), - Long.toHexString(proposedEpoch)); + self.getMyId()); sendqueue.offer(notmsg); } @@ -722,12 +722,13 @@ private void sendNotifications() { */ protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { LOG.debug( - "id: {}, proposed id: {}, zxid: 0x{}, proposed 
zxid: 0x{}", + "id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}, epoch: 0x{}, proposed epoch: 0x{}", newId, curId, Long.toHexString(newZxid), - Long.toHexString(curZxid)); - + Long.toHexString(curZxid), + Long.toHexString(newEpoch), + Long.toHexString(curEpoch)); if (self.getQuorumVerifier().getWeight(newId) == 0) { return false; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java index 5d7ad0b4ba3..a88176fe83d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java @@ -81,11 +81,7 @@ protected void setupRequestProcessors() { LinkedBlockingQueue pendingTxns = new LinkedBlockingQueue<>(); public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) { - Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); - request.setTxnDigest(digest); - if ((request.zxid & 0xffffffffL) != 0) { - pendingTxns.add(request); - } + final Request request = buildRequestToProcess(hdr, txn, digest); syncProcessor.processRequest(request); } @@ -101,6 +97,19 @@ public Request logRequestBeforeAckNewleader(TxnHeader hdr, Record txn, TxnDigest return request; } + /** + * Build a request for the txn and append it to the transaction log + * @param hdr the txn header + * @param txn the txn + * @param digest the digest of txn + * @return a request moving through a chain of RequestProcessors + */ + public Request appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException { + final Request request = buildRequestToProcess(hdr, txn, digest); + getZKDatabase().append(request); + return request; + } + /** * When a COMMIT message is received, eventually this method is called, * which matches up the 
zxid from the COMMIT with (hopefully) the head of @@ -194,4 +203,19 @@ protected void unregisterMetrics() { } + /** + * Build a request for the txn + * @param hdr the txn header + * @param txn the txn + * @param digest the digest of txn + * @return a request moving through a chain of RequestProcessors + */ + private Request buildRequestToProcess(final TxnHeader hdr, final Record txn, final TxnDigest digest) { + final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); + request.setTxnDigest(digest); + if ((request.zxid & 0xffffffffL) != 0) { + pendingTxns.add(request); + } + return request; + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index e34175d4f8b..e3bd13d1165 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -556,7 +556,8 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { readPacket(qp); Deque packetsCommitted = new ArrayDeque<>(); Deque packetsNotCommitted = new ArrayDeque<>(); - Deque requestsToBeReplied = new ArrayDeque<>(); + Deque requestsToAck = new ArrayDeque<>(); + synchronized (zk) { if (qp.getType() == Leader.DIFF) { LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); @@ -746,28 +747,37 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { zk.takeSnapshot(syncSnapshot); } + writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory isPreZAB1_0 = false; - // ZOOKEEPER-3911 & 4646: make sure sync the uncommitted logs before commit them (ACK NEWLEADER). + // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER). 
sock.setSoTimeout(self.tickTime * self.syncLimit); self.setSyncMode(QuorumPeer.SyncMode.NONE); zk.startupWithoutServing(); if (zk instanceof FollowerZooKeeperServer) { + long startTime = Time.currentElapsedTime(); FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; for (PacketInFlight p : packetsNotCommitted) { - requestsToBeReplied.add(fzk.logRequestBeforeAckNewleader(p.hdr, p.rec, p.digest)); + final Request request = fzk.appendRequest(p.hdr, p.rec, p.digest); + requestsToAck.add(request); } - packetsNotCommitted.clear(); - // persist the transaction logs + + // persist the txns to disk fzk.getZKDatabase().commit(); + LOG.info("{} txns have been persisted and it took {}ms", + packetsNotCommitted.size(), Time.currentElapsedTime() - startTime); + packetsNotCommitted.clear(); } - // ZOOKEEPER-4643: make sure to update currentEpoch only after the transaction logs are synced + // set the current epoch after all the txns are persisted self.setCurrentEpoch(newEpoch); + LOG.info("Set the current epoch to {}", newEpoch); + // send NEWLEADER ack after all the txns are persisted writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); + LOG.info("Sent NEWLEADER ack to leader with zxid {}", Long.toHexString(newLeaderZxid)); break; } } @@ -786,23 +796,25 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { // We need to log the stuff that came in between the snapshot and the uptodate if (zk instanceof FollowerZooKeeperServer) { - // Reply queued ACKs that are generated before replying ACK of NEWLEADER - // ZOOKEEPER-4685: make sure to reply ACK of PROPOSAL after replying ACK of NEWLEADER. 
- for (Request si : requestsToBeReplied) { - QuorumPacket p = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null); - si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY); - writePacket(p, false); + // reply ACK of PROPOSAL after ACK of NEWLEADER to avoid leader shutdown due to timeout + // on waiting for a quorum of followers + for (final Request request : requestsToAck) { + final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK, request.getHdr().getZxid(), null, null); + writePacket(ackPacket, false); } - requestsToBeReplied.clear(); writePacket(null, true); + requestsToAck.clear(); FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; for (PacketInFlight p : packetsNotCommitted) { fzk.logRequest(p.hdr, p.rec, p.digest); } + LOG.info("{} txns have been logged asynchronously", packetsNotCommitted.size()); + for (Long zxid : packetsCommitted) { fzk.commit(zxid); } + LOG.info("{} txns have been committed", packetsCommitted.size()); } else if (zk instanceof ObserverZooKeeperServer) { // Similar to follower, we need to log requests between the snapshot // and UPTODATE diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java new file mode 100644 index 00000000000..a9be09f3973 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import javax.security.sasl.SaslException; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.FinalRequestProcessor; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.SyncRequestProcessor; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.ZooKeeperServerListener; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.test.ClientBase; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +public class DIFFSyncTest extends QuorumPeerTestBase { + private static final int SERVER_COUNT = 3; + private static final String PATH_PREFIX = "/test_"; + + private int[] clientPorts; + private MainThread[] mt; + private ZooKeeper[] zkClients; + + @BeforeEach + public void start() throws Exception { + clientPorts = new int[SERVER_COUNT]; + mt = startQuorum(clientPorts); + 
zkClients = new ZooKeeper[SERVER_COUNT]; + } + + @AfterEach + public void tearDown() throws Exception{ + for (final ZooKeeper zk : zkClients) { + try { + if (zk != null) { + zk.close(); + } + } catch (final InterruptedException e) { + LOG.warn("ZooKeeper interrupted while shutting it down", e); + } + } + + for (final MainThread mainThread : mt) { + try { + mainThread.shutdown(); + } catch (final InterruptedException e) { + LOG.warn("Quorum Peer interrupted while shutting it down", e); + } + } + } + + @Test + @Timeout(value = 120) + public void testTxnLoss_FailToPersistAndCommitTxns() throws Exception { + final List paths = new ArrayList<>(); + assertEquals(2, mt[2].getQuorumPeer().getLeaderId()); + + // create a ZK client to the leader (currentEpoch=1, lastLoggedZxid=<1, 1>) + createZKClient(2); + + // create a znode (currentEpoch=1, lastLoggedZxid=<1, 2>) + paths.add(createNode(zkClients[2], PATH_PREFIX + "0")); + + // shut down S0 + mt[0].shutdown(); + LOG.info("S0 shutdown."); + + // create a znode (currentEpoch=1, lastLoggedZxid=<1, 3>), so S0 is 1 txn behind + paths.add(createNode(zkClients[2], PATH_PREFIX + "1")); + logEpochsAndLastLoggedTxnForAllServers(); + + // shut down S1 + mt[1].shutdown(); + LOG.info("S1 shutdown."); + + // restart S0 and trigger a new leader election (currentEpoch=2) + // S0 starts with MockSyncRequestProcessor and MockCommitProcessor to simulate it writes the + // currentEpoch and sends NEWLEADER ACK but fails to persist and commit txns afterwards + // in DIFF sync + mt[0].start(new MockTestQPMain()); + assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[0], CONNECTION_TIMEOUT), + "waiting for server 0 being up"); + LOG.info("S0 restarted."); + logEpochsAndLastLoggedTxnForAllServers(); + + // validate S2 is still the leader + assertEquals(2, mt[2].getQuorumPeer().getLeaderId()); + + // shut down the leader (i.e. S2). 
This causes S0 to disconnect from the leader, perform a partial + // shutdown, fast forward its database to the latest persisted txn (i.e. <1, 3>) and change + // its state to LOOKING + mt[2].shutdown(); + LOG.info("S2 shutdown."); + + // start S1 and trigger a leader election (currentEpoch=3) + mt[1].start(); + assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[1], CONNECTION_TIMEOUT), + "waiting for server 1 being up"); + LOG.info("S1 restarted."); + logEpochsAndLastLoggedTxnForAllServers(); + + // validate S0 is the new leader because it has a higher epoch + assertEquals(0, mt[0].getQuorumPeer().getLeaderId()); + + // connect to the new leader (i.e. S0) (currentEpoch=3, lastLoggedZxid=<3, 1>) + createZKClient(0); + + // create a znode (currentEpoch=3, lastLoggedZxid=<3, 2>) + paths.add(createNode(zkClients[0], PATH_PREFIX + "3")); + + // start S2 which is the old leader + mt[2].start(); + assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2], CONNECTION_TIMEOUT), + "waiting for server " + 2 + " being up"); + LOG.info("S2 restarted."); + logEpochsAndLastLoggedTxnForAllServers(); + + // validate all the znodes exist from all the clients + validateDataFromAllClients(paths); + } + + @Test + @Timeout(value = 120) + public void testLeaderShutdown_AckProposalBeforeAckNewLeader() throws Exception { + assertEquals(2, mt[2].getQuorumPeer().getLeaderId()); + + // create a ZK client to the leader (currentEpoch=1, lastLoggedZxid=<1, 1>) + createZKClient(2); + + // create a znode (currentEpoch=1, lastLoggedZxid=<1, 2>) + createNode(zkClients[2], PATH_PREFIX + "0"); + + // shut down S0 + mt[0].shutdown(); + LOG.info("S0 shutdown."); + + // create a znode (currentEpoch=1, lastLoggedZxid=<1, 3>), so S0 is 1 txn behind + createNode(zkClients[2], PATH_PREFIX + "1"); + logEpochsAndLastLoggedTxnForAllServers(); + + // shut down S1 + mt[1].shutdown(); + LOG.info("S1 shutdown."); + + // restart S0 and trigger a new leader election and DIFF sync (currentEpoch=2) + 
mt[0].start(); + assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[0], CONNECTION_TIMEOUT), + "waiting for server 0 being up"); + LOG.info("S0 restarted."); + + // create a znode (currentEpoch=2, lastLoggedZxid=<2, 1>) + createNode(zkClients[2], PATH_PREFIX + "2"); + + // validate quorum is up without additional round of leader election + for (int i = 0; i < SERVER_COUNT; i++) { + if (i != 1) { + final QuorumPeer qp = mt[i].getQuorumPeer(); + assertNotNull(qp); + assertEquals(2, qp.getCurrentEpoch()); + assertEquals(2, qp.getAcceptedEpoch()); + assertEquals("200000001", Long.toHexString(qp.getLastLoggedZxid())); + } + } + } + + private MainThread[] startQuorum(final int[] clientPorts) throws IOException { + final StringBuilder sb = new StringBuilder(); + String server; + + for (int i = 0; i < SERVER_COUNT; i++) { + clientPorts[i] = PortAssignment.unique(); + server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + + ":participant;127.0.0.1:" + clientPorts[i]; + sb.append(server); + sb.append("\n"); + } + + final MainThread[] mt = new MainThread[SERVER_COUNT]; + + // start all the servers + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i] = new MainThread(i, clientPorts[i], sb.toString(), false); + mt[i].start(); + } + + // ensure all servers started + for (int i = 0; i < SERVER_COUNT; i++) { + assertTrue( + ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT), + "waiting for server " + i + " being up"); + } + return mt; + } + + private void createZKClient(final int idx) throws Exception { + zkClients[idx] = null; + final ClientBase.CountdownWatcher watch = new ClientBase.CountdownWatcher(); + zkClients[idx] = new ZooKeeper("127.0.0.1:" + clientPorts[idx], ClientBase.CONNECTION_TIMEOUT, watch); + watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT); + } + + private String createNode(final ZooKeeper zk, final String path) throws Exception { + final String fullPath = zk.create(path, new 
byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + assertNotNull(zk.exists(path, false)); + return fullPath; + } + + private static class MockTestQPMain extends TestQPMain { + @Override + protected QuorumPeer getQuorumPeer() throws SaslException { + return new TestQuorumPeer(); + } + } + + private static class TestQuorumPeer extends QuorumPeer { + public TestQuorumPeer() throws SaslException { + } + + @Override + protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException { + final FollowerZooKeeperServer followerZookeeperServer = new FollowerZooKeeperServer(logFactory, this, this.getZkDb()) { + @Override + protected void setupRequestProcessors() { + RequestProcessor finalProcessor = new FinalRequestProcessor(this); + commitProcessor = new MockCommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); + commitProcessor.start(); + + firstProcessor = new FollowerRequestProcessor(this, commitProcessor); + ((FollowerRequestProcessor) firstProcessor).start(); + syncProcessor = new MockSyncRequestProcessor(this, new SendAckRequestProcessor(getFollower())); + + syncProcessor.start(); + } + }; + return new Follower(this, followerZookeeperServer); + } + } + + private static class MockSyncRequestProcessor extends SyncRequestProcessor { + public MockSyncRequestProcessor(final ZooKeeperServer zks, final RequestProcessor nextProcessor) { + super(zks, nextProcessor); + } + + @Override + public void processRequest(final Request request) { + LOG.info("Sync request for zxid {} is dropped", Long.toHexString(request.getHdr().getZxid())); + } + } + + private static class MockCommitProcessor extends CommitProcessor { + public MockCommitProcessor(final RequestProcessor nextProcessor, final String id, + final boolean matchSyncs, final ZooKeeperServerListener listener) { + + super(nextProcessor, id, matchSyncs, listener); + } + + @Override + public void commit(final Request request) { + LOG.info("Commit request for 
zxid {} is dropped", Long.toHexString(request.getHdr().getZxid())); + } + } + + private void logEpochsAndLastLoggedTxnForAllServers() throws Exception { + for (int i = 0; i < SERVER_COUNT; i++) { + final QuorumPeer qp = mt[i].getQuorumPeer(); + if (qp != null) { + LOG.info(String.format("server id=%d, acceptedEpoch=%d, currentEpoch=%d, lastLoggedTxn=%s", + qp.getMyId(), qp.getAcceptedEpoch(), + qp.getCurrentEpoch(), Long.toHexString(qp.getLastLoggedZxid()))); + } + } + } + + private void validateDataFromAllClients(final List paths) throws Exception{ + for (int i = 0; i < SERVER_COUNT; i++) { + if (zkClients[i] == null) { + createZKClient(i); + } + + for (final String path : paths) { + assertNotNull(zkClients[i].exists(path, false), "znode " + path + " is missing"); + } + assertEquals(3, paths.size()); + } + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java index 2a7b692589f..9318909593d 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java @@ -318,6 +318,18 @@ public synchronized void start() { currentThread.start(); } + /** + * start the QuorumPeer with the passed TestQPMain + * + * @param testQPMain the TestQPMain to use + */ + + public synchronized void start(final TestQPMain testQPMain) { + main = testQPMain; + currentThread = new Thread(this); + currentThread.start(); + } + public TestQPMain getTestQPMain() { return new TestQPMain(); }