From e670dac3fbf3324027d28aa1a83ecf78898baf7e Mon Sep 17 00:00:00 2001 From: jonmv Date: Fri, 23 Sep 2022 14:53:47 +0200 Subject: [PATCH 01/15] Add failing unit test for ZOOKEEPER-4541 --- .../zookeeper/server/quorum/Zab1_0Test.java | 276 ++++++++++++++++++ 1 file changed, 276 insertions(+) diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 3bdbcd908dc..2a8607f6253 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -23,6 +23,7 @@ import static org.apache.zookeeper.server.quorum.ZabUtils.createMockLeader; import static org.apache.zookeeper.server.quorum.ZabUtils.createQuorumPeer; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -35,13 +36,18 @@ import java.io.EOFException; import java.io.File; import java.io.FileReader; +import java.io.Flushable; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.InputArchive; @@ -51,11 +57,16 @@ import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZKTestCase; import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.ByteBufferInputStream; import org.apache.zookeeper.server.ByteBufferOutputStream; import org.apache.zookeeper.server.DataTree; +import org.apache.zookeeper.server.FinalRequestProcessor; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.SyncRequestProcessor; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; @@ -701,6 +712,271 @@ private void proposeSetData(QuorumPacket qp, long zxid, String data, int version }); } + /** + * Tests a follower that has queued transactions in SyncRequestProcessor that are also already + * committed, with leader getting quorum for those transactions elsewhere in the ensemble, and + * then that the leader shuts down, triggering a new leader election, and partial resetting of + * state in the follower. + * In particular, this test was written to verify a bug where LearnerZooKeeperServer was not + * shut down, because shutdown() was erroneously called on the super class ZooKeeperServer, + * which led to its SyncRequestProcessor not being flushed during shutdown, and any queued + * transactions lost. This would only happen if the SyncRequestProcessor also crashed; this + * would happen as a consequence of the leader going down, causing the SendAckRequestProcessor + * to throw, and kill the sync thread. + * In the subsequent leader election, the quorum peer would use the committed state, even though + * this was not yet flushed to persistent storage, and never would be, after the sync thread died. + * If the correct server had been shut down, the queued transactions would instead either be + * flushed to persistent storage when the quorum peer shut down the old follower, or this would + * fail, causing state to be recreated from whatever state was already flushed, which again would + * be corrected in a DIFF from the new leader. + * + * @author jonmv + */ + @Test + public void testFollowerWithPendingSyncsOnLeaderReElection() throws Exception { + CountDownLatch followerSetUp = new CountDownLatch(1); + class BlockingRequestProcessor implements RequestProcessor, Flushable { + final Phaser phaser = new Phaser(2); // SyncRequestProcessor and test thread + final SendAckRequestProcessor nextProcessor; // SendAckRequestProcessor + BlockingRequestProcessor(SendAckRequestProcessor nextProcessor) { this.nextProcessor = nextProcessor; } + @Override public void processRequest(Request request) throws RequestProcessorException { + nextProcessor.processRequest(request); + } + @Override public void shutdown() { + phaser.forceTermination(); + nextProcessor.shutdown(); + } + @Override public void flush() throws IOException { + phaser.arriveAndAwaitAdvance(); // Let test thread know we're flushing. + phaser.arriveAndAwaitAdvance(); // Let test thread do more stuff while we wait here, simulating slow fsync, etc.. + nextProcessor.flush(); + } + + } + + class BlockingFollowerZooKeeperServer extends FollowerZooKeeperServer { + BlockingRequestProcessor blocker; + BlockingFollowerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException { + super(logFactory, self, zkDb); + } + @Override protected void setupRequestProcessors() { + RequestProcessor finalProcessor = new FinalRequestProcessor(this); + commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); + commitProcessor.start(); + firstProcessor = new FollowerRequestProcessor(this, commitProcessor); + ((FollowerRequestProcessor) firstProcessor).start(); + blocker = new BlockingRequestProcessor(new SendAckRequestProcessor(getFollower())); + syncProcessor = new SyncRequestProcessor(this, blocker); + syncProcessor.start(); + followerSetUp.countDown(); + } + } + + File followerDir = File.createTempFile("test", "dir", testData); + assertTrue(followerDir.delete()); + assertTrue(followerDir.mkdir()); + + File leaderDir = File.createTempFile("test", "dir", testData); + assertTrue(leaderDir.delete()); + assertTrue(leaderDir.mkdir()); + + Thread followerThread = null; + ConversableFollower follower = null; + QuorumPeer peer = null; + BlockingRequestProcessor blocker = null; + + try (ServerSocket ss = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"))) { + peer = createQuorumPeer(followerDir); + + FileTxnSnapLog logFactory = new FileTxnSnapLog(followerDir, followerDir); + peer.setTxnFactory(logFactory); + ZKDatabase zkDb = new ZKDatabase(logFactory); + BlockingFollowerZooKeeperServer zk = new BlockingFollowerZooKeeperServer(logFactory, peer, zkDb); + peer.setZKDatabase(zkDb); + follower = new ConversableFollower(peer, zk); + follower.setLeaderQuorumServer(new QuorumServer(1, (InetSocketAddress) ss.getLocalSocketAddress())); + peer.follower = follower; + + CompletableFuture followerExit = new CompletableFuture<>(); + final Follower followerForThread = follower; + followerThread = new Thread(() -> { + try { + followerForThread.followLeader(); + followerExit.complete(null); + } catch (Exception e) { + LOG.warn("Unexpected exception in follower thread", e); + followerExit.complete(e); + } + }); + followerThread.start(); + + Socket leaderSocket = ss.accept(); + InputArchive ia = BinaryInputArchive.getArchive(leaderSocket.getInputStream()); + OutputArchive oa = BinaryOutputArchive.getArchive(leaderSocket.getOutputStream()); + + assertEquals(0, follower.self.getAcceptedEpoch()); + assertEquals(0, follower.self.getCurrentEpoch()); + + // Set up a database with a single /foo node, on the leader + final long firstZxid = ZxidUtils.makeZxid(1, 1); + ZKDatabase leaderZkDb = new ZKDatabase(new FileTxnSnapLog(leaderDir, leaderDir)); + leaderZkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, OpCode.create), new CreateTxn("/foo", "data1".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1), null); + Stat stat = new Stat(); + assertEquals("data1", new String(leaderZkDb.getData("/foo", stat, null))); + + QuorumPacket qp = new QuorumPacket(); + readPacketSkippingPing(ia, qp); + assertEquals(Leader.FOLLOWERINFO, qp.getType()); + assertEquals(qp.getZxid(), 0); + LearnerInfo learnInfo = new LearnerInfo(); + ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo); + assertEquals(learnInfo.getProtocolVersion(), 0x10000); + assertEquals(learnInfo.getServerid(), 0); + + // We are simulating an established leader, so the epoch is 1 + qp.setType(Leader.LEADERINFO); + qp.setZxid(ZxidUtils.makeZxid(1, 0)); + byte[] protoBytes = new byte[4]; + ByteBuffer.wrap(protoBytes).putInt(0x10000); + qp.setData(protoBytes); + oa.writeRecord(qp, null); + + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACKEPOCH, qp.getType()); + assertEquals(0, qp.getZxid()); + assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt()); + assertEquals(1, follower.self.getAcceptedEpoch()); + assertEquals(0, follower.self.getCurrentEpoch()); + + // Send an empty diff + qp.setType(Leader.DIFF); + qp.setData(new byte[0]); + qp.setZxid(leaderZkDb.getDataTreeLastProcessedZxid()); + oa.writeRecord(qp, null); + + // Required for the ZK server to start up. + qp.setType(Leader.NEWLEADER); + qp.setZxid(ZxidUtils.makeZxid(1, 0)); + qp.setData(null); + oa.writeRecord(qp, null); + + qp.setType(Leader.UPTODATE); + qp.setZxid(0); + oa.writeRecord(qp, null); + + // Read the UPTODATE ack. + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + + // Get the ACK of the new leader. + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + assertEquals(1, follower.self.getAcceptedEpoch()); + assertEquals(1, follower.self.getCurrentEpoch()); + + // The follower now starts following the leader. + // We send a PROPOSAL and a COMMIT, and wait for the transaction to be flushed by SyncRequestProcessor. + long createZxid = ZxidUtils.makeZxid(1, 2); + qp.setType(Leader.PROPOSAL); + qp.setZxid(createZxid); + TxnHeader hdr = new TxnHeader(13, 1313, createZxid, 33, OpCode.create); + CreateTxn ct = new CreateTxn("/bar", "hi".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputArchive boa = BinaryOutputArchive.getArchive(baos); + boa.writeRecord(hdr, null); + boa.writeRecord(ct, null); + qp.setData(baos.toByteArray()); + oa.writeRecord(qp, null); + + qp.setType(Leader.COMMIT); + qp.setZxid(createZxid); + oa.writeRecord(qp, null); + + // Wait for "fsync" to begin. + assertTrue(followerSetUp.await(10, TimeUnit.SECONDS)); + blocker = ((BlockingFollowerZooKeeperServer) follower.zk).blocker; + blocker.phaser.arriveAndAwaitAdvance(); + + // Now we send another PROPOSAL and COMMIT, and wait for them to be applied to the data tree. + // They will not be attempted flushed yet, because the ongoing "fsync" is slow (waiting on the phaser). + long createZxid2 = ZxidUtils.makeZxid(1, 3); + qp.setType(Leader.PROPOSAL); + qp.setZxid(createZxid2); + hdr = new TxnHeader(13, 1314, createZxid2, 34, OpCode.create); + ct = new CreateTxn("/baz", "bye".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1); + baos = new ByteArrayOutputStream(); + boa = BinaryOutputArchive.getArchive(baos); + boa.writeRecord(hdr, null); + boa.writeRecord(ct, null); + qp.setData(baos.toByteArray()); + oa.writeRecord(qp, null); + + qp.setType(Leader.COMMIT); + qp.setZxid(createZxid2); + oa.writeRecord(qp, null); + + // Wait for the follower to observe the COMMIT, and apply the PROPOSAL to its data tree. Unfortunately, + // there's nothing to do but sleep here, as watches are triggered before the last processed id is updated. + long doom = System.currentTimeMillis() + 1000; + while (createZxid != follower.fzk.getLastProcessedZxid() && System.currentTimeMillis() < doom) { + Thread.sleep(1); + } + assertEquals(createZxid, follower.fzk.getLastProcessedZxid()); + + // State recap: first create is flushing to disk, second is queued for flush; + // first and second creates are both applied to data tree. + // Leader now goes down, and signals this by closing its socket. The follower will then initiate a new + // leader election, where it is critical that its "last seen transaction" is indeed written; + // otherwise, any transactions queued for flushing to disk will be lost if the follower restarts again + // before taking a new snapshot. + + // Additionally, any writes in-flight should be allowed to complete _before_ the fast-forward-from-edits, + // done when partially shutting down the learner zoo keeper server to prepare for a new leader election, + // to avoid _also_ getting the transactions for those writes in a DIFF from the new leader, appending them + // twice (or more) to the transaction log, which would also give digest mismatches when restoring state. + // This is not tested here, but fixing ZOOKEEPER-4541 also fixes this problem, by flushing writes first. + + // Kill the leader + leaderSocket.close(); + followerExit.get(); // This closes the socket SendAckRequestProcessor uses, and crashes the SyncRequestProcessor. + blocker.phaser.awaitAdvance(blocker.phaser.arriveAndDeregister()); // Let the in-flight "fsync" complete, and crash (above). + + // A real QuorumPeer would now shut down the follower, and proceed to a new leader election. + follower.shutdown(); + + // The sync processor _should_ be dead now. Prior to the resolution of ZOOKEEPER-4541, it would only be + // dead because it had crashed, which was a bug in itself. + follower.fzk.syncProcessor.join(1000); + assertFalse(follower.fzk.syncProcessor.isAlive()); + + // Make sure the recorded data matches what we'll use for leader election. + File logDir = follower.fzk.getTxnLogFactory().getDataDir().getParentFile(); + File snapDir = follower.fzk.getTxnLogFactory().getSnapDir().getParentFile(); + ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir)); + zkDb2.loadDataBase(); + assertEquals(createZxid2, zkDb.getDataTreeLastProcessedZxid(), "last create zxid is used for leader election"); + assertEquals(createZxid2, zkDb2.getDataTreeLastProcessedZxid(), "last create zxid is written to persistent storage"); + } finally { + if (blocker != null) { + blocker.phaser.forceTermination(); + } + if (follower != null) { + follower.shutdown(); + } + if (followerThread != null) { + followerThread.interrupt(); + followerThread.join(); + } + if (peer != null) { + peer.shutdown(); + } + TestUtils.deleteFileRecursively(leaderDir); + TestUtils.deleteFileRecursively(followerDir); + } + } + @Test public void testNormalFollowerRunWithDiff() throws Exception { testFollowerConversation(new FollowerConversation() { From 0444ede833938cd129a2677a050fc6d22d8a6078 Mon Sep 17 00:00:00 2001 From: jonmv Date: Fri, 23 Sep 2022 14:55:15 +0200 Subject: [PATCH 02/15] Avoid dying when trying to close already closed socket --- .../zookeeper/server/quorum/SendAckRequestProcessor.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java index 8218ddae4ae..b2ca60e792f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java @@ -20,6 +20,8 @@ import java.io.Flushable; import java.io.IOException; +import java.net.Socket; + import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; @@ -64,7 +66,8 @@ public void flush() throws IOException { } catch (IOException e) { LOG.warn("Closing connection to leader, exception during packet send", e); try { - if (!learner.sock.isClosed()) { + Socket socket = learner.sock; + if ( socket != null && ! learner.sock.isClosed()) { learner.sock.close(); } } catch (IOException e1) { From 02d9f8a1485913bf293ef9df9a14b998feb8fd0c Mon Sep 17 00:00:00 2001 From: jonmv Date: Fri, 23 Sep 2022 14:58:11 +0200 Subject: [PATCH 03/15] Propagate shutdown correctly to ZooKeeperServer and its children --- .../zookeeper/server/ZooKeeperServer.java | 2 +- .../server/quorum/LeaderZooKeeperServer.java | 4 +-- .../server/quorum/LearnerZooKeeperServer.java | 30 ++++++++++++------- .../quorum/ObserverZooKeeperServer.java | 17 +++-------- .../quorum/ReadOnlyZooKeeperServer.java | 23 +++++++------- 5 files changed, 38 insertions(+), 38 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 0303ca645bd..749b558329a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -883,7 +883,7 @@ public synchronized void shutdown(boolean fullyShutDown) { // * If we fetch a new snapshot from leader, the zkDb will be // cleared anyway before loading the snapshot try { - //This will fast forward the database to the latest recorded transactions + // This will fast-forward the database to the latest recorded transactions zkDb.fastForwardDataBase(); } catch (IOException e) { LOG.error("Error updating DB", e); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java index 5a51bc23d7e..7046d9a1fcb 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java @@ -155,11 +155,11 @@ protected void unregisterMetrics() { } @Override - public synchronized void shutdown() { + public synchronized void shutdown(boolean fullyShutDown) { if (containerManager != null) { containerManager.stop(); } - super.shutdown(); + super.shutdown(fullyShutDown); } @Override diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java index cab95b05621..4d1cc703948 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java @@ -152,23 +152,31 @@ protected void unregisterJMX(Learner peer) { } @Override - public synchronized void shutdown() { + public synchronized void shutdown(boolean fullyShutDown) { if (!canShutdown()) { LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); - return; } - LOG.info("Shutting down"); - try { - super.shutdown(); - } catch (Exception e) { - LOG.warn("Ignoring unexpected exception during shutdown", e); + else { + LOG.info("Shutting down"); + try { + if (syncProcessor != null) { + // Shutting down the syncProcessor here, first, ensures queued transactions here are written to + // permanent storage, which ensures that crash recovery data is consistent with what is used for a + // leader election immediately following shutdown, because of the old leader going down; and also + // that any state on its way to being written is also loaded in the potential call to + // fast-forward-from-edits, in super.shutdown(...), so we avoid getting a DIFF from the new leader + // that contains entries we have already written to our transaction log. + syncProcessor.shutdown(); + } + } + catch (Exception e) { + LOG.warn("Ignoring unexpected exception in syncprocessor shutdown", e); + } } try { - if (syncProcessor != null) { - syncProcessor.shutdown(); - } + super.shutdown(fullyShutDown); } catch (Exception e) { - LOG.warn("Ignoring unexpected exception in syncprocessor shutdown", e); + LOG.warn("Ignoring unexpected exception during shutdown", e); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java index a41a9187743..3cd6dc83a58 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java @@ -73,7 +73,7 @@ public Learner getLearner() { * @param request */ public void commitRequest(Request request) { - if (syncRequestProcessorEnabled) { + if (syncProcessor != null) { // Write to txnlog and take periodic snapshot syncProcessor.processRequest(request); } @@ -107,6 +107,9 @@ protected void setupRequestProcessors() { syncProcessor = new SyncRequestProcessor(this, null); syncProcessor.start(); } + else { + syncProcessor = null; + } } /* @@ -127,18 +130,6 @@ public String getState() { return "observer"; } - @Override - public synchronized void shutdown() { - if (!canShutdown()) { - LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); - return; - } - super.shutdown(); - if (syncRequestProcessorEnabled && syncProcessor != null) { - syncProcessor.shutdown(); - } - } - @Override public void dumpMonitorValues(BiConsumer response) { super.dumpMonitorValues(response); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java index d2f6b39b69a..c768a4f0096 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java @@ -190,23 +190,24 @@ public long getServerId() { } @Override - public synchronized void shutdown() { + public synchronized void shutdown(boolean fullyShutDown) { if (!canShutdown()) { + super.shutdown(fullyShutDown); LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); - return; } - shutdown = true; - unregisterJMX(this); - - // set peer's server to null - self.setZooKeeperServer(null); - // clear all the connections - self.closeAllConnections(); + else { + shutdown = true; + unregisterJMX(this); - self.adminServer.setZooKeeperServer(null); + // set peer's server to null + self.setZooKeeperServer(null); + // clear all the connections + self.closeAllConnections(); + self.adminServer.setZooKeeperServer(null); + } // shutdown the server itself - super.shutdown(); + super.shutdown(fullyShutDown); } @Override From 96f2104fec2fd5a2c89588caad9770d1e4641d7c Mon Sep 17 00:00:00 2001 From: jonmv Date: Mon, 3 Oct 2022 09:34:16 +0200 Subject: [PATCH 04/15] Use local variable for socket --- .../server/quorum/LearnerZooKeeperServer.java | 6 ++-- .../quorum/ObserverZooKeeperServer.java | 3 +- .../quorum/ReadOnlyZooKeeperServer.java | 3 +- .../quorum/SendAckRequestProcessor.java | 3 +- .../zookeeper/server/quorum/Zab1_0Test.java | 29 ++++++++++++++----- 5 files changed, 27 insertions(+), 17 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java index 4d1cc703948..19cd10ba2ff 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java @@ -155,8 +155,7 @@ protected void unregisterJMX(Learner peer) { public synchronized void shutdown(boolean fullyShutDown) { if (!canShutdown()) { LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); - } - else { + } else { LOG.info("Shutting down"); try { if (syncProcessor != null) { @@ -168,8 +167,7 @@ public synchronized void shutdown(boolean fullyShutDown) { // that contains entries we have already written to our transaction log. syncProcessor.shutdown(); } - } - catch (Exception e) { + } catch (Exception e) { LOG.warn("Ignoring unexpected exception in syncprocessor shutdown", e); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java index 3cd6dc83a58..508670ec8d8 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java @@ -106,8 +106,7 @@ protected void setupRequestProcessors() { if (syncRequestProcessorEnabled) { syncProcessor = new SyncRequestProcessor(this, null); syncProcessor.start(); - } - else { + } else { syncProcessor = null; } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java index c768a4f0096..3d7a9858cdc 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java @@ -194,8 +194,7 @@ public synchronized void shutdown(boolean fullyShutDown) { if (!canShutdown()) { super.shutdown(fullyShutDown); LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); - } - else { + } else { shutdown = true; unregisterJMX(this); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java index b2ca60e792f..c40c83d66a0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java @@ -21,7 +21,6 @@ import java.io.Flushable; import java.io.IOException; import java.net.Socket; - import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; @@ -67,7 +66,7 @@ public void flush() throws IOException { LOG.warn("Closing connection to leader, exception during packet send", e); try { Socket socket = learner.sock; - if ( socket != null && ! learner.sock.isClosed()) { + if (socket != null && !learner.sock.isClosed()) { learner.sock.close(); } } catch (IOException e1) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 2a8607f6253..2d910abb47c 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -729,24 +729,34 @@ private void proposeSetData(QuorumPacket qp, long zxid, String data, int version * flushed to persistent storage when the quorum peer shut down the old follower, or this would * fail, causing state to be recreated from whatever state was already flushed, which again would * be corrected in a DIFF from the new leader. - * - * @author jonmv */ @Test public void testFollowerWithPendingSyncsOnLeaderReElection() throws Exception { + CountDownLatch followerSetUp = new CountDownLatch(1); + class BlockingRequestProcessor implements RequestProcessor, Flushable { final Phaser phaser = new Phaser(2); // SyncRequestProcessor and test thread + final SendAckRequestProcessor nextProcessor; // SendAckRequestProcessor - BlockingRequestProcessor(SendAckRequestProcessor nextProcessor) { this.nextProcessor = nextProcessor; } - @Override public void processRequest(Request request) throws RequestProcessorException { + + BlockingRequestProcessor(SendAckRequestProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public void processRequest(Request request) throws RequestProcessorException { nextProcessor.processRequest(request); } - @Override public void shutdown() { + + @Override + public void shutdown() { phaser.forceTermination(); nextProcessor.shutdown(); } - @Override public void flush() throws IOException { + + @Override + public void flush() throws IOException { phaser.arriveAndAwaitAdvance(); // Let test thread know we're flushing. phaser.arriveAndAwaitAdvance(); // Let test thread do more stuff while we wait here, simulating slow fsync, etc.. nextProcessor.flush(); @@ -755,11 +765,15 @@ class BlockingRequestProcessor implements RequestProcessor, Flushable { } class BlockingFollowerZooKeeperServer extends FollowerZooKeeperServer { + BlockingRequestProcessor blocker; + BlockingFollowerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException { super(logFactory, self, zkDb); } - @Override protected void setupRequestProcessors() { + + @Override + protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); commitProcessor.start(); @@ -770,6 +784,7 @@ class BlockingFollowerZooKeeperServer extends FollowerZooKeeperServer { syncProcessor.start(); followerSetUp.countDown(); } + } File followerDir = File.createTempFile("test", "dir", testData); From 83812a94027794ac31f5de1bbef93e5646bca6c8 Mon Sep 17 00:00:00 2001 From: jonmv Date: Mon, 3 Oct 2022 09:49:29 +0200 Subject: [PATCH 05/15] Modify test to crash Follower on COMMIT after NEWLEADER during DIFF sync --- .../zookeeper/server/quorum/Zab1_0Test.java | 39 +++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 2d910abb47c..0d5c0f63569 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -863,18 +863,35 @@ protected void setupRequestProcessors() { assertEquals(1, follower.self.getAcceptedEpoch()); assertEquals(0, follower.self.getCurrentEpoch()); - // Send an empty diff + // Send a diff with a single PROPOSAL, to be COMMITTed after NEWLEADER qp.setType(Leader.DIFF); qp.setData(new byte[0]); qp.setZxid(leaderZkDb.getDataTreeLastProcessedZxid()); oa.writeRecord(qp, null); + long createZxid0 = ZxidUtils.makeZxid(1, 2); + qp.setType(Leader.PROPOSAL); + qp.setZxid(createZxid0); + TxnHeader hdr = new TxnHeader(13, 1313, createZxid0, 33, OpCode.create); + CreateTxn ct = new CreateTxn("/bar", "hi".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputArchive boa = BinaryOutputArchive.getArchive(baos); + boa.writeRecord(hdr, null); + boa.writeRecord(ct, null); + qp.setData(baos.toByteArray()); + oa.writeRecord(qp, null); + // Required for the ZK server to start up. qp.setType(Leader.NEWLEADER); qp.setZxid(ZxidUtils.makeZxid(1, 0)); qp.setData(null); oa.writeRecord(qp, null); + // Quorum was acquired for the previous PROPOSAL, which is now COMMITTed. + qp.setType(Leader.COMMIT); + qp.setZxid(createZxid0); + oa.writeRecord(qp, null); + qp.setType(Leader.UPTODATE); qp.setZxid(0); oa.writeRecord(qp, null); @@ -893,20 +910,20 @@ protected void setupRequestProcessors() { // The follower now starts following the leader. // We send a PROPOSAL and a COMMIT, and wait for the transaction to be flushed by SyncRequestProcessor. - long createZxid = ZxidUtils.makeZxid(1, 2); + long createZxid1 = ZxidUtils.makeZxid(1, 3); qp.setType(Leader.PROPOSAL); - qp.setZxid(createZxid); - TxnHeader hdr = new TxnHeader(13, 1313, createZxid, 33, OpCode.create); - CreateTxn ct = new CreateTxn("/bar", "hi".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - OutputArchive boa = BinaryOutputArchive.getArchive(baos); + qp.setZxid(createZxid1); + hdr = new TxnHeader(13, 1313, createZxid1, 33, OpCode.create); + ct = new CreateTxn("/bar", "hi".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1); + baos = new ByteArrayOutputStream(); + boa = BinaryOutputArchive.getArchive(baos); boa.writeRecord(hdr, null); boa.writeRecord(ct, null); qp.setData(baos.toByteArray()); oa.writeRecord(qp, null); qp.setType(Leader.COMMIT); - qp.setZxid(createZxid); + qp.setZxid(createZxid1); oa.writeRecord(qp, null); // Wait for "fsync" to begin. @@ -916,7 +933,7 @@ protected void setupRequestProcessors() { // Now we send another PROPOSAL and COMMIT, and wait for them to be applied to the data tree. // They will not be attempted flushed yet, because the ongoing "fsync" is slow (waiting on the phaser). - long createZxid2 = ZxidUtils.makeZxid(1, 3); + long createZxid2 = ZxidUtils.makeZxid(1, 4); qp.setType(Leader.PROPOSAL); qp.setZxid(createZxid2); hdr = new TxnHeader(13, 1314, createZxid2, 34, OpCode.create); @@ -935,10 +952,10 @@ protected void setupRequestProcessors() { // Wait for the follower to observe the COMMIT, and apply the PROPOSAL to its data tree. Unfortunately, // there's nothing to do but sleep here, as watches are triggered before the last processed id is updated. long doom = System.currentTimeMillis() + 1000; - while (createZxid != follower.fzk.getLastProcessedZxid() && System.currentTimeMillis() < doom) { + while (createZxid1 != follower.fzk.getLastProcessedZxid() && System.currentTimeMillis() < doom) { Thread.sleep(1); } - assertEquals(createZxid, follower.fzk.getLastProcessedZxid()); + assertEquals(createZxid1, follower.fzk.getLastProcessedZxid()); // State recap: first create is flushing to disk, second is queued for flush; // first and second creates are both applied to data tree. From 4aab1fdb939c270376bcb604c9e26a5afcf1dc7e Mon Sep 17 00:00:00 2001 From: jonmv Date: Mon, 3 Oct 2022 09:59:40 +0200 Subject: [PATCH 06/15] Handle COMMIT after NEWLEADER in DIFF sync without NPE --- .../zookeeper/server/quorum/Learner.java | 50 ++++++++++--------- .../quorum/SendAckRequestProcessor.java | 2 +- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index b6eeb758ac9..b9faa21652d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -555,6 +555,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { boolean syncSnapshot = false; readPacket(qp); Deque packetsCommitted = new ArrayDeque<>(); + Deque packetsNotLogged = new ArrayDeque<>(); Deque packetsNotCommitted = new ArrayDeque<>(); synchronized (zk) { if (qp.getType() == Leader.DIFF) { @@ -643,33 +644,36 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { self.setLastSeenQuorumVerifier(qv, true); } + packetsNotLogged.add(pif); packetsNotCommitted.add(pif); break; case Leader.COMMIT: case Leader.COMMITANDACTIVATE: pif = packetsNotCommitted.peekFirst(); - if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) { - QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8)); - boolean majorChange = self.processReconfig( - qv, - ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(), - true); - if (majorChange) { - throw new Exception("changes proposed in reconfig"); + if (pif.hdr.getZxid() != qp.getZxid()) { + LOG.warn( + "Committing 0x{}, but next proposal is 0x{}", + Long.toHexString(qp.getZxid()), + Long.toHexString(pif.hdr.getZxid())); + } else { + if (qp.getType() == Leader.COMMITANDACTIVATE) { + QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8)); + boolean majorChange = self.processReconfig( + qv, + ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(), + true); + if (majorChange) { + throw new Exception("changes proposed in reconfig"); + } } - } - if (!writeToTxnLog) { - if (pif.hdr.getZxid() != qp.getZxid()) { - LOG.warn( - "Committing 0x{}, but next proposal is 0x{}", - Long.toHexString(qp.getZxid()), - Long.toHexString(pif.hdr.getZxid())); - } else { + if (!writeToTxnLog) { zk.processTxn(pif.hdr, pif.rec); + packetsNotLogged.remove(); + packetsNotCommitted.remove(); + } else { packetsNotCommitted.remove(); + packetsCommitted.add(qp.getZxid()); } - } else { - packetsCommitted.add(qp.getZxid()); } break; case Leader.INFORM: @@ -708,7 +712,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { // Apply to db directly if we haven't taken the snapshot zk.processTxn(packet.hdr, packet.rec); } else { - packetsNotCommitted.add(packet); + packetsNotLogged.add(packet); packetsCommitted.add(qp.getZxid()); } @@ -756,10 +760,10 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { zk.startupWithoutServing(); if (zk instanceof FollowerZooKeeperServer) { FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - for (PacketInFlight p : packetsNotCommitted) { + for (PacketInFlight p : packetsNotLogged) { fzk.logRequest(p.hdr, p.rec, p.digest); } - packetsNotCommitted.clear(); + packetsNotLogged.clear(); } writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); @@ -782,7 +786,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { // We need to log the stuff that came in between the snapshot and the uptodate if (zk instanceof FollowerZooKeeperServer) { FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - for (PacketInFlight p : packetsNotCommitted) { + for (PacketInFlight p : packetsNotLogged) { fzk.logRequest(p.hdr, p.rec, p.digest); } for (Long zxid : packetsCommitted) { @@ -792,7 +796,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { // Similar to follower, we need to log requests between the snapshot // and UPTODATE ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk; - for (PacketInFlight p : packetsNotCommitted) { + for (PacketInFlight p : packetsNotLogged) { Long zxid = packetsCommitted.peekFirst(); if (p.hdr.getZxid() != zxid) { // log warning message if there is no matching commit diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java index c40c83d66a0..96bf4b776d8 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java @@ -66,7 +66,7 @@ public void flush() throws IOException { LOG.warn("Closing connection to leader, exception during packet send", e); try { Socket socket = learner.sock; - if (socket != null && !learner.sock.isClosed()) { + if (socket != null && !socket.isClosed()) { learner.sock.close(); } } catch (IOException e1) { From ed275cb26b5b8bf432342b4db4002bdd5f86b1fc Mon Sep 17 00:00:00 2001 From: jonmv Date: Mon, 3 Oct 2022 18:36:05 +0200 Subject: [PATCH 07/15] Ensure TXNs are flushed before ACK of NEWLEADER, and ensure expected ACK order --- .../server/SyncRequestProcessor.java | 88 +++++++++++++++++-- .../zookeeper/server/quorum/Learner.java | 8 ++ .../quorum/SendAckRequestProcessor.java | 2 +- .../quorum/DIFFSyncConsistencyTest.java | 4 +- .../zookeeper/server/quorum/Zab1_0Test.java | 31 ++++--- 5 files changed, 112 insertions(+), 21 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java index 4df319f86a4..e03e0b07944 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java @@ -24,6 +24,7 @@ import java.util.Objects; import java.util.Queue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; @@ -55,6 +56,55 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req private static final Request REQUEST_OF_DEATH = Request.requestOfDeath; + private static class FlushRequest extends Request { + private final CountDownLatch latch = new CountDownLatch(1); + public FlushRequest() { + super(null, 0, 0, 0, null, null); + } + } + + private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null); + private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null); + + private static class DelayingProcessor implements RequestProcessor, Flushable { + private final RequestProcessor next; + private Queue delayed = null; + private DelayingProcessor(RequestProcessor next) { + this.next = next; + } + @Override + public void flush() throws IOException { + if (delayed == null && next instanceof Flushable) { + ((Flushable) next).flush(); + } + } + @Override + public void processRequest(Request request) throws RequestProcessorException { + if (delayed == null) { + next.processRequest(request); + } else { + delayed.add(request); + } + } + @Override + public void shutdown() { + next.shutdown(); + } + private void close() { + if (delayed == null) { + delayed = new ArrayDeque<>(); + } + } + private void open() throws RequestProcessorException { + if (delayed != null) { + for (Request request : delayed) { + next.processRequest(request); + } + delayed = null; + } + } + } + /** The number of log entries to log before starting a snapshot */ private static int snapCount = ZooKeeperServer.getSnapCount(); @@ -75,7 +125,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req private final ZooKeeperServer zks; - private final RequestProcessor nextProcessor; + private final DelayingProcessor nextProcessor; /** * Transactions that have been written and are waiting to be flushed to @@ -88,7 +138,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener()); this.zks = zks; - this.nextProcessor = nextProcessor; + this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor); this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize()); } @@ -174,6 +224,21 @@ public void run() { break; } + if (si == turnForwardingDelayOn) { + nextProcessor.close(); + continue; + } + if (si == turnForwardingDelayOff) { + nextProcessor.open(); + continue; + } + + if (si instanceof FlushRequest) { + flush(); + ((FlushRequest) si).latch.countDown(); + continue; + } + long startProcessTime = Time.currentElapsedTime(); ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime); @@ -206,9 +271,7 @@ public void run() { // and there are no pending flushes (writes), then just pass this to the next processor if (nextProcessor != null) { nextProcessor.processRequest(si); - if (nextProcessor instanceof Flushable) { - ((Flushable) nextProcessor).flush(); - } + nextProcessor.flush(); } continue; } @@ -224,6 +287,17 @@ public void run() { LOG.info("SyncRequestProcessor exited!"); } + /** Flushes all pending writes, and waits for this to complete. */ + public void syncFlush() throws InterruptedException { + FlushRequest marker = new FlushRequest(); + queuedRequests.add(marker); + marker.latch.await(); + } + + public void setDelayForwarding(boolean delayForwarding) { + queuedRequests.add(delayForwarding ? turnForwardingDelayOn : turnForwardingDelayOff); + } + private void flush() throws IOException, RequestProcessorException { if (this.toFlush.isEmpty()) { return; @@ -244,9 +318,7 @@ private void flush() throws IOException, RequestProcessorException { ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency); this.nextProcessor.processRequest(i); } - if (this.nextProcessor instanceof Flushable) { - ((Flushable) this.nextProcessor).flush(); - } + nextProcessor.flush(); } lastFlushTime = Time.currentElapsedTime(); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index b9faa21652d..e6b22f16731 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -760,13 +760,21 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { zk.startupWithoutServing(); if (zk instanceof FollowerZooKeeperServer) { FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + fzk.syncProcessor.setDelayForwarding(true); for (PacketInFlight p : packetsNotLogged) { fzk.logRequest(p.hdr, p.rec, p.digest); } packetsNotLogged.clear(); + fzk.syncProcessor.syncFlush(); } writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); + + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + fzk.syncProcessor.setDelayForwarding(false); + fzk.syncProcessor.syncFlush(); + } break; } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java index 96bf4b776d8..d65ead216f0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java @@ -32,7 +32,7 @@ public class SendAckRequestProcessor implements RequestProcessor, Flushable { private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class); - Learner learner; + final Learner learner; SendAckRequestProcessor(Learner peer) { this.learner = peer; diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java index 9b9ea55f3f6..056b5b246e0 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java @@ -49,7 +49,7 @@ public class DIFFSyncConsistencyTest extends QuorumPeerTestBase { private MainThread[] mt = new MainThread[SERVER_COUNT]; @Test - @Timeout(value = 120) + @Timeout(value = 20) public void testInconsistentDueToUncommittedLog() throws Exception { final int LEADER_TIMEOUT_MS = 10_000; final int[] clientPorts = new int[SERVER_COUNT]; @@ -251,7 +251,7 @@ void readPacket(QuorumPacket pp) throws IOException { super.readPacket(pp); if (injectError && pp.getType() == Leader.UPTODATE) { String type = LearnerHandler.packetToString(pp); - throw new SocketTimeoutException("Socket timeout while reading the packet for operation " + throw new SocketTimeoutException("TEST CODE Socket timeout while reading the packet for operation " + type); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 0d5c0f63569..8c5c0138bd4 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -736,7 +736,7 @@ public void testFollowerWithPendingSyncsOnLeaderReElection() throws Exception { CountDownLatch followerSetUp = new CountDownLatch(1); class BlockingRequestProcessor implements RequestProcessor, Flushable { - final Phaser phaser = new Phaser(2); // SyncRequestProcessor and test thread + final Phaser phaser = new Phaser(1); // SyncRequestProcessor; test thread will register later. final SendAckRequestProcessor nextProcessor; // SendAckRequestProcessor @@ -896,20 +896,27 @@ protected void setupRequestProcessors() { qp.setZxid(0); oa.writeRecord(qp, null); - // Read the UPTODATE ack. + // Get the ACK of the new leader. readPacketSkippingPing(ia, qp); assertEquals(Leader.ACK, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + assertEquals(1, follower.self.getAcceptedEpoch()); + assertEquals(1, follower.self.getCurrentEpoch()); - // Get the ACK of the new leader. + // Read the PROPOSAL ack. + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(createZxid0, qp.getZxid()); + + // Read the UPTODATE ack. readPacketSkippingPing(ia, qp); assertEquals(Leader.ACK, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - assertEquals(1, follower.self.getAcceptedEpoch()); - assertEquals(1, follower.self.getCurrentEpoch()); // The follower now starts following the leader. // We send a PROPOSAL and a COMMIT, and wait for the transaction to be flushed by SyncRequestProcessor. + blocker = ((BlockingFollowerZooKeeperServer) follower.zk).blocker; + blocker.phaser.register(); long createZxid1 = ZxidUtils.makeZxid(1, 3); qp.setType(Leader.PROPOSAL); qp.setZxid(createZxid1); @@ -928,7 +935,6 @@ protected void setupRequestProcessors() { // Wait for "fsync" to begin. assertTrue(followerSetUp.await(10, TimeUnit.SECONDS)); - blocker = ((BlockingFollowerZooKeeperServer) follower.zk).blocker; blocker.phaser.arriveAndAwaitAdvance(); // Now we send another PROPOSAL and COMMIT, and wait for them to be applied to the data tree. @@ -1075,17 +1081,22 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) qp.setZxid(0); oa.writeRecord(qp, null); - // Read the uptodate ack + // Get the ack of the new leader readPacketSkippingPing(ia, qp); assertEquals(Leader.ACK, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + assertEquals(1, f.self.getAcceptedEpoch()); + assertEquals(1, f.self.getCurrentEpoch()); - // Get the ack of the new leader + // Read the create session ack + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(createSessionZxid, qp.getZxid()); + + // Read the uptodate ack readPacketSkippingPing(ia, qp); assertEquals(Leader.ACK, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - assertEquals(1, f.self.getAcceptedEpoch()); - assertEquals(1, f.self.getCurrentEpoch()); //Wait for the transactions to be written out. The thread that writes them out // does not send anything back when it is done. From 51663e1ab14e82956c982a9ea945404f57950726 Mon Sep 17 00:00:00 2001 From: jonmv Date: Mon, 17 Oct 2022 16:05:25 +0200 Subject: [PATCH 08/15] Extract SyncHelper to deal with packets inflight --- .../zookeeper/server/quorum/Learner.java | 291 ++++++++++-------- .../zookeeper/server/quorum/QuorumPeer.java | 1 - 2 files changed, 161 insertions(+), 131 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index e6b22f16731..46bc41209ea 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -543,36 +543,153 @@ protected long registerWithLeader(int pktType) throws IOException { * @throws InterruptedException */ protected void syncWithLeader(long newLeaderZxid) throws Exception { - QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); - QuorumPacket qp = new QuorumPacket(); long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); - QuorumVerifier newLeaderQV = null; + boolean isPreZAB1_0 = true; // Whether the leader is pre-ZAB1.0 + + class SyncHelper { + + // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot. + // For SNAP and TRUNC the snapshot is needed to save that history. + boolean willSnapshot = true; + boolean syncSnapshot = false; + + // PROPOSALs received during sync, for matching up with COMMITs. + Deque proposals = new ArrayDeque<>(); + + // PROPOSALs we delay forwarding to the ZK server until sync is done. + Deque delayedProposals = new ArrayDeque<>(); + + // COMMITs we delay forwarding to the ZK server until sync is done. + Deque delayedCommits = new ArrayDeque<>(); + + public void syncSnapshot() { + syncSnapshot = true; + } + + public void noSnapshot() { + willSnapshot = false; + } + + public void propose(PacketInFlight pif) { + proposals.add(pif); + delayedProposals.add(pif); + } + + public PacketInFlight nextProposal() { + return proposals.peekFirst(); + } + + public void commit() { + PacketInFlight packet = proposals.remove(); + if (willSnapshot) { + zk.processTxn(packet.hdr, packet.rec); + delayedProposals.remove(); + } else { + delayedCommits.add(packet.hdr.getZxid()); + } + } + + public void proposeAndCommit(PacketInFlight packet) { + // Should be able to do propose(packet); commit(); because INFORM with non-empty proposals would be an error ... + if (willSnapshot) { + zk.processTxn(packet.hdr, packet.rec); + } else { + delayedProposals.add(packet); + delayedCommits.add(packet.hdr.getZxid()); + } + } + + public void writeState() throws IOException, InterruptedException { + // Ensure all received transaction PROPOSALs are written before we ACK the NEWLEADER, + // since this allows the leader to apply those transactions to its served state: + if (willSnapshot) { + zk.takeSnapshot(syncSnapshot); // either, the snapshot contains the transactions, + willSnapshot = false; // but anything after this needs to go to the transaction log; or + } + + self.setCurrentEpoch(newEpoch); + sock.setSoTimeout(self.tickTime * self.syncLimit); + self.setSyncMode(QuorumPeer.SyncMode.NONE); + zk.startupWithoutServing(); + + // if we're a follower, we need to ensure the transactions are safely logged before ACK'ing. + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + // The leader expects the NEWLEADER ACK to precede all the PROPOSAL ACKs, so we only write them first. + fzk.syncProcessor.setDelayForwarding(true); + for (PacketInFlight p : delayedProposals) { + fzk.logRequest(p.hdr, p.rec, p.digest); + } + delayedProposals.clear(); + fzk.syncProcessor.syncFlush(); + } + } - // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot - // For SNAP and TRUNC the snapshot is needed to save that history - boolean snapshotNeeded = true; - boolean syncSnapshot = false; + public void flushAcks() throws InterruptedException { + if (zk instanceof FollowerZooKeeperServer) { + // The NEWLEADER is ACK'ed, and we can now ACK the PROPOSALs we wrote in writeState. + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + fzk.syncProcessor.setDelayForwarding(false); + fzk.syncProcessor.syncFlush(); // Ensure these are all ACK'ed before the UPTODATE ACK. + } + } + + public void applyDelayedPackets() { + // Any delayed packets must now be applied: all PROPOSALs first, then any COMMITs. + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + for (PacketInFlight p : delayedProposals) { + fzk.logRequest(p.hdr, p.rec, p.digest); + } + for (Long zxid : delayedCommits) { + fzk.commit(zxid); + } + } else if (zk instanceof ObserverZooKeeperServer) { + ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk; + for (PacketInFlight p : delayedProposals) { + Long zxid = delayedCommits.peekFirst(); + if (p.hdr.getZxid() != zxid) { + // log warning message if there is no matching commit + // old leader send outstanding proposal to observer + LOG.warn( + "Committing 0x{}, but next proposal is 0x{}", + Long.toHexString(zxid), + Long.toHexString(p.hdr.getZxid())); + continue; + } + delayedCommits.remove(); + Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null); + request.setTxn(p.rec); + request.setHdr(p.hdr); + request.setTxnDigest(p.digest); + ozk.commitRequest(request); + } + } else { + // New server type need to handle in-flight packets + throw new UnsupportedOperationException("Unknown server type"); + } + } + + } + + SyncHelper helper = new SyncHelper(); + QuorumPacket qp = new QuorumPacket(); readPacket(qp); - Deque packetsCommitted = new ArrayDeque<>(); - Deque packetsNotLogged = new ArrayDeque<>(); - Deque packetsNotCommitted = new ArrayDeque<>(); synchronized (zk) { if (qp.getType() == Leader.DIFF) { LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); self.setSyncMode(QuorumPeer.SyncMode.DIFF); if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) { LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading."); - snapshotNeeded = true; - syncSnapshot = true; + helper.syncSnapshot(); } else { - snapshotNeeded = false; + helper.noSnapshot(); } } else if (qp.getType() == Leader.SNAP) { self.setSyncMode(QuorumPeer.SyncMode.SNAP); LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid())); // The leader is going to dump the database - // db is clear as part of deserializeSnapshot() zk.getZKDatabase().deserializeSnapshot(leaderIs); // ZOOKEEPER-2819: overwrite config node content extracted // from leader snapshot with local config, to avoid potential @@ -588,20 +705,18 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { } zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); - // immediately persist the latest snapshot when there is txn log gap - syncSnapshot = true; + // Immediately persist the latest snapshot when there is txn log gap + helper.syncSnapshot(); } else if (qp.getType() == Leader.TRUNC) { - //we need to truncate the log to the lastzxid of the leader + // We need to truncate the log to the lastZxid of the leader self.setSyncMode(QuorumPeer.SyncMode.TRUNC); LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid())); boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid()); if (!truncated) { - // not able to truncate the log LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid())); ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue()); } zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); - } else { LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp)); ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue()); @@ -609,17 +724,10 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); zk.createSessionTracker(); - long lastQueued = 0; - // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 - // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER) - // we need to make sure that we don't take the snapshot twice. - boolean isPreZAB1_0 = true; - //If we are not going to take the snapshot be sure the transactions are not applied in memory - // but written out to the transaction log - boolean writeToTxnLog = !snapshotNeeded; - TxnLogEntry logEntry; // we are now going to start getting transactions to apply followed by an UPTODATE + long lastQueued = 0; + TxnLogEntry logEntry; outerLoop: while (self.isRunning()) { readPacket(qp); @@ -643,13 +751,11 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8)); self.setLastSeenQuorumVerifier(qv, true); } - - packetsNotLogged.add(pif); - packetsNotCommitted.add(pif); + helper.propose(pif); break; case Leader.COMMIT: case Leader.COMMITANDACTIVATE: - pif = packetsNotCommitted.peekFirst(); + pif = helper.nextProposal(); if (pif.hdr.getZxid() != qp.getZxid()) { LOG.warn( "Committing 0x{}, but next proposal is 0x{}", @@ -657,43 +763,24 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { Long.toHexString(pif.hdr.getZxid())); } else { if (qp.getType() == Leader.COMMITANDACTIVATE) { - QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8)); - boolean majorChange = self.processReconfig( - qv, - ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(), - true); - if (majorChange) { - throw new Exception("changes proposed in reconfig"); - } - } - if (!writeToTxnLog) { - zk.processTxn(pif.hdr, pif.rec); - packetsNotLogged.remove(); - packetsNotCommitted.remove(); - } else { - packetsNotCommitted.remove(); - packetsCommitted.add(qp.getZxid()); + tryReconfig(pif, ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid()); } + helper.commit(); } break; case Leader.INFORM: case Leader.INFORMANDACTIVATE: PacketInFlight packet = new PacketInFlight(); - if (qp.getType() == Leader.INFORMANDACTIVATE) { ByteBuffer buffer = ByteBuffer.wrap(qp.getData()); long suggestedLeaderId = buffer.getLong(); - byte[] remainingdata = new byte[buffer.remaining()]; - buffer.get(remainingdata); - logEntry = SerializeUtils.deserializeTxn(remainingdata); + byte[] remainingData = new byte[buffer.remaining()]; + buffer.get(remainingData); + logEntry = SerializeUtils.deserializeTxn(remainingData); packet.hdr = logEntry.getHeader(); packet.rec = logEntry.getTxn(); packet.digest = logEntry.getDigest(); - QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData(), UTF_8)); - boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true); - if (majorChange) { - throw new Exception("changes proposed in reconfig"); - } + tryReconfig(packet, suggestedLeaderId, qp.getZxid()); } else { logEntry = SerializeUtils.deserializeTxn(qp.getData()); packet.rec = logEntry.getTxn(); @@ -708,14 +795,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { } lastQueued = packet.hdr.getZxid(); } - if (!writeToTxnLog) { - // Apply to db directly if we haven't taken the snapshot - zk.processTxn(packet.hdr, packet.rec); - } else { - packetsNotLogged.add(packet); - packetsCommitted.add(qp.getZxid()); - } - + helper.proposeAndCommit(packet); break; case Leader.UPTODATE: LOG.info("Learner received UPTODATE message"); @@ -725,15 +805,18 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { throw new Exception("changes proposed in reconfig"); } } + // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 + // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER) + // we need to make sure that we don't take the snapshot twice. if (isPreZAB1_0) { - zk.takeSnapshot(syncSnapshot); - self.setCurrentEpoch(newEpoch); + helper.writeState(); } self.setZooKeeperServer(zk); self.adminServer.setZooKeeperServer(zk); break outerLoop; case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery // means this is Zab 1.0 + isPreZAB1_0 = false; LOG.info("Learner received NEWLEADER message"); if (qp.getData() != null && qp.getData().length > 1) { try { @@ -745,40 +828,14 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { } } - if (snapshotNeeded) { - zk.takeSnapshot(syncSnapshot); - } - - self.setCurrentEpoch(newEpoch); - writeToTxnLog = true; - //Anything after this needs to go to the transaction log, not applied directly in memory - isPreZAB1_0 = false; - - // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER). - sock.setSoTimeout(self.tickTime * self.syncLimit); - self.setSyncMode(QuorumPeer.SyncMode.NONE); - zk.startupWithoutServing(); - if (zk instanceof FollowerZooKeeperServer) { - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - fzk.syncProcessor.setDelayForwarding(true); - for (PacketInFlight p : packetsNotLogged) { - fzk.logRequest(p.hdr, p.rec, p.digest); - } - packetsNotLogged.clear(); - fzk.syncProcessor.syncFlush(); - } - + helper.writeState(); writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); - - if (zk instanceof FollowerZooKeeperServer) { - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - fzk.syncProcessor.setDelayForwarding(false); - fzk.syncProcessor.syncFlush(); - } + helper.flushAcks(); break; } } } + QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); zk.startServing(); @@ -791,40 +848,14 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { */ self.updateElectionVote(newEpoch); - // We need to log the stuff that came in between the snapshot and the uptodate - if (zk instanceof FollowerZooKeeperServer) { - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - for (PacketInFlight p : packetsNotLogged) { - fzk.logRequest(p.hdr, p.rec, p.digest); - } - for (Long zxid : packetsCommitted) { - fzk.commit(zxid); - } - } else if (zk instanceof ObserverZooKeeperServer) { - // Similar to follower, we need to log requests between the snapshot - // and UPTODATE - ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk; - for (PacketInFlight p : packetsNotLogged) { - Long zxid = packetsCommitted.peekFirst(); - if (p.hdr.getZxid() != zxid) { - // log warning message if there is no matching commit - // old leader send outstanding proposal to observer - LOG.warn( - "Committing 0x{}, but next proposal is 0x{}", - Long.toHexString(zxid), - Long.toHexString(p.hdr.getZxid())); - continue; - } - packetsCommitted.remove(); - Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null); - request.setTxn(p.rec); - request.setHdr(p.hdr); - request.setTxnDigest(p.digest); - ozk.commitRequest(request); - } - } else { - // New server type need to handle in-flight packets - throw new UnsupportedOperationException("Unknown server type"); + helper.applyDelayedPackets(); + } + + private void tryReconfig(PacketInFlight pif, long newLeader, long zxid) throws Exception { + QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8)); + boolean majorChange = self.processReconfig(qv, newLeader, zxid, true); + if (majorChange) { + throw new Exception("changes proposed in reconfig"); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index 18e97bb2f0c..3f448148a83 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -2231,7 +2231,6 @@ public long getAcceptedEpoch() throws IOException { public void setCurrentEpoch(long e) throws IOException { writeLongToFile(CURRENT_EPOCH_FILENAME, e); currentEpoch = e; - } public void setAcceptedEpoch(long e) throws IOException { From 47eb1e8e321de0f7d60aff345312df34d83b992c Mon Sep 17 00:00:00 2001 From: jonmv Date: Mon, 17 Oct 2022 19:03:06 +0200 Subject: [PATCH 09/15] Flush delayed acks later, on UPTODATE --- .../main/java/org/apache/zookeeper/server/quorum/Learner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 46bc41209ea..c70a4612259 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -811,6 +811,7 @@ public void applyDelayedPackets() { if (isPreZAB1_0) { helper.writeState(); } + helper.flushAcks(); self.setZooKeeperServer(zk); self.adminServer.setZooKeeperServer(zk); break outerLoop; @@ -830,7 +831,6 @@ public void applyDelayedPackets() { helper.writeState(); writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); - helper.flushAcks(); break; } } From 84be071453c541403200c26a6e7f6c9e612caad0 Mon Sep 17 00:00:00 2001 From: jonmv Date: Mon, 17 Oct 2022 19:05:35 +0200 Subject: [PATCH 10/15] Remove pre-ZAB1.0 legacy code --- .../org/apache/zookeeper/server/quorum/Learner.java | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index c70a4612259..035425fd119 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -545,7 +545,6 @@ protected long registerWithLeader(int pktType) throws IOException { protected void syncWithLeader(long newLeaderZxid) throws Exception { long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); QuorumVerifier newLeaderQV = null; - boolean isPreZAB1_0 = true; // Whether the leader is pre-ZAB1.0 class SyncHelper { @@ -659,9 +658,7 @@ public void applyDelayedPackets() { continue; } delayedCommits.remove(); - Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null); - request.setTxn(p.rec); - request.setHdr(p.hdr); + Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1); request.setTxnDigest(p.digest); ozk.commitRequest(request); } @@ -805,19 +802,11 @@ public void applyDelayedPackets() { throw new Exception("changes proposed in reconfig"); } } - // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 - // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER) - // we need to make sure that we don't take the snapshot twice. - if (isPreZAB1_0) { - helper.writeState(); - } helper.flushAcks(); self.setZooKeeperServer(zk); self.adminServer.setZooKeeperServer(zk); break outerLoop; case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery - // means this is Zab 1.0 - isPreZAB1_0 = false; LOG.info("Learner received NEWLEADER message"); if (qp.getData() != null && qp.getData().length > 1) { try { From 840808cc9c13b13449148e7d09c3118300c39c6f Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Fri, 3 Feb 2023 08:32:53 +0100 Subject: [PATCH 11/15] Remove unintended double shutdown of ReadOnlyZooKeeperServer --- .../apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java index 3d7a9858cdc..4cab5cfa064 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java @@ -192,7 +192,6 @@ public long getServerId() { @Override public synchronized void shutdown(boolean fullyShutDown) { if (!canShutdown()) { - super.shutdown(fullyShutDown); LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); } else { shutdown = true; From 882d6402cdb98eddfd355eb22f814ac9caceefca Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Fri, 3 Feb 2023 08:34:34 +0100 Subject: [PATCH 12/15] Restore longer test timeout --- .../apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java index 056b5b246e0..a17c49205da 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java @@ -49,7 +49,7 @@ public class DIFFSyncConsistencyTest extends QuorumPeerTestBase { private MainThread[] mt = new MainThread[SERVER_COUNT]; @Test - @Timeout(value = 20) + @Timeout(value = 120) public void testInconsistentDueToUncommittedLog() throws Exception { final int LEADER_TIMEOUT_MS = 10_000; final int[] clientPorts = new int[SERVER_COUNT]; From 5f5834b7720b0cd124a87ea0adf94f3618fb0395 Mon Sep 17 00:00:00 2001 From: jonmv Date: Fri, 3 Feb 2023 12:31:50 +0100 Subject: [PATCH 13/15] Simplify sync helper by inlining INFORM action --- .../zookeeper/server/quorum/Learner.java | 29 +++++++------------ 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 035425fd119..0561f0c0cb2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -562,24 +562,24 @@ class SyncHelper { // COMMITs we delay forwarding to the ZK server until sync is done. Deque delayedCommits = new ArrayDeque<>(); - public void syncSnapshot() { + void syncSnapshot() { syncSnapshot = true; } - public void noSnapshot() { + void noSnapshot() { willSnapshot = false; } - public void propose(PacketInFlight pif) { + void propose(PacketInFlight pif) { proposals.add(pif); delayedProposals.add(pif); } - public PacketInFlight nextProposal() { + PacketInFlight nextProposal() { return proposals.peekFirst(); } - public void commit() { + void commit() { PacketInFlight packet = proposals.remove(); if (willSnapshot) { zk.processTxn(packet.hdr, packet.rec); @@ -589,17 +589,7 @@ public void commit() { } } - public void proposeAndCommit(PacketInFlight packet) { - // Should be able to do propose(packet); commit(); because INFORM with non-empty proposals would be an error ... - if (willSnapshot) { - zk.processTxn(packet.hdr, packet.rec); - } else { - delayedProposals.add(packet); - delayedCommits.add(packet.hdr.getZxid()); - } - } - - public void writeState() throws IOException, InterruptedException { + void writeState() throws IOException, InterruptedException { // Ensure all received transaction PROPOSALs are written before we ACK the NEWLEADER, // since this allows the leader to apply those transactions to its served state: if (willSnapshot) { @@ -625,7 +615,7 @@ public void writeState() throws IOException, InterruptedException { } } - public void flushAcks() throws InterruptedException { + void flushAcks() throws InterruptedException { if (zk instanceof FollowerZooKeeperServer) { // The NEWLEADER is ACK'ed, and we can now ACK the PROPOSALs we wrote in writeState. FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; @@ -634,7 +624,7 @@ public void flushAcks() throws InterruptedException { } } - public void applyDelayedPackets() { + void applyDelayedPackets() { // Any delayed packets must now be applied: all PROPOSALs first, then any COMMITs. if (zk instanceof FollowerZooKeeperServer) { FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; @@ -792,7 +782,8 @@ public void applyDelayedPackets() { } lastQueued = packet.hdr.getZxid(); } - helper.proposeAndCommit(packet); + helper.propose(packet); + helper.commit(); break; case Leader.UPTODATE: LOG.info("Learner received UPTODATE message"); From 3483274d76e9916954232ff11bbba7bc48639093 Mon Sep 17 00:00:00 2001 From: jonmv Date: Thu, 30 Mar 2023 19:29:45 +0200 Subject: [PATCH 14/15] Improve naming, and remove redundant null assignment --- .../zookeeper/server/SyncRequestProcessor.java | 18 +++++++++--------- .../server/quorum/ObserverZooKeeperServer.java | 2 -- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java index e03e0b07944..bc9c3f482e2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java @@ -63,8 +63,8 @@ public FlushRequest() { } } - private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null); - private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null); + private static final Request TURN_FORWARDING_DELAY_ON_REQUEST = new Request(null, 0, 0, 0, null, null); + private static final Request TURN_FORWARDING_DELAY_OFF_REQUEST = new Request(null, 0, 0, 0, null, null); private static class DelayingProcessor implements RequestProcessor, Flushable { private final RequestProcessor next; @@ -90,12 +90,12 @@ public void processRequest(Request request) throws RequestProcessorException { public void shutdown() { next.shutdown(); } - private void close() { + private void startDelaying() { if (delayed == null) { delayed = new ArrayDeque<>(); } } - private void open() throws RequestProcessorException { + private void flushAndStopDelaying() throws RequestProcessorException { if (delayed != null) { for (Request request : delayed) { next.processRequest(request); @@ -224,12 +224,12 @@ public void run() { break; } - if (si == turnForwardingDelayOn) { - nextProcessor.close(); + if (si == TURN_FORWARDING_DELAY_ON_REQUEST) { + nextProcessor.startDelaying(); continue; } - if (si == turnForwardingDelayOff) { - nextProcessor.open(); + if (si == TURN_FORWARDING_DELAY_OFF_REQUEST) { + nextProcessor.flushAndStopDelaying(); continue; } @@ -295,7 +295,7 @@ public void syncFlush() throws InterruptedException { } public void setDelayForwarding(boolean delayForwarding) { - queuedRequests.add(delayForwarding ? turnForwardingDelayOn : turnForwardingDelayOff); + queuedRequests.add(delayForwarding ? TURN_FORWARDING_DELAY_ON_REQUEST : TURN_FORWARDING_DELAY_OFF_REQUEST); } private void flush() throws IOException, RequestProcessorException { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java index 508670ec8d8..383aa5de88b 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java @@ -106,8 +106,6 @@ protected void setupRequestProcessors() { if (syncRequestProcessorEnabled) { syncProcessor = new SyncRequestProcessor(this, null); syncProcessor.start(); - } else { - syncProcessor = null; } } From 0507f7c78f7dc1785d3c32cd9defecafd90fed41 Mon Sep 17 00:00:00 2001 From: jonmv Date: Tue, 11 Apr 2023 09:01:56 +0200 Subject: [PATCH 15/15] Wait for correct zxid in unit test --- .../java/org/apache/zookeeper/server/quorum/Zab1_0Test.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 8c5c0138bd4..332d992546d 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -958,10 +958,10 @@ protected void setupRequestProcessors() { // Wait for the follower to observe the COMMIT, and apply the PROPOSAL to its data tree. Unfortunately, // there's nothing to do but sleep here, as watches are triggered before the last processed id is updated. long doom = System.currentTimeMillis() + 1000; - while (createZxid1 != follower.fzk.getLastProcessedZxid() && System.currentTimeMillis() < doom) { + while (createZxid2 != follower.fzk.getLastProcessedZxid() && System.currentTimeMillis() < doom) { Thread.sleep(1); } - assertEquals(createZxid1, follower.fzk.getLastProcessedZxid()); + assertEquals(createZxid2, follower.fzk.getLastProcessedZxid()); // State recap: first create is flushing to disk, second is queued for flush; // first and second creates are both applied to data tree. @@ -971,7 +971,7 @@ protected void setupRequestProcessors() { // before taking a new snapshot. // Additionally, any writes in-flight should be allowed to complete _before_ the fast-forward-from-edits, - // done when partially shutting down the learner zoo keeper server to prepare for a new leader election, + // done when partially shutting down the learner zookeeper server to prepare for a new leader election, // to avoid _also_ getting the transactions for those writes in a DIFF from the new leader, appending them // twice (or more) to the transaction log, which would also give digest mismatches when restoring state. // This is not tested here, but fixing ZOOKEEPER-4541 also fixes this problem, by flushing writes first.