diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java index 4df319f86a4..6132fb00966 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java @@ -24,6 +24,7 @@ import java.util.Objects; import java.util.Queue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; @@ -55,6 +56,55 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req private static final Request REQUEST_OF_DEATH = Request.requestOfDeath; + /** Marker request enqueued by syncFlush(); run() calls flush() when it dequeues one and then counts its latch down. */ private static class FlushRequest extends Request { + private final CountDownLatch latch = new CountDownLatch(1); + public FlushRequest() { + super(null, 0, 0, 0, null, null); + } + } + + /** Sentinel requests, matched by identity in run(), that switch delayed forwarding on and off. */ private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null); + private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null); + + /** Wraps the next processor: after close(), requests are queued instead of forwarded and flush() is not propagated; open() forwards the queued requests in insertion order and resumes pass-through. */ private static class DelayingProcessor implements RequestProcessor, Flushable { + private final RequestProcessor next; + private Queue<Request> delayed = null; /* null means requests are forwarded immediately; non-null holds requests while forwarding is delayed */ + private DelayingProcessor(RequestProcessor next) { + this.next = next; + } + @Override + public void flush() throws IOException { + if (delayed == null && next instanceof Flushable) { + ((Flushable) next).flush(); + } + } + @Override + public void processRequest(Request request) throws RequestProcessorException { + if (delayed == null) { + next.processRequest(request); + } else { + delayed.add(request); + } + } + @Override + public void shutdown() { + next.shutdown(); + } + private void close() { + if (delayed == null) { + delayed = new ArrayDeque<>(); + } + } + private void open() throws
RequestProcessorException { + if (delayed != null) { + for (Request request : delayed) { + next.processRequest(request); + } + delayed = null; + } + } + } + /** The number of log entries to log before starting a snapshot */ private static int snapCount = ZooKeeperServer.getSnapCount(); @@ -75,7 +125,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req private final ZooKeeperServer zks; - private final RequestProcessor nextProcessor; + private final DelayingProcessor nextProcessor; /** * Transactions that have been written and are waiting to be flushed to @@ -88,7 +138,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener()); this.zks = zks; - this.nextProcessor = nextProcessor; + this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor); this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize()); } @@ -174,6 +224,21 @@ public void run() { break; } + if (si == turnForwardingDelayOn) { + nextProcessor.close(); + continue; + } + if (si == turnForwardingDelayOff) { + nextProcessor.open(); + continue; + } + + if (si instanceof FlushRequest) { + flush(); + ((FlushRequest) si).latch.countDown(); + continue; + } + long startProcessTime = Time.currentElapsedTime(); ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime); @@ -224,6 +289,17 @@ public void run() { LOG.info("SyncRequestProcessor exited!"); } + /** Flushes all pending writes, and waits for this to complete. */ + public void syncFlush() throws InterruptedException { + FlushRequest marker = new FlushRequest(); + queuedRequests.add(marker); + marker.latch.await(); /* NOTE(review): no timeout; presumably the sync thread is always alive to count the latch down when this is called - confirm */ + } + + /** Enqueues a sentinel asking the sync thread to start (true) or stop (false) delaying requests bound for the next processor; the switch happens when run() dequeues the sentinel. */ public void setDelayForwarding(boolean delayForwarding) { + queuedRequests.add(delayForwarding ?
turnForwardingDelayOn : turnForwardingDelayOff); + } + private void flush() throws IOException, RequestProcessorException { if (this.toFlush.isEmpty()) { return; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index b9faa21652d..e6b22f16731 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -760,13 +760,21 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { zk.startupWithoutServing(); if (zk instanceof FollowerZooKeeperServer) { FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + fzk.syncProcessor.setDelayForwarding(true); for (PacketInFlight p : packetsNotLogged) { fzk.logRequest(p.hdr, p.rec, p.digest); } packetsNotLogged.clear(); + fzk.syncProcessor.syncFlush(); } writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); + + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + fzk.syncProcessor.setDelayForwarding(false); + fzk.syncProcessor.syncFlush(); + } break; } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java index 96bf4b776d8..d65ead216f0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java @@ -32,7 +32,7 @@ public class SendAckRequestProcessor implements RequestProcessor, Flushable { private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class); - Learner learner; + final Learner learner; SendAckRequestProcessor(Learner peer) { this.learner = peer; diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java index 9b9ea55f3f6..056b5b246e0 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java @@ -49,7 +49,7 @@ public class DIFFSyncConsistencyTest extends QuorumPeerTestBase { private MainThread[] mt = new MainThread[SERVER_COUNT]; @Test - @Timeout(value = 120) + @Timeout(value = 20) public void testInconsistentDueToUncommittedLog() throws Exception { final int LEADER_TIMEOUT_MS = 10_000; final int[] clientPorts = new int[SERVER_COUNT]; @@ -251,7 +251,7 @@ void readPacket(QuorumPacket pp) throws IOException { super.readPacket(pp); if (injectError && pp.getType() == Leader.UPTODATE) { String type = LearnerHandler.packetToString(pp); - throw new SocketTimeoutException("Socket timeout while reading the packet for operation " + throw new SocketTimeoutException("TEST CODE Socket timeout while reading the packet for operation " + type); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 0d5c0f63569..8c5c0138bd4 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -736,7 +736,7 @@ public void testFollowerWithPendingSyncsOnLeaderReElection() throws Exception { CountDownLatch followerSetUp = new CountDownLatch(1); class BlockingRequestProcessor implements RequestProcessor, Flushable { - final Phaser phaser = new Phaser(2); // SyncRequestProcessor and test thread + final Phaser phaser = new Phaser(1); // SyncRequestProcessor; test thread will register later.
final SendAckRequestProcessor nextProcessor; // SendAckRequestProcessor @@ -896,20 +896,27 @@ protected void setupRequestProcessors() { qp.setZxid(0); oa.writeRecord(qp, null); - // Read the UPTODATE ack. + // Get the ACK of the new leader. readPacketSkippingPing(ia, qp); assertEquals(Leader.ACK, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + assertEquals(1, follower.self.getAcceptedEpoch()); + assertEquals(1, follower.self.getCurrentEpoch()); - // Get the ACK of the new leader. + // Read the PROPOSAL ack. + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(createZxid0, qp.getZxid()); + + // Read the UPTODATE ack. readPacketSkippingPing(ia, qp); assertEquals(Leader.ACK, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - assertEquals(1, follower.self.getAcceptedEpoch()); - assertEquals(1, follower.self.getCurrentEpoch()); // The follower now starts following the leader. // We send a PROPOSAL and a COMMIT, and wait for the transaction to be flushed by SyncRequestProcessor. + blocker = ((BlockingFollowerZooKeeperServer) follower.zk).blocker; + blocker.phaser.register(); long createZxid1 = ZxidUtils.makeZxid(1, 3); qp.setType(Leader.PROPOSAL); qp.setZxid(createZxid1); @@ -928,7 +935,6 @@ protected void setupRequestProcessors() { // Wait for "fsync" to begin. assertTrue(followerSetUp.await(10, TimeUnit.SECONDS)); - blocker = ((BlockingFollowerZooKeeperServer) follower.zk).blocker; blocker.phaser.arriveAndAwaitAdvance(); // Now we send another PROPOSAL and COMMIT, and wait for them to be applied to the data tree.
@@ -1075,17 +1081,22 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) qp.setZxid(0); oa.writeRecord(qp, null); - // Read the uptodate ack + // Get the ack of the new leader readPacketSkippingPing(ia, qp); assertEquals(Leader.ACK, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + assertEquals(1, f.self.getAcceptedEpoch()); + assertEquals(1, f.self.getCurrentEpoch()); - // Get the ack of the new leader + // Read the create session ack + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(createSessionZxid, qp.getZxid()); + + // Read the uptodate ack readPacketSkippingPing(ia, qp); assertEquals(Leader.ACK, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - assertEquals(1, f.self.getAcceptedEpoch()); - assertEquals(1, f.self.getCurrentEpoch()); //Wait for the transactions to be written out. The thread that writes them out // does not send anything back when it is done.