From 5aa25620e0189b28d7040305272be2fda28126fb Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 19 Jan 2017 13:50:32 -0600 Subject: [PATCH] ZOOKEEPER-2678: Discovery and Sync can take a very long time on large DBs --- .../zookeeper/server/ZooKeeperServer.java | 9 ++++-- .../zookeeper/server/quorum/Learner.java | 31 ++++++++++++------- .../zookeeper/server/quorum/Zab1_0Test.java | 7 +++++ 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index 62ac466bb1a..2aee59f6845 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -507,9 +507,12 @@ public synchronized void shutdown() { if (firstProcessor != null) { firstProcessor.shutdown(); } - if (zkDb != null) { - zkDb.clear(); - } + + // There is no need to clear the database + // * When a new quorum is established we can still apply the diff + // on top of the same zkDb data + // * If we fetch a new snapshot from leader, the zkDb will be + // cleared anyway before loading the snapshot unregisterJMX(); } diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java index 749b2741728..7d33cfb60c1 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java @@ -321,13 +321,16 @@ protected void syncWithLeader(long newLeaderZxid) throws IOException, Interrupte QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); QuorumPacket qp = new QuorumPacket(); long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); - - readPacket(qp); + //In the DIFF case we don't need to do a snapshot because the edits will sync on top of any existing snapshot + // For SNAP and TRUNC the snapshot is needed to save that history + boolean snapshotNeeded = true; + readPacket(qp); LinkedList packetsCommitted = new LinkedList(); LinkedList packetsNotCommitted = new LinkedList(); synchronized (zk) { if (qp.getType() == Leader.DIFF) { - LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid())); + LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); + snapshotNeeded = false; } else if (qp.getType() == Leader.SNAP) { LOG.info("Getting a snapshot from leader"); @@ -364,10 +367,12 @@ else if (qp.getType() == Leader.SNAP) { long lastQueued = 0; - // in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0 + // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 // we take the snapshot at the UPDATE, since V1.0 also gets the UPDATE (after the NEWLEADER) // we need to make sure that we don't take the snapshot twice. - boolean snapshotTaken = false; + boolean isPreZAB1_0 = true; + //If we are not going to take the snapshot be sure the edits are not applied in memory + boolean writeToEditLog = !snapshotNeeded; // we are now going to start getting transactions to apply followed by an UPTODATE outerLoop: while (self.isRunning()) { @@ -387,7 +392,7 @@ else if (qp.getType() == Leader.SNAP) { packetsNotCommitted.add(pif); break; case Leader.COMMIT: - if (!snapshotTaken) { + if (!writeToEditLog) { pif = packetsNotCommitted.peekFirst(); if (pif.hdr.getZxid() != qp.getZxid()) { LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid()); @@ -415,7 +420,7 @@ else if (qp.getType() == Leader.SNAP) { + Long.toHexString(lastQueued + 1)); } lastQueued = packet.hdr.getZxid(); - if (!snapshotTaken) { + if (!writeToEditLog) { // Apply to db directly if we haven't taken the snapshot zk.processTxn(packet.hdr, packet.rec); } else { @@ -424,13 +429,14 @@ else if (qp.getType() == Leader.SNAP) { } break; case Leader.UPTODATE: - if (!snapshotTaken) { // true for the pre v1.0 case + if (isPreZAB1_0) { zk.takeSnapshot(); self.setCurrentEpoch(newEpoch); } self.cnxnFactory.setZooKeeperServer(zk); break outerLoop; - case Leader.NEWLEADER: // it will be NEWLEADER in v1.0 + case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery + // means this is Zab 1.0 // Create updatingEpoch file and remove it after current // epoch is set. QuorumPeer.loadDataBase() uses this file to // detect the case where the server was terminated after @@ -441,13 +447,16 @@ else if (qp.getType() == Leader.SNAP) { throw new IOException("Failed to create " + updating.toString()); } - zk.takeSnapshot(); + if (snapshotNeeded) { + zk.takeSnapshot(); + } self.setCurrentEpoch(newEpoch); if (!updating.delete()) { throw new IOException("Failed to delete " + updating.toString()); } - snapshotTaken = true; + writeToEditLog = true; //Anything after this needs to go to the edit log, not applied directly in memory + isPreZAB1_0 = false; writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); break; } diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 52e7d279c7e..b5cfdf3a143 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -839,6 +839,13 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Assert.assertEquals(1, f.self.getAcceptedEpoch()); Assert.assertEquals(1, f.self.getCurrentEpoch()); + //Wait for the edits to be written out + long start = System.currentTimeMillis(); + while (createSessionZxid != f.fzk.getLastProcessedZxid() && (System.currentTimeMillis() - start) < 50) { + Thread.sleep(1); + } + LOG.info("Took < {}ms to sync all edits", System.currentTimeMillis() - start); + Assert.assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid()); // Make sure the data was recorded in the filesystem ok