Skip to content

Commit

Permalink
ZOOKEEPER-3911: Data inconsistency caused by DIFF sync uncommitted log
Browse files Browse the repository at this point in the history
Problem:

During DIFF sync, followers will persist the proposals from leader only after receiving the UPTODATE message. Leader will start serve client requests once receive a quorum of NEWLEADER ACK from followers, and NEWLEADER ACKs are sent from followers before receiving UPTODATE. So it's possible that when leader starting serving, followers might not persistent all of the DIFF sync proposals from leader. If at a certain point in time (e.g. once receiving UPTODATE from leader) all quorum servers crash, then these proposals, where leader they are quorum committed, will lost if the old leader can't come back and the followers form a new quorum. The end result is inconsistent view of the data from client before / after quorum recovers.

Solution:

Make sure follower to persist all DIFF proposals before they ACK NEWLEADER. When leader receives a quorum of NEWLEADER ACK, it's guaranteed that all DIFF sync proposals are quorum persistent and can survive after the recovery.

Author: Michael Han <hanm@apache.org>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Norbert Kalmar <nkalmar@apache.org>

Closes apache#1445 from hanm/ZOOKEEPER-3911
  • Loading branch information
hanm authored and RokLenarcic committed Aug 31, 2022
1 parent 987aeaf commit dff0aae
Show file tree
Hide file tree
Showing 3 changed files with 325 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,19 @@ public void startdata() throws IOException, InterruptedException {
}

public synchronized void startup() {
startupWithServerState(State.RUNNING);
}

public synchronized void startupWithoutServing() {
startupWithServerState(State.INITIAL);
}

public synchronized void startServing() {
setState(State.RUNNING);
notifyAll();
}

