Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZOOKEEPER-3911: Data inconsistency caused by DIFF sync uncommitted log #1445

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,19 @@ public void startdata() throws IOException, InterruptedException {
}

public synchronized void startup() {
startupWithServerState(State.RUNNING);
}

public synchronized void startupWithoutServing() {
startupWithServerState(State.INITIAL);
}

public synchronized void startServing() {
setState(State.RUNNING);
hanm marked this conversation as resolved.
Show resolved Hide resolved
notifyAll();
}

private void startupWithServerState(State state) {
if (sessionTracker == null) {
createSessionTracker();
}
Expand All @@ -676,11 +689,12 @@ public synchronized void startup() {

registerMetrics();

setState(State.RUNNING);
setState(state);

requestPathMetricsCollector.start();

localSessionEnabled = sessionTracker.isLocalSessionsEnabled();

notifyAll();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,18 +741,30 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
}

self.setCurrentEpoch(newEpoch);
writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
writeToTxnLog = true;
//Anything after this needs to go to the transaction log, not applied directly in memory
isPreZAB1_0 = false;

// ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startupWithoutServing();
if (zk instanceof FollowerZooKeeperServer) {
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
packetsNotCommitted.clear();
}

writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
break;
}
}
}
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startup();
zk.startServing();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One question:

this "startServing" works if you executed 'zk.startupWithoutServing();' in your new code.
but in other code paths we are not calling "startupWithoutServing"
so when we come to this point zk.startServing() will only change the state to RUNNING, and we had not called 'startupWithServerState'

can you please explain ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but in other code paths we are not calling "startupWithoutServing"

The invariant here is that a call to zk.startServing will always follow a call to zk.startupWithoutServing. This is because the ZAB protocol implementation requires this order of invocation (in syncWithLeader). In other words, there is no other code paths that involve both calls. To finish the sync, the leader and follower must follow these events in order.

  • First, learner will receive NEWLEADER from leader. This is where we will call zk.startupWithoutServing.
  • Then, learner will receive UPTODATE from leader. This is where we breaks the outer sync loop, afterwards we call zk.startServing.

Hope this make sense. A more detailed description can be found on phase 2 "Sync with followers" on https://cwiki.apache.org/confluence/display/ZOOKEEPER/Zab1.0.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other code path the server state is already in running, so this has no effect.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now thinking about this... it's a bit surprising we never needed this control before, to have a running server "active" or not.

