diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java index 352eb81da90..d792948bead 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java @@ -47,4 +47,10 @@ public TxnHeader getHeader() { public TxnDigest getDigest() { return digest; } + + public Request toRequest() { + Request request = new Request(header.getClientId(), header.getCxid(), header.getType(), header, txn, header.getZxid()); + request.setTxnDigest(digest); + return request; + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java index 8d371ae5790..ae1bc303f37 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java @@ -19,6 +19,7 @@ package org.apache.zookeeper.server.quorum; import java.io.IOException; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import javax.management.JMException; @@ -31,6 +32,7 @@ import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.SyncRequestProcessor; +import org.apache.zookeeper.server.TxnLogEntry; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.txn.TxnDigest; @@ -88,6 +90,20 @@ public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) { syncProcessor.processRequest(request); } + public void syncAndCommitInitialLogEntries(List<TxnLogEntry> logEntries) throws IOException { + State state = this.state; + if 
(state != State.INITIAL) { + String msg = String.format("illegal state %s to sync initial log entries", state); + throw new IllegalStateException(msg); + } + for (TxnLogEntry logEntry : logEntries) { + Request request = logEntry.toRequest(); + getZKDatabase().append(request); + processTxn(logEntry.getHeader(), logEntry.getTxn()); + } + getZKDatabase().commit(); + } + /** * When a COMMIT message is received, eventually this method is called, * which matches up the zxid from the COMMIT with (hopefully) the head of diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java index ce8f7999c45..fc27f030e0a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java @@ -361,7 +361,7 @@ Optional<ServerSocket> createServerSocket(InetSocketAddress address, boolean por /** * This message type is sent by the leader to indicate that the follower is - * now uptodate andt can start responding to clients. + * now uptodate and can start responding to clients. 
*/ static final int UPTODATE = 12; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 594c87fb90d..b893f42f7b9 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -30,7 +30,9 @@ import java.net.Socket; import java.nio.ByteBuffer; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Deque; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -82,6 +84,9 @@ static class PacketInFlight { Record rec; TxnDigest digest; + TxnLogEntry toLogEntry() { + return new TxnLogEntry(rec, hdr, digest); + } } QuorumPeer self; @@ -758,12 +763,28 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { sock.setSoTimeout(self.tickTime * self.syncLimit); self.setSyncMode(QuorumPeer.SyncMode.NONE); zk.startupWithoutServing(); - if (zk instanceof FollowerZooKeeperServer) { - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - for (PacketInFlight p : packetsNotCommitted) { - fzk.logRequest(p.hdr, p.rec, p.digest); + if (zk instanceof FollowerZooKeeperServer && !packetsCommitted.isEmpty()) { + List<TxnLogEntry> entries = new ArrayList<>(packetsCommitted.size()); + // Pop log entries from packetsNotCommitted according to packetsCommitted. + // In case of mismatch, log warning and keep packetsNotCommitted untouched. 
+ while (!packetsCommitted.isEmpty()) { + long zxid = packetsCommitted.removeFirst(); + pif = packetsNotCommitted.peekFirst(); + if (pif == null) { + LOG.warn("Committing 0x{}, but got no proposal", Long.toHexString(zxid)); + continue; + } else if (pif.hdr.getZxid() != zxid) { + LOG.warn( + "Committing 0x{}, but next proposal is 0x{}", + Long.toHexString(zxid), + Long.toHexString(pif.hdr.getZxid())); + continue; + } + packetsNotCommitted.removeFirst(); + entries.add(pif.toLogEntry()); } - packetsNotCommitted.clear(); + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + fzk.syncAndCommitInitialLogEntries(entries); } writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 3bdbcd908dc..12aea6061b6 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -743,8 +743,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) readPacketSkippingPing(ia, qp); assertEquals(Leader.ACKEPOCH, qp.getType()); - assertEquals(0, qp.getZxid()); - assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt()); + assertEquals(ZxidUtils.makeZxid(0, 0), qp.getZxid()); + assertEquals(0, ByteBuffer.wrap(qp.getData()).getInt()); assertEquals(1, f.self.getAcceptedEpoch()); assertEquals(0, f.self.getCurrentEpoch()); @@ -779,24 +779,11 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) assertEquals(1, f.self.getAcceptedEpoch()); assertEquals(1, f.self.getCurrentEpoch()); - //Wait for the transactions to be written out. The thread that writes them out - // does not send anything back when it is done. 
- long start = System.currentTimeMillis(); - while (createSessionZxid != f.fzk.getLastProcessedZxid() - && (System.currentTimeMillis() - start) < 50) { - Thread.sleep(1); - } - assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid()); // Make sure the data was recorded in the filesystem ok ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir)); - start = System.currentTimeMillis(); zkDb2.loadDataBase(); - while (zkDb2.getSessionWithTimeOuts().isEmpty() && (System.currentTimeMillis() - start) < 50) { - Thread.sleep(1); - zkDb2.loadDataBase(); - } LOG.info("zkdb2 sessions:{}", zkDb2.getSessions()); LOG.info("zkdb2 with timeouts:{}", zkDb2.getSessionWithTimeOuts()); assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));