diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 19f1fac4536..168238acd52 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -793,19 +793,6 @@ public void startdata() throws IOException, InterruptedException { } public synchronized void startup() { - startupWithServerState(State.RUNNING); - } - - public synchronized void startupWithoutServing() { - startupWithServerState(State.INITIAL); - } - - public synchronized void startServing() { - setState(State.RUNNING); - notifyAll(); - } - - private void startupWithServerState(State state) { if (sessionTracker == null) { createSessionTracker(); } @@ -820,7 +807,7 @@ private void startupWithServerState(State state) { registerMetrics(); - setState(state); + setState(State.RUNNING); requestPathMetricsCollector.start(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java index 5af114dccd1..b6766199988 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java @@ -89,12 +89,11 @@ public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) { * @param hdr the txn header * @param txn the txn * @param digest the digest of txn - * @return a request moving through a chain of RequestProcessors */ - public Request appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException { - final Request request = buildRequestToProcess(hdr, txn, digest); + public void appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException { + final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); + request.setTxnDigest(digest); getZKDatabase().append(request); - return request; } /** diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index e3bd13d1165..8f3204392fc 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -555,8 +555,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { boolean syncSnapshot = false; readPacket(qp); Deque packetsCommitted = new ArrayDeque<>(); - Deque packetsNotCommitted = new ArrayDeque<>(); - Deque requestsToAck = new ArrayDeque<>(); + Deque packetsNotLogged = new ArrayDeque<>(); synchronized (zk) { if (qp.getType() == Leader.DIFF) { @@ -645,11 +644,11 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { self.setLastSeenQuorumVerifier(qv, true); } - packetsNotCommitted.add(pif); + packetsNotLogged.add(pif); break; case Leader.COMMIT: case Leader.COMMITANDACTIVATE: - pif = packetsNotCommitted.peekFirst(); + pif = packetsNotLogged.peekFirst(); if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) { QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8)); boolean majorChange = self.processReconfig( @@ -668,7 +667,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { Long.toHexString(pif.hdr.getZxid())); } else { zk.processTxn(pif.hdr, pif.rec); - packetsNotCommitted.remove(); + packetsNotLogged.remove(); } } else { packetsCommitted.add(qp.getZxid()); @@ -710,7 +709,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { // Apply to db directly if we haven't taken the snapshot zk.processTxn(packet.hdr, packet.rec); } else { - packetsNotCommitted.add(packet); + packetsNotLogged.add(packet); packetsCommitted.add(qp.getZxid()); } @@ -753,29 +752,55 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { isPreZAB1_0 = false; // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER). - sock.setSoTimeout(self.tickTime * self.syncLimit); - self.setSyncMode(QuorumPeer.SyncMode.NONE); - zk.startupWithoutServing(); - if (zk instanceof FollowerZooKeeperServer) { + if (zk instanceof FollowerZooKeeperServer && !packetsCommitted.isEmpty()) { long startTime = Time.currentElapsedTime(); FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - for (PacketInFlight p : packetsNotCommitted) { - final Request request = fzk.appendRequest(p.hdr, p.rec, p.digest); - requestsToAck.add(request); + + /* + * @see https://github.com/apache/zookeeper/pull/1848 + * Persist and process the committed txns in "packetsNotLogged" + * according to "packetsCommitted", which have been committed by + * the leader. For these committed proposals, there is no need to + * reply ack. + * + * @see https://issues.apache.org/jira/browse/ZOOKEEPER-4394 + * Keep the outstanding proposals in "packetsNotLogged" to avoid + * NullPointerException when the follower receives COMMIT packet(s) + * right after replying NEWLEADER ack. + */ + while (!packetsCommitted.isEmpty()) { + long zxid = packetsCommitted.removeFirst(); + pif = packetsNotLogged.peekFirst(); + if (pif == null) { + LOG.warn("Committing 0x{}, but got no proposal", Long.toHexString(zxid)); + continue; + } else if (pif.hdr.getZxid() != zxid) { + LOG.warn("Committing 0x{}, but next proposal is 0x{}", + Long.toHexString(zxid), Long.toHexString(pif.hdr.getZxid())); + continue; + } + packetsNotLogged.removeFirst(); + fzk.appendRequest(pif.hdr, pif.rec, pif.digest); + fzk.processTxn(pif.hdr, pif.rec); } - // persist the txns to disk + // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646 + // Make sure to persist the txns to disk before replying NEWLEADER ack. fzk.getZKDatabase().commit(); - LOG.info("{} txns have been persisted and it took {}ms", - packetsNotCommitted.size(), Time.currentElapsedTime() - startTime); - packetsNotCommitted.clear(); + LOG.info("It took {}ms to persist and commit txns in packetsCommitted. " + + "{} outstanding txns left in packetsNotLogged", + Time.currentElapsedTime() - startTime, packetsNotLogged.size()); } - // set the current epoch after all the tnxs are persisted + // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4643 + // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4785 + // Update current epoch after the committed txns are persisted self.setCurrentEpoch(newEpoch); LOG.info("Set the current epoch to {}", newEpoch); + sock.setSoTimeout(self.tickTime * self.syncLimit); + self.setSyncMode(QuorumPeer.SyncMode.NONE); - // send NEWLEADER ack after all the tnxs are persisted + // send NEWLEADER ack after the committed txns are persisted writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); LOG.info("Sent NEWLEADER ack to leader with zxid {}", Long.toHexString(newLeaderZxid)); break; @@ -784,7 +809,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { } ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); - zk.startServing(); + zk.startup(); /* * Update the election vote here to ensure that all members of the * ensemble report the same vote to new servers that start up and @@ -796,20 +821,11 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { // We need to log the stuff that came in between the snapshot and the uptodate if (zk instanceof FollowerZooKeeperServer) { - // reply ACK of PROPOSAL after ACK of NEWLEADER to avoid leader shutdown due to timeout - // on waiting for a quorum of followers - for (final Request request : requestsToAck) { - final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK, request.getHdr().getZxid(), null, null); - writePacket(ackPacket, false); - } - writePacket(null, true); - requestsToAck.clear(); - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - for (PacketInFlight p : packetsNotCommitted) { + for (PacketInFlight p : packetsNotLogged) { fzk.logRequest(p.hdr, p.rec, p.digest); } - LOG.info("{} txns have been logged asynchronously", packetsNotCommitted.size()); + LOG.info("{} txns have been logged asynchronously", packetsNotLogged.size()); for (Long zxid : packetsCommitted) { fzk.commit(zxid); @@ -819,7 +835,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { // Similar to follower, we need to log requests between the snapshot // and UPTODATE ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk; - for (PacketInFlight p : packetsNotCommitted) { + for (PacketInFlight p : packetsNotLogged) { Long zxid = packetsCommitted.peekFirst(); if (p.hdr.getZxid() != zxid) { // log warning message if there is no matching commit diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 76a678f501c..d374062e293 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -741,8 +741,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) readPacketSkippingPing(ia, qp); assertEquals(Leader.ACKEPOCH, qp.getType()); - assertEquals(0, qp.getZxid()); - assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt()); + assertEquals(ZxidUtils.makeZxid(0, 0), qp.getZxid()); + assertEquals(0, ByteBuffer.wrap(qp.getData()).getInt()); assertEquals(1, f.self.getAcceptedEpoch()); assertEquals(0, f.self.getCurrentEpoch()); @@ -765,36 +765,22 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) qp.setZxid(0); oa.writeRecord(qp, null); - // Read the uptodate ack - readPacketSkippingPing(ia, qp); - assertEquals(Leader.ACK, qp.getType()); - assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - // Get the ack of the new leader readPacketSkippingPing(ia, qp); assertEquals(Leader.ACK, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); assertEquals(1, f.self.getAcceptedEpoch()); assertEquals(1, f.self.getCurrentEpoch()); - - //Wait for the transactions to be written out. The thread that writes them out - // does not send anything back when it is done. - long start = System.currentTimeMillis(); - while (createSessionZxid != f.fzk.getLastProcessedZxid() - && (System.currentTimeMillis() - start) < 50) { - Thread.sleep(1); - } - assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid()); + // Read the uptodate ack + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + // Make sure the data was recorded in the filesystem ok ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir)); - start = System.currentTimeMillis(); zkDb2.loadDataBase(); - while (zkDb2.getSessionWithTimeOuts().isEmpty() && (System.currentTimeMillis() - start) < 50) { - Thread.sleep(1); - zkDb2.loadDataBase(); - } LOG.info("zkdb2 sessions:{}", zkDb2.getSessions()); LOG.info("zkdb2 with timeouts:{}", zkDb2.getSessionWithTimeOuts()); assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L)); @@ -820,6 +806,143 @@ private void proposeNewSession(QuorumPacket qp, long zxid, long sessionId) throw }, testData); } + @Test + public void testNormalFollowerRun_ProcessCommitInSyncAfterAckNewLeader(@TempDir File testData) throws Exception { + testFollowerConversation(new FollowerConversation() { + @Override + public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception { + File tmpDir = File.createTempFile("test", "dir", testData); + tmpDir.delete(); + tmpDir.mkdir(); + File logDir = f.fzk.getTxnLogFactory().getDataLogDir().getParentFile(); + File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile(); + //Spy on ZK so we can check if a snapshot happened or not. + f.zk = spy(f.zk); + try { + assertEquals(0, f.self.getAcceptedEpoch()); + assertEquals(0, f.self.getCurrentEpoch()); + + // Setup a database with a single /foo node + ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir)); + final long firstZxid = ZxidUtils.makeZxid(1, 1); + zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null); + Stat stat = new Stat(); + assertEquals("data1", new String(zkDb.getData("/foo", stat, null))); + + QuorumPacket qp = new QuorumPacket(); + readPacketSkippingPing(ia, qp); + assertEquals(Leader.FOLLOWERINFO, qp.getType()); + assertEquals(qp.getZxid(), 0); + LearnerInfo learnInfo = new LearnerInfo(); + ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo); + assertEquals(learnInfo.getProtocolVersion(), 0x10000); + assertEquals(learnInfo.getServerid(), 0); + + // We are simulating an established leader, so the epoch is 1 + qp.setType(Leader.LEADERINFO); + qp.setZxid(ZxidUtils.makeZxid(1, 0)); + byte[] protoBytes = new byte[4]; + ByteBuffer.wrap(protoBytes).putInt(0x10000); + qp.setData(protoBytes); + oa.writeRecord(qp, null); + + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACKEPOCH, qp.getType()); + assertEquals(0, qp.getZxid()); + assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt()); + assertEquals(1, f.self.getAcceptedEpoch()); + assertEquals(0, f.self.getCurrentEpoch()); + + // Send the snapshot we created earlier + qp.setType(Leader.SNAP); + qp.setData(new byte[0]); + qp.setZxid(zkDb.getDataTreeLastProcessedZxid()); + oa.writeRecord(qp, null); + zkDb.serializeSnapshot(oa); + oa.writeString("BenWasHere", null); + Thread.sleep(10); //Give it some time to process the snap + //No Snapshot taken yet, the SNAP was applied in memory + verify(f.zk, never()).takeSnapshot(); + + // Leader sends an outstanding proposal + long proposalZxid = ZxidUtils.makeZxid(1, 1001); + proposeSetData(qp, proposalZxid, "data2", 2); + oa.writeRecord(qp, null); + + qp.setType(Leader.NEWLEADER); + qp.setZxid(ZxidUtils.makeZxid(1, 0)); + qp.setData(null); + oa.writeRecord(qp, null); + + // Get the ack of the new leader + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + assertEquals(1, f.self.getAcceptedEpoch()); + assertEquals(1, f.self.getCurrentEpoch()); + //Make sure that we did take the snapshot now + verify(f.zk).takeSnapshot(true); + assertEquals(firstZxid, f.fzk.getLastProcessedZxid()); + + // The outstanding proposal has not been persisted yet + ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir)); + long lastZxid = zkDb2.loadDataBase(); + assertEquals("data1", new String(zkDb2.getData("/foo", stat, null))); + assertEquals(firstZxid, lastZxid); + + TrackerWatcher watcher = new TrackerWatcher(); + + // The change should not have happened yet + assertEquals("data1", new String(f.fzk.getZKDatabase().getData("/foo", stat, watcher))); + + // Leader commits proposalZxid right after it sends NEWLEADER to follower + qp.setType(Leader.COMMIT); + qp.setZxid(proposalZxid); + qp.setData(null); + oa.writeRecord(qp, null); + + qp.setType(Leader.UPTODATE); + qp.setZxid(0); + qp.setData(null); + oa.writeRecord(qp, null); + + // Read the uptodate ack + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(proposalZxid, qp.getZxid()); + + // The change should happen now + watcher.waitForChange(); + assertEquals("data2", new String(f.fzk.getZKDatabase().getData("/foo", stat, null))); + + // check and make sure the change is persisted + zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir)); + lastZxid = zkDb2.loadDataBase(); + assertEquals("data2", new String(zkDb2.getData("/foo", stat, null))); + assertEquals(proposalZxid, lastZxid); + } finally { + TestUtils.deleteFileRecursively(tmpDir); + } + } + + private void proposeSetData(QuorumPacket qp, long zxid, String data, int version) throws IOException { + qp.setType(Leader.PROPOSAL); + qp.setZxid(zxid); + TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, ZooDefs.OpCode.setData); + SetDataTxn sdt = new SetDataTxn("/foo", data.getBytes(), version); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputArchive boa = BinaryOutputArchive.getArchive(baos); + boa.writeRecord(hdr, null); + boa.writeRecord(sdt, null); + qp.setData(baos.toByteArray()); + } + }, testData); + } + @Test public void testNormalRun(@TempDir File testData) throws Exception { testLeaderConversation(new LeaderConversation() {