Skip to content

Commit

Permalink
ZOOKEEPER-2678: Discovery and Sync can take a very long time on large…
Browse files Browse the repository at this point in the history
… DBs
  • Loading branch information
Robert (Bobby) Evans committed Jan 30, 2017
1 parent 8771ffd commit f4c5b0e
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -544,9 +544,12 @@ public synchronized void shutdown() {
if (firstProcessor != null) {
firstProcessor.shutdown();
}
if (zkDb != null) {
zkDb.clear();
}

// There is no need to clear the database:
// * When a new quorum is established, we can still apply the diff
// on top of the same zkDb data
// * If we fetch a new snapshot from the leader, the zkDb will be
// cleared anyway before loading the snapshot

unregisterJMX();
}
Expand Down
41 changes: 28 additions & 13 deletions src/java/main/org/apache/zookeeper/server/quorum/Learner.java
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,16 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception{

QuorumVerifier newLeaderQV = null;

readPacket(qp);
// In the DIFF case we don't need to take a snapshot because the transactions will sync on top of any existing snapshot.
// For SNAP and TRUNC the snapshot is needed to save that history.
boolean snapshotNeeded = true;
readPacket(qp);
LinkedList<Long> packetsCommitted = new LinkedList<Long>();
LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
snapshotNeeded = false;
}
else if (qp.getType() == Leader.SNAP) {
LOG.info("Getting a snapshot from leader");
Expand Down Expand Up @@ -400,10 +404,13 @@ else if (qp.getType() == Leader.SNAP) {

long lastQueued = 0;

// in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0
// we take the snapshot at the UPDATE, since V1.0 also gets the UPDATE (after the NEWLEADER)
// in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
// we take the snapshot on the UPTODATE message; since Zab V1.0 also gets the UPTODATE (after the NEWLEADER)
// we need to make sure that we don't take the snapshot twice.
boolean snapshotTaken = false;
boolean isPreZAB1_0 = true;
// If we are not going to take the snapshot, be sure the transactions are not applied in memory
// but written out to the transaction log
boolean writeToTxnLog = !snapshotNeeded;
// we are now going to start getting transactions to apply followed by an UPTODATE
outerLoop:
while (self.isRunning()) {
Expand Down Expand Up @@ -440,7 +447,7 @@ else if (qp.getType() == Leader.SNAP) {
throw new Exception("changes proposed in reconfig");
}
}
if (!snapshotTaken) {
if (!writeToTxnLog) {
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
} else {
Expand Down Expand Up @@ -479,8 +486,7 @@ else if (qp.getType() == Leader.SNAP) {
}
lastQueued = packet.hdr.getZxid();
}

if (!snapshotTaken) {
if (!writeToTxnLog) {
// Apply to db directly while we are not yet writing to the transaction log (i.e. a snapshot is still to be taken)
zk.processTxn(packet.hdr, packet.rec);
} else {
Expand All @@ -498,14 +504,19 @@ else if (qp.getType() == Leader.SNAP) {
throw new Exception("changes proposed in reconfig");
}
}
if (!snapshotTaken) { // true for the pre v1.0 case
zk.takeSnapshot();
if (isPreZAB1_0) {
zk.takeSnapshot();
self.setCurrentEpoch(newEpoch);
}
self.setZooKeeperServer(zk);
self.adminServer.setZooKeeperServer(zk);
break outerLoop;
case Leader.NEWLEADER: // it will be NEWLEADER in v1.0
case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
// means this is Zab 1.0
// Create updatingEpoch file and remove it after current
// epoch is set. QuorumPeer.loadDataBase() uses this file to
// detect the case where the server was terminated after
// taking a snapshot but before setting the current epoch.
LOG.info("Learner received NEWLEADER message");
if (qp.getData()!=null && qp.getData().length > 1) {
try {
Expand All @@ -516,10 +527,14 @@ else if (qp.getType() == Leader.SNAP) {
e.printStackTrace();
}
}

if (snapshotNeeded) {
zk.takeSnapshot();
}

zk.takeSnapshot();
self.setCurrentEpoch(newEpoch);
snapshotTaken = true;
writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
isPreZAB1_0 = false;
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
break;
}
Expand Down
13 changes: 13 additions & 0 deletions src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -847,12 +847,25 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa,
Assert.assertEquals(1, f.self.getAcceptedEpoch());
Assert.assertEquals(1, f.self.getCurrentEpoch());

//Wait for the transactions to be written out. The thread that writes them out
// does not send anything back when it is done.
long start = System.currentTimeMillis();
while (createSessionZxid != f.fzk.getLastProcessedZxid() && (System.currentTimeMillis() - start) < 50) {
Thread.sleep(1);
}

Assert.assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid());

// Make sure the data was recorded in the filesystem ok
ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
start = System.currentTimeMillis();
zkDb2.loadDataBase();
while (zkDb2.getSessionWithTimeOuts().isEmpty() && (System.currentTimeMillis() - start) < 50) {
Thread.sleep(1);
zkDb2.loadDataBase();
}
LOG.info("zkdb2 sessions:" + zkDb2.getSessions());
LOG.info("zkdb2 with timeouts:" + zkDb2.getSessionWithTimeOuts());
Assert.assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));
} finally {
TestUtils.deleteFileRecursively(tmpDir);
Expand Down

0 comments on commit f4c5b0e

Please sign in to comment.