private void startupWithServerState(State state) {
if (sessionTracker == null) {
createSessionTracker();
}
Expand All @@ -681,11 +694,12 @@ public synchronized void startup() {

registerMetrics();

setState(State.RUNNING);
setState(state);

requestPathMetricsCollector.start();

localSessionEnabled = sessionTracker.isLocalSessionsEnabled();

notifyAll();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,18 +741,30 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
}

self.setCurrentEpoch(newEpoch);
writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
writeToTxnLog = true;
//Anything after this needs to go to the transaction log, not applied directly in memory
isPreZAB1_0 = false;

// ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startupWithoutServing();
if (zk instanceof FollowerZooKeeperServer) {
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
packetsNotCommitted.clear();
}

writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
break;
}
}
}
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startup();
zk.startServing();
/*
* Update the election vote here to ensure that all members of the
* ensemble report the same vote to new servers that start up and
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper.server.quorum;

import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Map;
import javax.security.sasl.SaslException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class DIFFSyncConsistencyTest extends QuorumPeerTestBase {

private static int SERVER_COUNT = 3;
private MainThread[] mt = new MainThread[SERVER_COUNT];

@Test
@Timeout(value = 120)
public void testInconsistentDueToUncommittedLog() throws Exception {
final int LEADER_TIMEOUT_MS = 10_000;
final int[] clientPorts = new int[SERVER_COUNT];

StringBuilder sb = new StringBuilder();
String server;
for (int i = 0; i < SERVER_COUNT; i++) {
clientPorts[i] = PortAssignment.unique();
server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+ ":participant;127.0.0.1:" + clientPorts[i];
sb.append(server + "\n");
}
String currentQuorumCfgSection = sb.toString();

for (int i = 0; i < SERVER_COUNT; i++) {
mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
@Override
public TestQPMain getTestQPMain() {
return new MockTestQPMain();
}
};
mt[i].start();
}

for (int i = 0; i < SERVER_COUNT; i++) {
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
"waiting for server " + i + " being up");
}

int leader = findLeader(mt);
CountdownWatcher watch = new CountdownWatcher();
ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[leader], ClientBase.CONNECTION_TIMEOUT, watch);
watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);

Map<Long, Proposal> outstanding = mt[leader].main.quorumPeer.leader.outstandingProposals;
// Increase the tick time to delay the leader going to looking to allow us proposal a transaction while other
// followers are offline.
int previousTick = mt[leader].main.quorumPeer.tickTime;
mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS;
// Let the previous tick on the leader exhaust itself so the new tick time takes effect
Thread.sleep(previousTick);

LOG.info("LEADER ELECTED {}", leader);

// Shutdown followers to make sure we don't accidentally send the proposal we are going to make to follower.
// In other words, we want to make sure the followers get the proposal later through DIFF sync.
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != leader) {
mt[i].shutdown();
}
}

// Send a create request to old leader and make sure it's synced to disk.
try {
zk.create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
fail("create /zk" + leader + " should have failed");
} catch (KeeperException e) {
}

// Make sure that we actually did get it in process at the leader; there can be extra sessionClose proposals.
assertTrue(outstanding.size() > 0);
Proposal p = findProposalOfType(outstanding, OpCode.create);
LOG.info("Old leader id: {}. All proposals: {}", leader, outstanding);
assertNotNull(p, "Old leader doesn't have 'create' proposal");

// Make sure leader sync the proposal to disk.
int sleepTime = 0;
Long longLeader = (long) leader;
while (!p.qvAcksetPairs.get(0).getAckset().contains(longLeader)) {
if (sleepTime > 2000) {
fail("Transaction not synced to disk within 1 second " + p.qvAcksetPairs.get(0).getAckset() + " expected " + leader);
}
Thread.sleep(100);
sleepTime += 100;
}

// Start controlled followers where we deliberately make the follower fail once follower receive the UPTODATE
// message from leader. Because followers only persist proposals from DIFF sync after UPTODATE, this can
// deterministically simulate the situation where followers ACK NEWLEADER (which makes leader think she has the
// quorum support, but actually not afterwards) but immediately fail afterwards without persisting the proposals
// from DIFF sync.
for (int i = 0; i < SERVER_COUNT; i++) {
if (i == leader) {
continue;
}

mt[i].start();
int sleepCount = 0;
while (mt[i].getQuorumPeer() == null) {
++sleepCount;
if (sleepCount > 100) {
fail("Can't start follower " + i + " !");
}
Thread.sleep(100);
}

((CustomQuorumPeer) mt[i].getQuorumPeer()).setInjectError(true);
LOG.info("Follower {} started.", i);
}

// Verify leader can see it. The fact that leader can see it implies that
// leader should, at this point in time, get a quorum of ACK of NEWLEADER
// from two followers so leader can start serving requests; this also implies
// that DIFF sync from leader to followers are finished at this point in time.
// We then verify later that followers should have the same view after we shutdown
// this leader, otherwise it's a violation of ZAB / sequential consistency.
int c = 0;
while (c < 100) {
++c;
try {
Stat stat = zk.exists("/zk" + leader, false);
assertNotNull(stat, "server " + leader + " should have /zk");
break;
} catch (KeeperException.ConnectionLossException e) {

}
Thread.sleep(100);
}

// Shutdown all servers
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i].shutdown();
}
waitForOne(zk, States.CONNECTING);

// Now restart all servers except the old leader. Only old leader has the transaction sync to disk.
// The old followers only had in memory view of the transaction, and they didn't have a chance
// to sync to disk because we made them fail at UPTODATE.
for (int i = 0; i < SERVER_COUNT; i++) {
if (i == leader) {
continue;
}
mt[i].start();
int sleepCount = 0;
while (mt[i].getQuorumPeer() == null) {
++sleepCount;
if (sleepCount > 100) {
fail("Can't start follower " + i + " !");
}
Thread.sleep(100);
}

((CustomQuorumPeer) mt[i].getQuorumPeer()).setInjectError(false);
LOG.info("Follower {} started again.", i);
}

int newLeader = findLeader(mt);
assertNotEquals(newLeader, leader, "new leader is still the old leader " + leader + " !!");

// This simulates the case where clients connected to the old leader had a view of the data
// "/zkX", but clients connect to the new leader does not have the same view of data (missing "/zkX").
// This inconsistent view of the quorum exposed from leaders is a violation of ZAB.
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != newLeader) {
continue;
}
zk.close();
zk = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, watch);
watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
Stat val = zk.exists("/zk" + leader, false);
assertNotNull(val, "Data inconsistency detected! "
+ "Server " + i + " should have a view of /zk" + leader + "!");
}

zk.close();
}

@AfterEach
public void tearDown() {
for (int i = 0; i < mt.length; i++) {
try {
mt[i].shutdown();
} catch (InterruptedException e) {
LOG.warn("Quorum Peer interrupted while shutting it down", e);
}
}
}

static class CustomQuorumPeer extends QuorumPeer {

private volatile boolean injectError = false;

public CustomQuorumPeer() throws SaslException {

}

@Override
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {

@Override
void readPacket(QuorumPacket pp) throws IOException {
/**
* In real scenario got SocketTimeoutException while reading
* the packet from leader because of network problem, but
* here throwing SocketTimeoutException based on whether
* error is injected or not
*/
super.readPacket(pp);
if (injectError && pp.getType() == Leader.UPTODATE) {
String type = LearnerHandler.packetToString(pp);
throw new SocketTimeoutException("Socket timeout while reading the packet for operation "
+ type);
}
}

};
}

public void setInjectError(boolean injectError) {
this.injectError = injectError;
}

}

static class MockTestQPMain extends TestQPMain {

@Override
protected QuorumPeer getQuorumPeer() throws SaslException {
return new CustomQuorumPeer();
}

}

private Proposal findProposalOfType(Map<Long, Proposal> proposals, int type) {
for (Proposal proposal : proposals.values()) {
if (proposal.request.getHdr().getType() == type) {
return proposal;
}
}
return null;
}

private int findLeader(MainThread[] mt) {
for (int i = 0; i < mt.length; i++) {
if (mt[i].main.quorumPeer.leader != null) {
return i;
}
}
return -1;
}
}

0 comments on commit dff0aae

Please sign in to comment.