/*
* Update the election vote here to ensure that all members of the
* ensemble report the same vote to new servers that start up and
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper.server.quorum;

import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Map;
import javax.security.sasl.SaslException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class DIFFSyncConsistencyTest extends QuorumPeerTestBase {

private static int SERVER_COUNT = 3;
private MainThread[] mt = new MainThread[SERVER_COUNT];

@Test
@Timeout(value = 120)
public void testInconsistentDueToUncommittedLog() throws Exception {
final int LEADER_TIMEOUT_MS = 10_000;
final int[] clientPorts = new int[SERVER_COUNT];

StringBuilder sb = new StringBuilder();
String server;
for (int i = 0; i < SERVER_COUNT; i++) {
clientPorts[i] = PortAssignment.unique();
server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+ ":participant;127.0.0.1:" + clientPorts[i];
sb.append(server + "\n");
}
String currentQuorumCfgSection = sb.toString();

for (int i = 0; i < SERVER_COUNT; i++) {
mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
@Override
public TestQPMain getTestQPMain() {
return new MockTestQPMain();
}
};
mt[i].start();
}

for (int i = 0; i < SERVER_COUNT; i++) {
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
"waiting for server " + i + " being up");
}

int leader = findLeader(mt);
CountdownWatcher watch = new CountdownWatcher();
ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[leader], ClientBase.CONNECTION_TIMEOUT, watch);
watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);

Map<Long, Proposal> outstanding = mt[leader].main.quorumPeer.leader.outstandingProposals;
// Increase the tick time to delay the leader going to looking to allow us proposal a transaction while other
// followers are offline.
int previousTick = mt[leader].main.quorumPeer.tickTime;
mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS;
// Let the previous tick on the leader exhaust itself so the new tick time takes effect
Thread.sleep(previousTick);

LOG.info("LEADER ELECTED {}", leader);

// Shutdown followers to make sure we don't accidentally send the proposal we are going to make to follower.
// In other words, we want to make sure the followers get the proposal later through DIFF sync.
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != leader) {
mt[i].shutdown();
}
}

// Send a create request to old leader and make sure it's synced to disk.
try {
zk.create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
fail("create /zk" + leader + " should have failed");
} catch (KeeperException e) {
}

// Make sure that we actually did get it in process at the leader; there can be extra sessionClose proposals.
assertTrue(outstanding.size() > 0);
Proposal p = findProposalOfType(outstanding, OpCode.create);
LOG.info("Old leader id: {}. All proposals: {}", leader, outstanding);
assertNotNull(p, "Old leader doesn't have 'create' proposal");

// Make sure leader sync the proposal to disk.
int sleepTime = 0;
Long longLeader = (long) leader;
while (!p.qvAcksetPairs.get(0).getAckset().contains(longLeader)) {
if (sleepTime > 2000) {
fail("Transaction not synced to disk within 1 second " + p.qvAcksetPairs.get(0).getAckset() + " expected " + leader);
}
Thread.sleep(100);
sleepTime += 100;
}

// Start controlled followers where we deliberately make the follower fail once follower receive the UPTODATE
// message from leader. Because followers only persist proposals from DIFF sync after UPTODATE, this can
// deterministically simulate the situation where followers ACK NEWLEADER (which makes leader think she has the
// quorum support, but actually not afterwards) but immediately fail afterwards without persisting the proposals
// from DIFF sync.
for (int i = 0; i < SERVER_COUNT; i++) {
if (i == leader) {
continue;
}

mt[i].start();
int sleepCount = 0;
while (mt[i].getQuorumPeer() == null) {
++sleepCount;
if (sleepCount > 100) {
fail("Can't start follower " + i + " !");
}
Thread.sleep(100);
}

((CustomQuorumPeer) mt[i].getQuorumPeer()).setInjectError(true);
LOG.info("Follower {} started.", i);
}

// Verify leader can see it. The fact that leader can see it implies that
// leader should, at this point in time, get a quorum of ACK of NEWLEADER
// from two followers so leader can start serving requests; this also implies
// that DIFF sync from leader to followers are finished at this point in time.
// We then verify later that followers should have the same view after we shutdown
// this leader, otherwise it's a violation of ZAB / sequential consistency.
int c = 0;
while (c < 100) {
++c;
try {
Stat stat = zk.exists("/zk" + leader, false);
assertNotNull(stat, "server " + leader + " should have /zk");
break;
} catch (KeeperException.ConnectionLossException e) {

}
Thread.sleep(100);
}

// Shutdown all servers
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i].shutdown();
}
waitForOne(zk, States.CONNECTING);

// Now restart all servers except the old leader. Only old leader has the transaction sync to disk.
// The old followers only had in memory view of the transaction, and they didn't have a chance
// to sync to disk because we made them fail at UPTODATE.
for (int i = 0; i < SERVER_COUNT; i++) {
if (i == leader) {
continue;
}
mt[i].start();
int sleepCount = 0;
while (mt[i].getQuorumPeer() == null) {
++sleepCount;
if (sleepCount > 100) {
fail("Can't start follower " + i + " !");
}
Thread.sleep(100);
}

((CustomQuorumPeer) mt[i].getQuorumPeer()).setInjectError(false);
LOG.info("Follower {} started again.", i);
}

int newLeader = findLeader(mt);
assertNotEquals(newLeader, leader, "new leader is still the old leader " + leader + " !!");

// This simulates the case where clients connected to the old leader had a view of the data
// "/zkX", but clients connect to the new leader does not have the same view of data (missing "/zkX").
// This inconsistent view of the quorum exposed from leaders is a violation of ZAB.
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != newLeader) {
continue;
}
zk.close();
zk = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, watch);
watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
Stat val = zk.exists("/zk" + leader, false);
assertNotNull(val, "Data inconsistency detected! "
+ "Server " + i + " should have a view of /zk" + leader + "!");
}

zk.close();
}

@AfterEach
public void tearDown() {
for (int i = 0; i < mt.length; i++) {
try {
mt[i].shutdown();
} catch (InterruptedException e) {
LOG.warn("Quorum Peer interrupted while shutting it down", e);
}
}
}

static class CustomQuorumPeer extends QuorumPeer {

private volatile boolean injectError = false;

public CustomQuorumPeer() throws SaslException {

}

@Override
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {

@Override
void readPacket(QuorumPacket pp) throws IOException {
/**
* In real scenario got SocketTimeoutException while reading
* the packet from leader because of network problem, but
* here throwing SocketTimeoutException based on whether
* error is injected or not
*/
super.readPacket(pp);
if (injectError && pp.getType() == Leader.UPTODATE) {
String type = LearnerHandler.packetToString(pp);
throw new SocketTimeoutException("Socket timeout while reading the packet for operation "
+ type);
}
}

};
}

public void setInjectError(boolean injectError) {
this.injectError = injectError;
}

}

static class MockTestQPMain extends TestQPMain {

@Override
protected QuorumPeer getQuorumPeer() throws SaslException {
return new CustomQuorumPeer();
}

}

private Proposal findProposalOfType(Map<Long, Proposal> proposals, int type) {
for (Proposal proposal : proposals.values()) {
if (proposal.request.getHdr().getType() == type) {
return proposal;
}
}
return null;
}

private int findLeader(MainThread[] mt) {
for (int i = 0; i < mt.length; i++) {
if (mt[i].main.quorumPeer.leader != null) {
return i;
}
}
return -1;
}
}