diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 8492dd56de7..f7f99f71571 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -667,6 +667,30 @@ public void startdata() throws IOException, InterruptedException {
     }
 
     public synchronized void startup() {
+        startupWithServerState(State.RUNNING);
+    }
+
+    /**
+     * Runs the same startup sequence as {@link #startup()} but leaves the server in
+     * {@code State.INITIAL}; {@link #startServing()} later switches it to {@code State.RUNNING}.
+     */
+    public synchronized void startupWithoutServing() {
+        startupWithServerState(State.INITIAL);
+    }
+
+    /**
+     * Switches the server to {@code State.RUNNING} and notifies threads waiting on this monitor.
+     */
+    public synchronized void startServing() {
+        setState(State.RUNNING);
+        notifyAll();
+    }
+
+    /**
+     * Common startup sequence that finishes in the given state. Callers must hold this object's
+     * monitor because the sequence ends with {@code notifyAll()}.
+     */
+    private void startupWithServerState(State state) {
         if (sessionTracker == null) {
             createSessionTracker();
         }
@@ -681,11 +705,12 @@ public synchronized void startup() {
 
         registerMetrics();
 
-        setState(State.RUNNING);
+        setState(state);
 
         requestPathMetricsCollector.start();
 
         localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
+
         notifyAll();
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 9ab62069a85..1b6c25043a2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -741,8 +741,22 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
                     }
 
                     self.setCurrentEpoch(newEpoch);
-                    writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
+                    writeToTxnLog = true;
+                    // Anything after this needs to go to the transaction log, not applied directly in memory
                     isPreZAB1_0 = false;
+
+                    // ZOOKEEPER-3911: make sure to sync the uncommitted logs before committing them (ACK NEWLEADER).
+                    sock.setSoTimeout(self.tickTime * self.syncLimit);
+                    self.setSyncMode(QuorumPeer.SyncMode.NONE);
+                    zk.startupWithoutServing();
+                    if (zk instanceof FollowerZooKeeperServer) {
+                        FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
+                        for (PacketInFlight p : packetsNotCommitted) {
+                            fzk.logRequest(p.hdr, p.rec, p.digest);
+                        }
+                        packetsNotCommitted.clear();
+                    }
+
                     writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                     break;
                 }
@@ -750,9 +764,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
         }
         ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
         writePacket(ack, true);
