Skip to content

Commit

Permalink
ZOOKEEPER-2678: Discovery and Sync can take a very long time on large…
Browse files Browse the repository at this point in the history
… DBs
  • Loading branch information
Robert (Bobby) Evans committed Jan 26, 2017
1 parent d6bbfd7 commit 5aa2562
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,12 @@ public synchronized void shutdown() {
if (firstProcessor != null) {
firstProcessor.shutdown();
}
if (zkDb != null) {
zkDb.clear();
}

// There is no need to clear the database
// * When a new quorum is established we can still apply the diff
// on top of the same zkDb data
// * If we fetch a new snapshot from leader, the zkDb will be
// cleared anyway before loading the snapshot

unregisterJMX();
}
Expand Down
31 changes: 20 additions & 11 deletions src/java/main/org/apache/zookeeper/server/quorum/Learner.java
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,16 @@ protected void syncWithLeader(long newLeaderZxid) throws IOException, Interrupte
QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
QuorumPacket qp = new QuorumPacket();
long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);

readPacket(qp);
//In the DIFF case we don't need to do a snapshot because the edits will sync on top of any existing snapshot
// For SNAP and TRUNC the snapshot is needed to save that history
boolean snapshotNeeded = true;
readPacket(qp);
LinkedList<Long> packetsCommitted = new LinkedList<Long>();
LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
snapshotNeeded = false;
}
else if (qp.getType() == Leader.SNAP) {
LOG.info("Getting a snapshot from leader");
Expand Down Expand Up @@ -364,10 +367,12 @@ else if (qp.getType() == Leader.SNAP) {

long lastQueued = 0;

// in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0
// in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
// we take the snapshot at the UPDATE, since V1.0 also gets the UPDATE (after the NEWLEADER)
// we need to make sure that we don't take the snapshot twice.
boolean snapshotTaken = false;
boolean isPreZAB1_0 = true;
//If we are not going to take the snapshot be sure the edits are not applied in memory
boolean writeToEditLog = !snapshotNeeded;
// we are now going to start getting transactions to apply followed by an UPTODATE
outerLoop:
while (self.isRunning()) {
Expand All @@ -387,7 +392,7 @@ else if (qp.getType() == Leader.SNAP) {
packetsNotCommitted.add(pif);
break;
case Leader.COMMIT:
if (!snapshotTaken) {
if (!writeToEditLog) {
pif = packetsNotCommitted.peekFirst();
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
Expand Down Expand Up @@ -415,7 +420,7 @@ else if (qp.getType() == Leader.SNAP) {
+ Long.toHexString(lastQueued + 1));
}
lastQueued = packet.hdr.getZxid();
if (!snapshotTaken) {
if (!writeToEditLog) {
// Apply to db directly if we haven't taken the snapshot
zk.processTxn(packet.hdr, packet.rec);
} else {
Expand All @@ -424,13 +429,14 @@ else if (qp.getType() == Leader.SNAP) {
}
break;
case Leader.UPTODATE:
if (!snapshotTaken) { // true for the pre v1.0 case
if (isPreZAB1_0) {
zk.takeSnapshot();
self.setCurrentEpoch(newEpoch);
}
self.cnxnFactory.setZooKeeperServer(zk);
break outerLoop;
case Leader.NEWLEADER: // it will be NEWLEADER in v1.0
case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
// means this is Zab 1.0
// Create updatingEpoch file and remove it after current
// epoch is set. QuorumPeer.loadDataBase() uses this file to
// detect the case where the server was terminated after
Expand All @@ -441,13 +447,16 @@ else if (qp.getType() == Leader.SNAP) {
throw new IOException("Failed to create " +
updating.toString());
}
zk.takeSnapshot();
if (snapshotNeeded) {
zk.takeSnapshot();
}
self.setCurrentEpoch(newEpoch);
if (!updating.delete()) {
throw new IOException("Failed to delete " +
updating.toString());
}
snapshotTaken = true;
writeToEditLog = true; //Anything after this needs to go to the edit log, not applied directly in memory
isPreZAB1_0 = false;
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,13 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa,
Assert.assertEquals(1, f.self.getAcceptedEpoch());
Assert.assertEquals(1, f.self.getCurrentEpoch());

//Wait for the edits to be written out
long start = System.currentTimeMillis();
while (createSessionZxid != f.fzk.getLastProcessedZxid() && (System.currentTimeMillis() - start) < 50) {
Thread.sleep(1);
}
LOG.info("Took < {}ms to sync all edits", System.currentTimeMillis() - start);

Assert.assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid());

// Make sure the data was recorded in the filesystem ok
Expand Down

0 comments on commit 5aa2562

Please sign in to comment.