diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java index 4df319f86a4..3bc908dae2f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java @@ -24,6 +24,7 @@ import java.util.Objects; import java.util.Queue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; @@ -55,6 +56,56 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req private static final Request REQUEST_OF_DEATH = Request.requestOfDeath; + private static class FlushRequest extends Request { + private final CountDownLatch latch = new CountDownLatch(1); + public FlushRequest() { + super(null, 0, 0, 0, null, null); + } + } + + private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null); + private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null); + + private static class DelayingProcessor implements RequestProcessor, Flushable { + private final RequestProcessor next; + private Queue delayed = null; + private DelayingProcessor(RequestProcessor next) { + this.next = next; + } + @Override + public void flush() throws IOException { + if (delayed == null && next instanceof Flushable) { + ((Flushable) next).flush(); + } + } + @Override + public void processRequest(Request request) throws RequestProcessorException { + if (delayed == null) { + next.processRequest(request); + } else { + delayed.add(request); + } + } + @Override + public void shutdown() { + next.shutdown(); + } + private synchronized void close() { + if (delayed != null) { + delayed = new ArrayDeque<>(); + } + } + private synchronized void open() throws RequestProcessorException { + if (delayed == null) { + return; + } + for (Request request : delayed) { + next.processRequest(request); + } + delayed = null; + } + } + /** The number of log entries to log before starting a snapshot */ private static int snapCount = ZooKeeperServer.getSnapCount(); @@ -75,7 +126,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req private final ZooKeeperServer zks; - private final RequestProcessor nextProcessor; + private final DelayingProcessor nextProcessor; /** * Transactions that have been written and are waiting to be flushed to @@ -88,7 +139,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener()); this.zks = zks; - this.nextProcessor = nextProcessor; + this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor); this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize()); } @@ -174,6 +225,21 @@ public void run() { break; } + if (si == turnForwardingDelayOn) { + nextProcessor.close(); + continue; + } + if (si == turnForwardingDelayOff) { + nextProcessor.open(); + continue; + } + + if (si instanceof FlushRequest) { + flush(); + ((FlushRequest) si).latch.countDown(); + continue; + } + long startProcessTime = Time.currentElapsedTime(); ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime); @@ -224,6 +290,17 @@ public void run() { LOG.info("SyncRequestProcessor exited!"); } + /** Flushes all pending writes, and waits for this to complete. */ + public void syncFlush() throws InterruptedException { + FlushRequest marker = new FlushRequest(); + queuedRequests.add(marker); + marker.latch.await(); + } + + public void setDelayForwarding(boolean delayForwarding) { + queuedRequests.add(delayForwarding ? turnForwardingDelayOn : turnForwardingDelayOff); + } + private void flush() throws IOException, RequestProcessorException { if (this.toFlush.isEmpty()) { return; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java index 8d371ae5790..a5ad5dff6ed 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java @@ -66,6 +66,18 @@ public Follower getFollower() { return self.follower; } + public synchronized void startupWithoutServing() { + super.startupWithoutServing(); + syncProcessor.setDelayForwarding(true); + } + + public synchronized void startServing() { + syncProcessor.setDelayForwarding(false); + setState(State.RUNNING); + notifyAll(); + } + + @Override protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java index 96bf4b776d8..9408fd2da3e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java @@ -21,6 +21,9 @@ import java.io.Flushable; import java.io.IOException; import java.net.Socket; +import java.util.ArrayDeque; +import java.util.Queue; + import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; @@ -32,7 +35,8 @@ public class SendAckRequestProcessor implements RequestProcessor, Flushable { private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class); - Learner learner; + final Queue toAck = new ArrayDeque<>(); + final Learner learner; SendAckRequestProcessor(Learner peer) { this.learner = peer; diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java index 9b9ea55f3f6..a17c49205da 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java @@ -251,7 +251,7 @@ void readPacket(QuorumPacket pp) throws IOException { super.readPacket(pp); if (injectError && pp.getType() == Leader.UPTODATE) { String type = LearnerHandler.packetToString(pp); - throw new SocketTimeoutException("Socket timeout while reading the packet for operation " + throw new SocketTimeoutException("TEST CODE Socket timeout while reading the packet for operation " + type); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 0d5c0f63569..241daaf09df 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -736,7 +736,7 @@ public void testFollowerWithPendingSyncsOnLeaderReElection() throws Exception { CountDownLatch followerSetUp = new CountDownLatch(1); class BlockingRequestProcessor implements RequestProcessor, Flushable { - final Phaser phaser = new Phaser(2); // SyncRequestProcessor and test thread + final Phaser phaser = new Phaser(1); // SyncRequestProcessor; test thread will register later. final SendAckRequestProcessor nextProcessor; // SendAckRequestProcessor @@ -883,7 +883,7 @@ protected void setupRequestProcessors() { // Required for the ZK server to start up. qp.setType(Leader.NEWLEADER); - qp.setZxid(ZxidUtils.makeZxid(1, 0)); + qp.setZxid(0); qp.setData(null); oa.writeRecord(qp, null); @@ -896,20 +896,27 @@ protected void setupRequestProcessors() { qp.setZxid(0); oa.writeRecord(qp, null); - // Read the UPTODATE ack. + // Get the ACK of the new leader. readPacketSkippingPing(ia, qp); assertEquals(Leader.ACK, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + assertEquals(1, follower.self.getAcceptedEpoch()); + assertEquals(1, follower.self.getCurrentEpoch()); - // Get the ACK of the new leader. + // Read the UPTODATE ack. readPacketSkippingPing(ia, qp); assertEquals(Leader.ACK, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - assertEquals(1, follower.self.getAcceptedEpoch()); - assertEquals(1, follower.self.getCurrentEpoch()); + + // Read the PROPOSAL ack. + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(createZxid0, qp.getZxid()); // The follower now starts following the leader. // We send a PROPOSAL and a COMMIT, and wait for the transaction to be flushed by SyncRequestProcessor. + blocker = ((BlockingFollowerZooKeeperServer) follower.zk).blocker; + blocker.phaser.register(); long createZxid1 = ZxidUtils.makeZxid(1, 3); qp.setType(Leader.PROPOSAL); qp.setZxid(createZxid1); @@ -928,7 +935,6 @@ protected void setupRequestProcessors() { // Wait for "fsync" to begin. assertTrue(followerSetUp.await(10, TimeUnit.SECONDS)); - blocker = ((BlockingFollowerZooKeeperServer) follower.zk).blocker; blocker.phaser.arriveAndAwaitAdvance(); // Now we send another PROPOSAL and COMMIT, and wait for them to be applied to the data tree. @@ -1075,17 +1081,22 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) qp.setZxid(0); oa.writeRecord(qp, null); - // Read the uptodate ack + // Get the ack of the new leader readPacketSkippingPing(ia, qp); assertEquals(Leader.ACK, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); + assertEquals(1, f.self.getAcceptedEpoch()); + assertEquals(1, f.self.getCurrentEpoch()); - // Get the ack of the new leader + // Read the uptodate ack readPacketSkippingPing(ia, qp); assertEquals(Leader.ACK, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - assertEquals(1, f.self.getAcceptedEpoch()); - assertEquals(1, f.self.getCurrentEpoch()); + + // Read the create session ack + readPacketSkippingPing(ia, qp); + assertEquals(Leader.ACK, qp.getType()); + assertEquals(createSessionZxid, qp.getZxid()); //Wait for the transactions to be written out. The thread that writes them out // does not send anything back when it is done.