-        sock.setSoTimeout(self.tickTime * self.syncLimit);
-        self.setSyncMode(QuorumPeer.SyncMode.NONE);
-        zk.startup();
+        zk.startServing();
         /*
          * Update the election vote here to ensure that all members of the
          * ensemble report the same vote to new servers that start up and
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java
new file mode 100644
index 00000000000..9b9ea55f3f6
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncConsistencyTest.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.Map;
+import javax.security.sasl.SaslException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Regression test for ZOOKEEPER-3911: a transaction that followers received only through
+ * DIFF sync must still be visible after the old leader is shut down and one of those
+ * followers becomes the new leader.
+ */
+public class DIFFSyncConsistencyTest extends QuorumPeerTestBase {
+
+    private static final int SERVER_COUNT = 3;
+    private final MainThread[] mt = new MainThread[SERVER_COUNT];
+
+    @Test
+    @Timeout(value = 120)
+    public void testInconsistentDueToUncommittedLog() throws Exception {
+        final int LEADER_TIMEOUT_MS = 10_000;
+        final int[] clientPorts = new int[SERVER_COUNT];
+
+        StringBuilder sb = new StringBuilder();
+        String server;
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+                     + ":participant;127.0.0.1:" + clientPorts[i];
+            sb.append(server + "\n");
+        }
+        String currentQuorumCfgSection = sb.toString();
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
+                @Override
+                public TestQPMain getTestQPMain() {
+                    return new MockTestQPMain();
+                }
+            };
+            mt[i].start();
+        }
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
+                    "waiting for server " + i + " being up");
+        }
+
+        int leader = findLeader(mt);
+        CountdownWatcher watch = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[leader], ClientBase.CONNECTION_TIMEOUT, watch);
+        watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+
+        Map<Long, Proposal> outstanding = mt[leader].main.quorumPeer.leader.outstandingProposals;
+        // Increase the tick time to delay the leader going to LOOKING, so that we can propose a transaction while
+        // the other followers are offline.
+        int previousTick = mt[leader].main.quorumPeer.tickTime;
+        mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS;
+        // Let the previous tick on the leader exhaust itself so the new tick time takes effect.
+        Thread.sleep(previousTick);
+
+        LOG.info("LEADER ELECTED {}", leader);
+
+        // Shut down the followers so the proposal we are about to make is not accidentally sent to them.
+        // In other words, we want the followers to receive the proposal later through DIFF sync.
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            if (i != leader) {
+                mt[i].shutdown();
+            }
+        }
+
+        // Send a create request to the old leader and make sure it's synced to disk.
+        try {
+            zk.create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            fail("create /zk" + leader + " should have failed");
+        } catch (KeeperException e) {
+        }
+
+        // Make sure the request really is in process at the leader; there can be extra sessionClose proposals.
+        assertTrue(outstanding.size() > 0);
+        Proposal p = findProposalOfType(outstanding, OpCode.create);
+        LOG.info("Old leader id: {}. All proposals: {}", leader, outstanding);
+        assertNotNull(p, "Old leader doesn't have 'create' proposal");
+
+        // Make sure the leader syncs the proposal to disk.
+        int sleepTime = 0;
+        Long longLeader = (long) leader;
+        while (!p.qvAcksetPairs.get(0).getAckset().contains(longLeader)) {
+            if (sleepTime > 2000) {
+                fail("Transaction not synced to disk within 2 seconds " + p.qvAcksetPairs.get(0).getAckset() + " expected " + leader);
+            }
+            Thread.sleep(100);
+            sleepTime += 100;
+        }
+
+        // Start controlled followers that are deliberately made to fail once they receive the UPTODATE
+        // message from the leader. Because followers only persist proposals from DIFF sync after UPTODATE, this can
+        // deterministically simulate the situation where followers ACK NEWLEADER (which makes the leader think it has
+        // quorum support, although it effectively does not) but fail immediately afterwards without persisting the
+        // proposals from DIFF sync.
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            if (i == leader) {
+                continue;
+            }
+
+            mt[i].start();
+            int sleepCount = 0;
+            while (mt[i].getQuorumPeer() == null) {
+                ++sleepCount;
+                if (sleepCount > 100) {
+                    fail("Can't start follower " + i + " !");
+                }
+                Thread.sleep(100);
+            }
+
+            ((CustomQuorumPeer) mt[i].getQuorumPeer()).setInjectError(true);
+            LOG.info("Follower {} started.", i);
+        }
+
+        // Verify the leader can see it. The fact that the leader can see it implies that
+        // the leader should, at this point in time, have a quorum of ACKs of NEWLEADER
+        // from two followers so it can start serving requests; this also implies
+        // that DIFF sync from the leader to the followers is finished at this point in time.
+        // We then verify later that the followers have the same view after we shut down
+        // this leader; otherwise it's a violation of ZAB / sequential consistency.
+        int c = 0;
+        while (c < 100) {
+            ++c;
+            try {
+                Stat stat = zk.exists("/zk" + leader, false);
+                assertNotNull(stat, "server " + leader + " should have /zk");
+                break;
+            } catch (KeeperException.ConnectionLossException e) {
+                // Connection lost; deliberately ignored so the loop retries after the sleep below.
+            }
+            Thread.sleep(100);
+        }
+
+        // Shut down all servers.
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            mt[i].shutdown();
+        }
+        waitForOne(zk, States.CONNECTING);
+
+        // Now restart all servers except the old leader. Only the old leader has the transaction synced to disk.
+        // The old followers only had an in-memory view of the transaction, and they didn't have a chance
+        // to sync it to disk because we made them fail at UPTODATE.
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            if (i == leader) {
+                continue;
+            }
+            mt[i].start();
+            int sleepCount = 0;
+            while (mt[i].getQuorumPeer() == null) {
+                ++sleepCount;
+                if (sleepCount > 100) {
+                    fail("Can't start follower " + i + " !");
+                }
+                Thread.sleep(100);
+            }
+
+            ((CustomQuorumPeer) mt[i].getQuorumPeer()).setInjectError(false);
+            LOG.info("Follower {} started again.", i);
+        }
+
+        int newLeader = findLeader(mt);
+        assertNotEquals(newLeader, leader, "new leader is still the old leader " + leader + " !!");
+
+        // This simulates the case where clients connected to the old leader had a view of the data
+        // "/zkX", but clients connecting to the new leader do not have the same view (missing "/zkX").
+        // Such an inconsistent view of the quorum exposed by leaders is a violation of ZAB.
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            if (i != newLeader) {
+                continue;
+            }
+            zk.close();
+            zk = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, watch);
+            watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+            Stat val = zk.exists("/zk" + leader, false);
+            assertNotNull(val, "Data inconsistency detected! "
+                    + "Server " + i + " should have a view of /zk" + leader + "!");
+        }
+
+        zk.close();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        for (int i = 0; i < mt.length; i++) {
+            try {
+                // A slot stays null if the test failed before that server was created.
+                if (mt[i] != null) {
+                    mt[i].shutdown();
+                }
+            } catch (InterruptedException e) {
+                LOG.warn("Quorum Peer interrupted while shutting it down", e);
+            }
+        }
+    }
+
+    /**
+     * QuorumPeer whose followers throw a SocketTimeoutException right after reading an UPTODATE
+     * packet while error injection is switched on.
+     */
+    static class CustomQuorumPeer extends QuorumPeer {
+
+        private volatile boolean injectError = false;
+
+        public CustomQuorumPeer() throws SaslException {
+
+        }
+
+        @Override
+        protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
+            return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {
+
+                @Override
+                void readPacket(QuorumPacket pp) throws IOException {
+                    /*
+                     * In a real scenario the SocketTimeoutException would come from a network
+                     * problem while reading the packet from the leader; here it is thrown
+                     * whenever error injection is on and the packet just read is UPTODATE,
+                     * so the follower fails right after receiving UPTODATE.
+                     */
+                    super.readPacket(pp);
+                    if (injectError && pp.getType() == Leader.UPTODATE) {
+                        String type = LearnerHandler.packetToString(pp);
+                        throw new SocketTimeoutException("Socket timeout while reading the packet for operation "
+                                + type);
+                    }
+                }
+
+            };
+        }
+
+        public void setInjectError(boolean injectError) {
+            this.injectError = injectError;
+        }
+
+    }
+
+    static class MockTestQPMain extends TestQPMain {
+
+        @Override
+        protected QuorumPeer getQuorumPeer() throws SaslException {
+            return new CustomQuorumPeer();
+        }
+
+    }
+
+    /**
+     * Returns a proposal whose request header carries the given op code, or {@code null} if there is none.
+     */
+    private Proposal findProposalOfType(Map<Long, Proposal> proposals, int type) {
+        for (Proposal proposal : proposals.values()) {
+            if (proposal.request.getHdr().getType() == type) {
+                return proposal;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Returns the index of the first server whose quorum peer has a non-null leader, or -1 if none does.
+     */
+    private int findLeader(MainThread[] mt) {
+        for (int i = 0; i < mt.length; i++) {
+            if (mt[i].main.quorumPeer.leader != null) {
+                return i;
+            }
+        }
+        return -1;
+    }
+}