diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java index 4df319f86a4..bc9c3f482e2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java @@ -24,6 +24,7 @@ import java.util.Objects; import java.util.Queue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; @@ -55,6 +56,55 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req private static final Request REQUEST_OF_DEATH = Request.requestOfDeath; + private static class FlushRequest extends Request { + private final CountDownLatch latch = new CountDownLatch(1); + public FlushRequest() { + super(null, 0, 0, 0, null, null); + } + } + + private static final Request TURN_FORWARDING_DELAY_ON_REQUEST = new Request(null, 0, 0, 0, null, null); + private static final Request TURN_FORWARDING_DELAY_OFF_REQUEST = new Request(null, 0, 0, 0, null, null); + + private static class DelayingProcessor implements RequestProcessor, Flushable { + private final RequestProcessor next; + private Queue<Request> delayed = null; + private DelayingProcessor(RequestProcessor next) { + this.next = next; + } + @Override + public void flush() throws IOException { + if (delayed == null && next instanceof Flushable) { + ((Flushable) next).flush(); + } + } + @Override + public void processRequest(Request request) throws RequestProcessorException { + if (delayed == null) { + next.processRequest(request); + } else { + delayed.add(request); + } + } + @Override + public void shutdown() { + next.shutdown(); + } + private void startDelaying() { + if (delayed == null) { + delayed = new ArrayDeque<>(); + } + } + private void flushAndStopDelaying() throws RequestProcessorException { + if (delayed != null) { + for (Request request : delayed) { + next.processRequest(request); + } + delayed = null; + } + } + } + /** The number of log entries to log before starting a snapshot */ private static int snapCount = ZooKeeperServer.getSnapCount(); @@ -75,7 +125,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req private final ZooKeeperServer zks; - private final RequestProcessor nextProcessor; + private final DelayingProcessor nextProcessor; /** * Transactions that have been written and are waiting to be flushed to @@ -88,7 +138,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener()); this.zks = zks; - this.nextProcessor = nextProcessor; + this.nextProcessor = nextProcessor == null ?
null : new DelayingProcessor(nextProcessor); this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize()); } @@ -174,6 +224,21 @@ public void run() { break; } + if (si == TURN_FORWARDING_DELAY_ON_REQUEST) { + nextProcessor.startDelaying(); + continue; + } + if (si == TURN_FORWARDING_DELAY_OFF_REQUEST) { + nextProcessor.flushAndStopDelaying(); + continue; + } + + if (si instanceof FlushRequest) { + flush(); + ((FlushRequest) si).latch.countDown(); + continue; + } + long startProcessTime = Time.currentElapsedTime(); ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime); @@ -206,9 +271,7 @@ public void run() { // and there are no pending flushes (writes), then just pass this to the next processor if (nextProcessor != null) { nextProcessor.processRequest(si); - if (nextProcessor instanceof Flushable) { - ((Flushable) nextProcessor).flush(); - } + nextProcessor.flush(); } continue; } @@ -224,6 +287,17 @@ public void run() { LOG.info("SyncRequestProcessor exited!"); } + /** Flushes all pending writes, and waits for this to complete. */ + public void syncFlush() throws InterruptedException { + FlushRequest marker = new FlushRequest(); + queuedRequests.add(marker); + marker.latch.await(); + } + + public void setDelayForwarding(boolean delayForwarding) { + queuedRequests.add(delayForwarding ? TURN_FORWARDING_DELAY_ON_REQUEST : TURN_FORWARDING_DELAY_OFF_REQUEST); + } + private void flush() throws IOException, RequestProcessorException { if (this.toFlush.isEmpty()) { return; @@ -244,9 +318,7 @@ private void flush() throws IOException, RequestProcessorException { ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency); this.nextProcessor.processRequest(i); } - if (this.nextProcessor instanceof Flushable) { - ((Flushable) this.nextProcessor).flush(); - } + nextProcessor.flush(); } lastFlushTime = Time.currentElapsedTime(); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index f6c2b93ebf5..89e59f137f2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -881,7 +881,7 @@ public synchronized void shutdown(boolean fullyShutDown) { // * If we fetch a new snapshot from leader, the zkDb will be // cleared anyway before loading the snapshot try { - //This will fast forward the database to the latest recorded transactions + // This will fast-forward the database to the latest recorded transactions zkDb.fastForwardDataBase(); } catch (IOException e) { LOG.error("Error updating DB", e); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java index 5a51bc23d7e..7046d9a1fcb 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java @@ -155,11 +155,11 @@ protected void unregisterMetrics() { } @Override - public synchronized void shutdown() { + public synchronized void shutdown(boolean fullyShutDown) { if (containerManager != null) { containerManager.stop(); } - super.shutdown(); + super.shutdown(fullyShutDown); } @Override diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 1818bf9bb95..0561f0c0cb2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -543,35 +543,140 @@ protected long registerWithLeader(int pktType) throws IOException { * @throws InterruptedException */ protected void syncWithLeader(long newLeaderZxid) throws Exception { - QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); - QuorumPacket qp = new QuorumPacket(); long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); - QuorumVerifier newLeaderQV = null; - // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot - // For SNAP and TRUNC the snapshot is needed to save that history - boolean snapshotNeeded = true; - boolean syncSnapshot = false; + class SyncHelper { + + // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot. + // For SNAP and TRUNC the snapshot is needed to save that history. + boolean willSnapshot = true; + boolean syncSnapshot = false; + + // PROPOSALs received during sync, for matching up with COMMITs. + Deque<PacketInFlight> proposals = new ArrayDeque<>(); + + // PROPOSALs we delay forwarding to the ZK server until sync is done. + Deque<PacketInFlight> delayedProposals = new ArrayDeque<>(); + + // COMMITs we delay forwarding to the ZK server until sync is done. + Deque<Long> delayedCommits = new ArrayDeque<>(); + + void syncSnapshot() { + syncSnapshot = true; + } + + void noSnapshot() { + willSnapshot = false; + } + + void propose(PacketInFlight pif) { + proposals.add(pif); + delayedProposals.add(pif); + } + + PacketInFlight nextProposal() { + return proposals.peekFirst(); + } + + void commit() { + PacketInFlight packet = proposals.remove(); + if (willSnapshot) { + zk.processTxn(packet.hdr, packet.rec); + delayedProposals.remove(); + } else { + delayedCommits.add(packet.hdr.getZxid()); + } + } + + void writeState() throws IOException, InterruptedException { + // Ensure all received transaction PROPOSALs are written before we ACK the NEWLEADER, + // since this allows the leader to apply those transactions to its served state: + if (willSnapshot) { + zk.takeSnapshot(syncSnapshot); // either the snapshot contains the transactions, + willSnapshot = false; // but anything after this needs to go to the transaction log; or + } + + self.setCurrentEpoch(newEpoch); + sock.setSoTimeout(self.tickTime * self.syncLimit); + self.setSyncMode(QuorumPeer.SyncMode.NONE); + zk.startupWithoutServing(); + + // If we're a follower, we need to ensure the transactions are safely logged before ACK'ing. + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + // The leader expects the NEWLEADER ACK to precede all the PROPOSAL ACKs, so for now we only write the PROPOSALs and delay forwarding (and ACKing) them. + fzk.syncProcessor.setDelayForwarding(true); + for (PacketInFlight p : delayedProposals) { + fzk.logRequest(p.hdr, p.rec, p.digest); + } + delayedProposals.clear(); + fzk.syncProcessor.syncFlush(); + } + } + + void flushAcks() throws InterruptedException { + if (zk instanceof FollowerZooKeeperServer) { + // The NEWLEADER is ACK'ed, and we can now ACK the PROPOSALs we wrote in writeState. + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + fzk.syncProcessor.setDelayForwarding(false); + fzk.syncProcessor.syncFlush(); // Ensure these are all ACK'ed before the UPTODATE ACK.
+ } + } + + void applyDelayedPackets() { + // Any delayed packets must now be applied: all PROPOSALs first, then any COMMITs. + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + for (PacketInFlight p : delayedProposals) { + fzk.logRequest(p.hdr, p.rec, p.digest); + } + for (Long zxid : delayedCommits) { + fzk.commit(zxid); + } + } else if (zk instanceof ObserverZooKeeperServer) { + ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk; + for (PacketInFlight p : delayedProposals) { + Long zxid = delayedCommits.peekFirst(); + if (p.hdr.getZxid() != zxid) { + // log a warning message if there is no matching commit: + // the old leader sent an outstanding proposal to the observer + LOG.warn( + "Committing 0x{}, but next proposal is 0x{}", + Long.toHexString(zxid), + Long.toHexString(p.hdr.getZxid())); + continue; + } + delayedCommits.remove(); + Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1); + request.setTxnDigest(p.digest); + ozk.commitRequest(request); + } + } else { + // New server types need to handle in-flight packets + throw new UnsupportedOperationException("Unknown server type"); + } + } + + } + + SyncHelper helper = new SyncHelper(); + QuorumPacket qp = new QuorumPacket(); readPacket(qp); - Deque<Long> packetsCommitted = new ArrayDeque<>(); - Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>(); synchronized (zk) { if (qp.getType() == Leader.DIFF) { LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); self.setSyncMode(QuorumPeer.SyncMode.DIFF); if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) { LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading."); - snapshotNeeded = true; - syncSnapshot = true; + helper.syncSnapshot(); } else { - snapshotNeeded = false; + helper.noSnapshot(); } } else if (qp.getType() == Leader.SNAP) { self.setSyncMode(QuorumPeer.SyncMode.SNAP); LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid())); // The leader is going to dump the database - // db is clear as part of deserializeSnapshot() zk.getZKDatabase().deserializeSnapshot(leaderIs); // ZOOKEEPER-2819: overwrite config node content extracted // from leader snapshot with local config, to avoid potential @@ -587,20 +692,18 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { } zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); - // immediately persist the latest snapshot when there is txn log gap - syncSnapshot = true; + // Immediately persist the latest snapshot when there is a txn log gap + helper.syncSnapshot(); } else if (qp.getType() == Leader.TRUNC) { - //we need to truncate the log to the lastzxid of the leader + // We need to truncate the log to the lastZxid of the leader self.setSyncMode(QuorumPeer.SyncMode.TRUNC); LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid())); boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid()); if (!truncated) { - // not able to truncate the log LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid())); ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue()); } zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); - } else { LOG.error("Got unexpected packet from leader: {}, exiting ...
", LearnerHandler.packetToString(qp)); ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue()); @@ -608,17 +711,10 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); zk.createSessionTracker(); - long lastQueued = 0; - // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 - // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER) - // we need to make sure that we don't take the snapshot twice. - boolean isPreZAB1_0 = true; - //If we are not going to take the snapshot be sure the transactions are not applied in memory - // but written out to the transaction log - boolean writeToTxnLog = !snapshotNeeded; - TxnLogEntry logEntry; // we are now going to start getting transactions to apply followed by an UPTODATE + long lastQueued = 0; + TxnLogEntry logEntry; outerLoop: while (self.isRunning()) { readPacket(qp); @@ -642,54 +738,36 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8)); self.setLastSeenQuorumVerifier(qv, true); } - - packetsNotCommitted.add(pif); + helper.propose(pif); break; case Leader.COMMIT: case Leader.COMMITANDACTIVATE: - pif = packetsNotCommitted.peekFirst(); - if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) { - QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8)); - boolean majorChange = self.processReconfig( - qv, - ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(), - true); - if (majorChange) { - throw new Exception("changes proposed in reconfig"); - } - } - if (!writeToTxnLog) { - if (pif.hdr.getZxid() != qp.getZxid()) { - LOG.warn( - "Committing 0x{}, but next proposal is 0x{}", - Long.toHexString(qp.getZxid()), - Long.toHexString(pif.hdr.getZxid())); - } else { - zk.processTxn(pif.hdr, pif.rec); - packetsNotCommitted.remove(); - } + pif = helper.nextProposal(); + if (pif.hdr.getZxid() != qp.getZxid()) { + LOG.warn( + "Committing 0x{}, but next proposal is 0x{}", + Long.toHexString(qp.getZxid()), + Long.toHexString(pif.hdr.getZxid())); } else { - packetsCommitted.add(qp.getZxid()); + if (qp.getType() == Leader.COMMITANDACTIVATE) { + tryReconfig(pif, ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid()); + } + helper.commit(); } break; case Leader.INFORM: case Leader.INFORMANDACTIVATE: PacketInFlight packet = new PacketInFlight(); - if (qp.getType() == Leader.INFORMANDACTIVATE) { ByteBuffer buffer = ByteBuffer.wrap(qp.getData()); long suggestedLeaderId = buffer.getLong(); - byte[] remainingdata = new byte[buffer.remaining()]; - buffer.get(remainingdata); - logEntry = SerializeUtils.deserializeTxn(remainingdata); + byte[] remainingData = new byte[buffer.remaining()]; + buffer.get(remainingData); + logEntry = SerializeUtils.deserializeTxn(remainingData); packet.hdr = logEntry.getHeader(); packet.rec = logEntry.getTxn(); packet.digest = logEntry.getDigest(); - QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData(), UTF_8)); - boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true); - if (majorChange) { - throw new Exception("changes proposed in reconfig"); - } + tryReconfig(packet, suggestedLeaderId, qp.getZxid()); } else { logEntry = SerializeUtils.deserializeTxn(qp.getData()); packet.rec = logEntry.getTxn(); @@ -704,14 +782,8 @@ 
protected void syncWithLeader(long newLeaderZxid) throws Exception { } lastQueued = packet.hdr.getZxid(); } - if (!writeToTxnLog) { - // Apply to db directly if we haven't taken the snapshot - zk.processTxn(packet.hdr, packet.rec); - } else { - packetsNotCommitted.add(packet); - packetsCommitted.add(qp.getZxid()); - } - + helper.propose(packet); + helper.commit(); break; case Leader.UPTODATE: LOG.info("Learner received UPTODATE message"); @@ -721,15 +793,11 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { throw new Exception("changes proposed in reconfig"); } } - if (isPreZAB1_0) { - zk.takeSnapshot(syncSnapshot); - self.setCurrentEpoch(newEpoch); - } + helper.flushAcks(); self.setZooKeeperServer(zk); self.adminServer.setZooKeeperServer(zk); break outerLoop; case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery - // means this is Zab 1.0 LOG.info("Learner received NEWLEADER message"); if (qp.getData() != null && qp.getData().length > 1) { try { @@ -741,32 +809,13 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { } } - if (snapshotNeeded) { - zk.takeSnapshot(syncSnapshot); - } - - self.setCurrentEpoch(newEpoch); - writeToTxnLog = true; - //Anything after this needs to go to the transaction log, not applied directly in memory - isPreZAB1_0 = false; - - // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER). - sock.setSoTimeout(self.tickTime * self.syncLimit); - self.setSyncMode(QuorumPeer.SyncMode.NONE); - zk.startupWithoutServing(); - if (zk instanceof FollowerZooKeeperServer) { - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - for (PacketInFlight p : packetsNotCommitted) { - fzk.logRequest(p.hdr, p.rec, p.digest); - } - packetsNotCommitted.clear(); - } - + helper.writeState(); writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); break; } } } + QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); zk.startServing(); @@ -779,38 +828,14 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { */ self.updateElectionVote(newEpoch); - // We need to log the stuff that came in between the snapshot and the uptodate - if (zk instanceof FollowerZooKeeperServer) { - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - for (PacketInFlight p : packetsNotCommitted) { - fzk.logRequest(p.hdr, p.rec, p.digest); - } - for (Long zxid : packetsCommitted) { - fzk.commit(zxid); - } - } else if (zk instanceof ObserverZooKeeperServer) { - // Similar to follower, we need to log requests between the snapshot - // and UPTODATE - ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk; - for (PacketInFlight p : packetsNotCommitted) { - Long zxid = packetsCommitted.peekFirst(); - if (p.hdr.getZxid() != zxid) { - // log warning message if there is no matching commit - // old leader send outstanding proposal to observer - LOG.warn( - "Committing 0x{}, but next proposal is 0x{}", - Long.toHexString(zxid), - Long.toHexString(p.hdr.getZxid())); - continue; - } - packetsCommitted.remove(); - Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1); - request.setTxnDigest(p.digest); - ozk.commitRequest(request); - } - } else { - // New server type need to handle in-flight packets - throw new UnsupportedOperationException("Unknown server type"); + helper.applyDelayedPackets(); + } + + private void tryReconfig(PacketInFlight pif, long 
newLeader, long zxid) throws Exception { + QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8)); + boolean majorChange = self.processReconfig(qv, newLeader, zxid, true); + if (majorChange) { + throw new Exception("changes proposed in reconfig"); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java index cab95b05621..19cd10ba2ff 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java @@ -152,24 +152,30 @@ protected void unregisterJMX(Learner peer) { } @Override - public synchronized void shutdown() { + public synchronized void shutdown(boolean fullyShutDown) { if (!canShutdown()) { LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); - return; + } else { + LOG.info("Shutting down"); + try { + if (syncProcessor != null) { + // Shutting down the syncProcessor first ensures that any queued transactions are written to + // permanent storage, so the crash-recovery data stays consistent with the state used for the + // leader election that immediately follows this shutdown (triggered by the old leader going down). It also + // ensures that state still on its way to disk is loaded by the potential call to + // fast-forward-from-edits in super.shutdown(...), so we avoid getting a DIFF from the new leader + // that contains entries we have already written to our transaction log. + syncProcessor.shutdown(); + } + } catch (Exception e) { + LOG.warn("Ignoring unexpected exception in syncprocessor shutdown", e); + } } - LOG.info("Shutting down"); try { - super.shutdown(); + super.shutdown(fullyShutDown); } catch (Exception e) { LOG.warn("Ignoring unexpected exception during shutdown", e); } - try { - if (syncProcessor != null) { - syncProcessor.shutdown(); - } - } catch (Exception e) { - LOG.warn("Ignoring unexpected exception in syncprocessor shutdown", e); - } } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java index a41a9187743..383aa5de88b 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java @@ -73,7 +73,7 @@ public Learner getLearner() { * @param request */ public void commitRequest(Request request) { - if (syncRequestProcessorEnabled) { + if (syncProcessor != null) { // Write to txnlog and take periodic snapshot syncProcessor.processRequest(request); } @@ -127,18 +127,6 @@ public String getState() { return "observer"; } - @Override - public synchronized void shutdown() { - if (!canShutdown()) { - LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); - return; - } - super.shutdown(); - if (syncRequestProcessorEnabled && syncProcessor != null) { - syncProcessor.shutdown(); - } - } - @Override public void dumpMonitorValues(BiConsumer<String, Object> response) { super.dumpMonitorValues(response); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index 220d813f42b..b90fa84b8c7 100644 ---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -2238,7 +2238,6 @@ public long getAcceptedEpoch() throws IOException { public void setCurrentEpoch(long e) throws IOException { writeLongToFile(CURRENT_EPOCH_FILENAME, e); currentEpoch = e; - } public void setAcceptedEpoch(long e) throws IOException { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java index d2f6b39b69a..4cab5cfa064 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java @@ -190,23 +190,22 @@ public long getServerId() { } @Override - public synchronized void shutdown() { + public synchronized void shutdown(boolean fullyShutDown) { if (!canShutdown()) { LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); - return; - } - shutdown = true; - unregisterJMX(this); + } else { + shutdown = true; + unregisterJMX(this); - // set peer's server to null - self.setZooKeeperServer(null); - // clear all the connections - self.closeAllConnections(); - - self.adminServer.setZooKeeperServer(null); + // set peer's server to null + self.setZooKeeperServer(null); + // clear all the connections + self.closeAllConnections(); + self.adminServer.setZooKeeperServer(null); + } // shutdown the server itself - super.shutdown(); + super.shutdown(fullyShutDown); } @Override diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java index 8218ddae4ae..d65ead216f0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java @@ -20,6 +20,7 @@ import java.io.Flushable; import java.io.IOException; +import java.net.Socket; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; @@ -31,7 +32,7 @@ public class SendAckRequestProcessor implements RequestProcessor, Flushable { private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class); - Learner learner; + final Learner learner; SendAckRequestProcessor(Learner peer) { this.learner = peer; @@ -64,7 +65,8 @@ public void flush() throws IOException { } catch (IOException e) { LOG.warn("Closing connection to leader, exception during packet send", e); try { - if (!learner.sock.isClosed()) { + Socket socket = learner.sock; + if (socket != null && !socket.isClosed()) { learner.sock.close(); } } catch (IOException e1) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java index 9b9ea55f3f6..a17c49205da 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java @@ -251,7 +251,7 @@ void readPacket(QuorumPacket pp) throws IOException { super.readPacket(pp); if (injectError && 
pp.getType() == Leader.UPTODATE) { String type = LearnerHandler.packetToString(pp); - throw new SocketTimeoutException("Socket timeout while reading the packet for operation " + throw new SocketTimeoutException("TEST CODE Socket timeout while reading the packet for operation " + type); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 3bdbcd908dc..332d992546d 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -23,6 +23,7 @@ import static org.apache.zookeeper.server.quorum.ZabUtils.createMockLeader; import static org.apache.zookeeper.server.quorum.ZabUtils.createQuorumPeer; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -35,13 +36,18 @@ import java.io.EOFException; import java.io.File; import java.io.FileReader; +import java.io.Flushable; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.InputArchive; @@ -51,11 +57,16 @@ import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZKTestCase; import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.ByteBufferInputStream; import org.apache.zookeeper.server.ByteBufferOutputStream; import org.apache.zookeeper.server.DataTree; +import org.apache.zookeeper.server.FinalRequestProcessor; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.SyncRequestProcessor; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; @@ -701,6 +712,309 @@ private void proposeSetData(QuorumPacket qp, long zxid, String data, int version }); } + /** + * Tests a follower that has transactions queued in SyncRequestProcessor which are also already + * committed, with the leader getting quorum for those transactions elsewhere in the ensemble, and + * the leader then shutting down, triggering a new leader election and a partial reset of + * state in the follower. + * In particular, this test was written to verify a bug where LearnerZooKeeperServer was not + * shut down, because shutdown() was erroneously called on the superclass ZooKeeperServer, + * which led to its SyncRequestProcessor not being flushed during shutdown, and any queued + * transactions being lost.
This would only happen if the SyncRequestProcessor also crashed; this + * would happen as a consequence of the leader going down, causing the SendAckRequestProcessor + * to throw, killing the sync thread. + * In the subsequent leader election, the quorum peer would use the committed state, even though + * this was not yet flushed to persistent storage and never would be, since the sync thread had died. + * If the correct server had been shut down, the queued transactions would instead either be + * flushed to persistent storage when the quorum peer shut down the old follower, or that flush would + * fail, causing the state to be recreated from whatever was already flushed, which in turn would + * be corrected by a DIFF from the new leader. + */ + @Test + public void testFollowerWithPendingSyncsOnLeaderReElection() throws Exception { + + CountDownLatch followerSetUp = new CountDownLatch(1); + + class BlockingRequestProcessor implements RequestProcessor, Flushable { + final Phaser phaser = new Phaser(1); // SyncRequestProcessor; test thread will register later. + + final SendAckRequestProcessor nextProcessor; // SendAckRequestProcessor + + BlockingRequestProcessor(SendAckRequestProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public void processRequest(Request request) throws RequestProcessorException { + nextProcessor.processRequest(request); + } + + @Override + public void shutdown() { + phaser.forceTermination(); + nextProcessor.shutdown(); + } + + @Override + public void flush() throws IOException { + phaser.arriveAndAwaitAdvance(); // Let test thread know we're flushing. + phaser.arriveAndAwaitAdvance(); // Let test thread do more stuff while we wait here, simulating a slow fsync, etc. + nextProcessor.flush(); + } + + } + + class BlockingFollowerZooKeeperServer extends FollowerZooKeeperServer { + + BlockingRequestProcessor blocker; + + BlockingFollowerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException { + super(logFactory, self, zkDb); + } + + @Override + protected void setupRequestProcessors() { + RequestProcessor finalProcessor = new FinalRequestProcessor(this); + commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); + commitProcessor.start(); + firstProcessor = new FollowerRequestProcessor(this, commitProcessor); + ((FollowerRequestProcessor) firstProcessor).start(); + blocker = new BlockingRequestProcessor(new SendAckRequestProcessor(getFollower())); + syncProcessor = new SyncRequestProcessor(this, blocker); + syncProcessor.start(); + followerSetUp.countDown(); + } + + } + + File followerDir = File.createTempFile("test", "dir", testData); + assertTrue(followerDir.delete()); + assertTrue(followerDir.mkdir()); + + File leaderDir = File.createTempFile("test", "dir", testData); + assertTrue(leaderDir.delete()); + assertTrue(leaderDir.mkdir()); + + Thread followerThread = null; + ConversableFollower follower = null; + QuorumPeer peer = null; + BlockingRequestProcessor blocker = null; + + try (ServerSocket ss = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"))) { + peer = createQuorumPeer(followerDir); + + FileTxnSnapLog logFactory = new FileTxnSnapLog(followerDir, followerDir); + peer.setTxnFactory(logFactory); + ZKDatabase zkDb = new ZKDatabase(logFactory); + BlockingFollowerZooKeeperServer zk = new BlockingFollowerZooKeeperServer(logFactory, peer, zkDb); + peer.setZKDatabase(zkDb); + follower = new ConversableFollower(peer, zk);
follower.setLeaderQuorumServer(new QuorumServer(1, (InetSocketAddress) ss.getLocalSocketAddress())); + peer.follower = follower; + + CompletableFuture<Exception> followerExit = new CompletableFuture<>(); + final Follower followerForThread = follower; + followerThread = new Thread(() -> { + try { + followerForThread.followLeader(); + followerExit.complete(null); + } catch (Exception e) { + LOG.warn("Unexpected exception in follower thread", e); + followerExit.complete(e); + } + }); + followerThread.start(); + + Socket leaderSocket = ss.accept(); + InputArchive ia = BinaryInputArchive.getArchive(leaderSocket.getInputStream()); + OutputArchive oa = BinaryOutputArchive.getArchive(leaderSocket.getOutputStream()); + + assertEquals(0, follower.self.getAcceptedEpoch()); + assertEquals(0, follower.self.getCurrentEpoch()); + + // Set up a database with a single /foo node, on the leader + final long firstZxid = ZxidUtils.makeZxid(1, 1); + ZKDatabase leaderZkDb = new ZKDatabase(new FileTxnSnapLog(leaderDir, leaderDir)); + leaderZkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, OpCode.create), new CreateTxn("/foo", "data1".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1), null); + Stat stat = new Stat(); + assertEquals("data1", new String(leaderZkDb.getData("/foo", stat, null))); + + QuorumPacket qp = new QuorumPacket(); + readPacketSkippingPing(ia, qp); + assertEquals(Leader.FOLLOWERINFO, qp.getType()); + assertEquals(qp.getZxid(), 0); + LearnerInfo learnInfo = new LearnerInfo(); + ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo); + assertEquals(learnInfo.getProtocolVersion(), 0x10000); + assertEquals(learnInfo.getServerid(), 0); + + // We are simulating an established leader, so the epoch is 1 + qp.setType(Leader.LEADERINFO); + qp.setZxid(ZxidUtils.makeZxid(1, 0)); + byte[] protoBytes = new byte[4]; + ByteBuffer.wrap(protoBytes).putInt(0x10000); + qp.setData(protoBytes); + oa.writeRecord(qp, null); + + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACKEPOCH, qp.getType()); + assertEquals(0, qp.getZxid()); + assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt()); + assertEquals(1, follower.self.getAcceptedEpoch()); + assertEquals(0, follower.self.getCurrentEpoch()); + + // Send a diff with a single PROPOSAL, to be COMMITted after NEWLEADER + qp.setType(Leader.DIFF); + qp.setData(new byte[0]); + qp.setZxid(leaderZkDb.getDataTreeLastProcessedZxid()); + oa.writeRecord(qp, null); + + long createZxid0 = ZxidUtils.makeZxid(1, 2); + qp.setType(Leader.PROPOSAL); + qp.setZxid(createZxid0); + TxnHeader hdr = new TxnHeader(13, 1313, createZxid0, 33, OpCode.create); + CreateTxn ct = new CreateTxn("/bar", "hi".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputArchive boa = BinaryOutputArchive.getArchive(baos); + boa.writeRecord(hdr, null); + boa.writeRecord(ct, null); + qp.setData(baos.toByteArray()); + oa.writeRecord(qp, null); + + // Required for the ZK server to start up. + qp.setType(Leader.NEWLEADER); + qp.setZxid(ZxidUtils.makeZxid(1, 0)); + qp.setData(null); + oa.writeRecord(qp, null); + + // Quorum was acquired for the previous PROPOSAL, which is now COMMITted. + qp.setType(Leader.COMMIT); + qp.setZxid(createZxid0); + oa.writeRecord(qp, null); + + qp.setType(Leader.UPTODATE); + qp.setZxid(0); + oa.writeRecord(qp, null); + + // Get the ACK of the new leader.
+ readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + assertEquals(1, follower.self.getAcceptedEpoch()); + assertEquals(1, follower.self.getCurrentEpoch()); + + // Read the PROPOSAL ack. + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(createZxid0, qp.getZxid()); + + // Read the UPTODATE ack. + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + + // The follower now starts following the leader. + // We send a PROPOSAL and a COMMIT, and wait for the transaction to be flushed by SyncRequestProcessor. + blocker = ((BlockingFollowerZooKeeperServer) follower.zk).blocker; + blocker.phaser.register(); + long createZxid1 = ZxidUtils.makeZxid(1, 3); + qp.setType(Leader.PROPOSAL); + qp.setZxid(createZxid1); + hdr = new TxnHeader(13, 1313, createZxid1, 33, OpCode.create); + ct = new CreateTxn("/bar", "hi".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1); + baos = new ByteArrayOutputStream(); + boa = BinaryOutputArchive.getArchive(baos); + boa.writeRecord(hdr, null); + boa.writeRecord(ct, null); + qp.setData(baos.toByteArray()); + oa.writeRecord(qp, null); + + qp.setType(Leader.COMMIT); + qp.setZxid(createZxid1); + oa.writeRecord(qp, null); + + // Wait for "fsync" to begin. + assertTrue(followerSetUp.await(10, TimeUnit.SECONDS)); + blocker.phaser.arriveAndAwaitAdvance(); + + // Now we send another PROPOSAL and COMMIT, and wait for them to be applied to the data tree. + // No flush will be attempted for them yet, because the ongoing "fsync" is slow (waiting on the phaser). + long createZxid2 = ZxidUtils.makeZxid(1, 4); + qp.setType(Leader.PROPOSAL); + qp.setZxid(createZxid2); + hdr = new TxnHeader(13, 1314, createZxid2, 34, OpCode.create); + ct = new CreateTxn("/baz", "bye".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1); + baos = new ByteArrayOutputStream(); + boa = BinaryOutputArchive.getArchive(baos); + boa.writeRecord(hdr, null); + boa.writeRecord(ct, null); + qp.setData(baos.toByteArray()); + oa.writeRecord(qp, null); + + qp.setType(Leader.COMMIT); + qp.setZxid(createZxid2); + oa.writeRecord(qp, null); + + // Wait for the follower to observe the COMMIT, and apply the PROPOSAL to its data tree. Unfortunately, + // there's nothing to do but sleep here, as watches are triggered before the last processed id is updated. + long doom = System.currentTimeMillis() + 1000; + while (createZxid2 != follower.fzk.getLastProcessedZxid() && System.currentTimeMillis() < doom) { + Thread.sleep(1); + } + assertEquals(createZxid2, follower.fzk.getLastProcessedZxid()); + + // State recap: first create is flushing to disk, second is queued for flush; + // first and second creates are both applied to the data tree. + // Leader now goes down, and signals this by closing its socket. The follower will then initiate a new + // leader election, where it is critical that its "last seen transaction" is indeed written; + // otherwise, any transactions queued for flushing to disk will be lost if the follower restarts again + // before taking a new snapshot.
+ + // Additionally, any writes in-flight should be allowed to complete _before_ the fast-forward-from-edits, + // done when partially shutting down the learner zookeeper server to prepare for a new leader election, + // to avoid _also_ getting the transactions for those writes in a DIFF from the new leader, appending them + // twice (or more) to the transaction log, which would also give digest mismatches when restoring state. + // This is not tested here, but fixing ZOOKEEPER-4541 also fixes this problem, by flushing writes first. + + // Kill the leader + leaderSocket.close(); + followerExit.get(); // This closes the socket SendAckRequestProcessor uses, and crashes the SyncRequestProcessor. + blocker.phaser.awaitAdvance(blocker.phaser.arriveAndDeregister()); // Let the in-flight "fsync" complete, and crash (above). + + // A real QuorumPeer would now shut down the follower, and proceed to a new leader election. + follower.shutdown(); + + // The sync processor _should_ be dead now. Prior to the resolution of ZOOKEEPER-4541, it would only be + // dead because it had crashed, which was a bug in itself. + follower.fzk.syncProcessor.join(1000); + assertFalse(follower.fzk.syncProcessor.isAlive()); + + // Make sure the recorded data matches what we'll use for leader election. + File logDir = follower.fzk.getTxnLogFactory().getDataDir().getParentFile(); + File snapDir = follower.fzk.getTxnLogFactory().getSnapDir().getParentFile(); + ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir)); + zkDb2.loadDataBase(); + assertEquals(createZxid2, zkDb.getDataTreeLastProcessedZxid(), "last create zxid is used for leader election"); + assertEquals(createZxid2, zkDb2.getDataTreeLastProcessedZxid(), "last create zxid is written to persistent storage"); + } finally { + if (blocker != null) { + blocker.phaser.forceTermination(); + } + if (follower != null) { + follower.shutdown(); + } + if (followerThread != null) { + followerThread.interrupt(); + followerThread.join(); + } + if (peer != null) { + peer.shutdown(); + } + TestUtils.deleteFileRecursively(leaderDir); + TestUtils.deleteFileRecursively(followerDir); + } + } + @Test public void testNormalFollowerRunWithDiff() throws Exception { testFollowerConversation(new FollowerConversation() { @@ -767,17 +1081,22 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) qp.setZxid(0); oa.writeRecord(qp, null); - // Read the uptodate ack + // Get the ack of the new leader readPacketSkippingPing(ia, qp); assertEquals(Leader.ACK, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + assertEquals(1, f.self.getAcceptedEpoch()); + assertEquals(1, f.self.getCurrentEpoch()); - // Get the ack of the new leader + // Read the create session ack + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(createSessionZxid, qp.getZxid()); + + // Read the uptodate ack readPacketSkippingPing(ia, qp); assertEquals(Leader.ACK, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - assertEquals(1, f.self.getAcceptedEpoch()); - assertEquals(1, f.self.getCurrentEpoch()); //Wait for the transactions to be written out. The thread that writes them out // does not send anything back when it is done.
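Editorial note (not part of the patch): the flush barrier used by FlushRequest and syncFlush() above is a small, reusable pattern; a marker object is placed on the worker's queue, and the caller blocks on a latch that the worker counts down only after everything queued ahead of the marker has been flushed. The following is a minimal, self-contained sketch of that idea; the class and method names are invented for illustration and are not ZooKeeper APIs.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: a worker drains a queue and batches items; a caller can enqueue a
// flush marker and wait on its latch to know that everything submitted before the marker
// has been flushed. This mirrors the FlushRequest/syncFlush() idea in the patch.
public class FlushBarrierExample {

    static final class FlushMarker {
        final CountDownLatch latch = new CountDownLatch(1);
    }

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final List<Object> pending = new ArrayList<>();

    public void submit(Object item) {
        queue.add(item);
    }

    /** Blocks until every item submitted before this call has been flushed. */
    public void syncFlush() throws InterruptedException {
        FlushMarker marker = new FlushMarker();
        queue.add(marker);
        marker.latch.await();
    }

    private void flushPending() {
        // In the real processor this is where the batched writes would hit disk.
        pending.clear();
    }

    public void runWorker() throws InterruptedException {
        while (true) {
            Object item = queue.take();
            if (item instanceof FlushMarker) {
                flushPending();
                ((FlushMarker) item).latch.countDown();
                continue;
            }
            pending.add(item);
        }
    }
}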
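Similarly, a minimal sketch of the delay-forwarding idea behind DelayingProcessor: while delaying is enabled, items bound for the downstream stage are buffered, and switching delaying off replays them in order. In the patch this all runs on the sync thread, so no locking is needed there; the sketch below adds synchronization so it is safe to call from any thread. Names are again invented for the example.

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Illustrative only: buffers forwarding to a downstream consumer while delaying is enabled,
// then replays the buffered items in order once delaying is switched off, much like
// DelayingProcessor does for the ACK-producing processor in the patch.
public class DelayingForwarderExample<T> {

    private final Consumer<T> downstream;
    private Queue<T> delayed; // non-null while delaying is enabled

    public DelayingForwarderExample(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    public synchronized void forward(T item) {
        if (delayed == null) {
            downstream.accept(item);
        } else {
            delayed.add(item);
        }
    }

    public synchronized void startDelaying() {
        if (delayed == null) {
            delayed = new ArrayDeque<>();
        }
    }

    public synchronized void stopDelayingAndReplay() {
        if (delayed != null) {
            for (T item : delayed) {
                downstream.accept(item);
            }
            delayed = null;
        }
    }
}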