diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 749b558329a..ff8048c2477 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -706,19 +706,6 @@ public void startdata() throws IOException, InterruptedException { } public synchronized void startup() { - startupWithServerState(State.RUNNING); - } - - public synchronized void startupWithoutServing() { - startupWithServerState(State.INITIAL); - } - - public synchronized void startServing() { - setState(State.RUNNING); - notifyAll(); - } - - private void startupWithServerState(State state) { if (sessionTracker == null) { createSessionTracker(); } @@ -733,7 +720,7 @@ private void startupWithServerState(State state) { registerMetrics(); - setState(state); + setState(State.RUNNING); requestPathMetricsCollector.start(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index b9faa21652d..b146d333878 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -557,6 +557,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { Deque packetsCommitted = new ArrayDeque<>(); Deque packetsNotLogged = new ArrayDeque<>(); Deque packetsNotCommitted = new ArrayDeque<>(); + long lastPreemptivelyLoggedZxid = -1; synchronized (zk) { if (qp.getType() == Leader.DIFF) { LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); @@ -755,15 +756,12 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { isPreZAB1_0 = false; // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER). - sock.setSoTimeout(self.tickTime * self.syncLimit); - self.setSyncMode(QuorumPeer.SyncMode.NONE); - zk.startupWithoutServing(); if (zk instanceof FollowerZooKeeperServer) { - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; for (PacketInFlight p : packetsNotLogged) { - fzk.logRequest(p.hdr, p.rec, p.digest); + zk.getZKDatabase().append(new Request(-1, -1, p.hdr.getType(), p.hdr, p.rec, p.hdr.getZxid())); + lastPreemptivelyLoggedZxid = p.hdr.getZxid(); } - packetsNotLogged.clear(); + zk.getZKDatabase().commit(); // Really ensure transactions are written to persistent storage before ACK'ing. } writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); @@ -773,7 +771,10 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { } ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); - zk.startServing(); + sock.setSoTimeout(self.tickTime * self.syncLimit); + self.setSyncMode(QuorumPeer.SyncMode.NONE); + zk.startup(); + /* * Update the election vote here to ensure that all members of the * ensemble report the same vote to new servers that start up and @@ -787,7 +788,11 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { if (zk instanceof FollowerZooKeeperServer) { FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; for (PacketInFlight p : packetsNotLogged) { - fzk.logRequest(p.hdr, p.rec, p.digest); + if (p.hdr.getZxid() <= lastPreemptivelyLoggedZxid) { + fzk.getZKDatabase().processTxn(p.hdr, p.rec, p.digest); + } else { + fzk.logRequest(p.hdr, p.rec, p.digest); + } } for (Long zxid : packetsCommitted) { fzk.commit(zxid);