Skip to content

Commit

Permalink
ZOOKEEPER-2172: Cluster crashes when reconfig a new node as a partici…
Browse files Browse the repository at this point in the history
…pant (Arshad Mohammad via phunt)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1759907 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
phunt committed Sep 8, 2016
1 parent 10dc80a commit c38787f
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,9 @@ BUGFIXES:
"config -c" when client port is mentioned as separate and not like
new style (Rakesh Kumar Singh via phunt)

ZOOKEEPER-2172: Cluster crashes when reconfig a new node as a participant
(Arshad Mohammad via phunt)

IMPROVEMENTS:
ZOOKEEPER-2024 Major throughput improvement with mixed workloads (Kfir Lev-Ari via shralex)

Expand Down
18 changes: 9 additions & 9 deletions src/java/main/org/apache/zookeeper/server/quorum/Learner.java
Original file line number Diff line number Diff line change
Expand Up @@ -431,19 +431,19 @@ else if (qp.getType() == Leader.SNAP) {
break;
case Leader.COMMIT:
case Leader.COMMITANDACTIVATE:
pif = packetsNotCommitted.peekFirst();
if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData()));
boolean majorChange = self.processReconfig(qv, ByteBuffer.wrap(qp.getData()).getLong(),
qp.getZxid(), true);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
}
if (!snapshotTaken) {
pif = packetsNotCommitted.peekFirst();
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
} else {
if (qp.getType() == Leader.COMMITANDACTIVATE) {
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn)pif.rec).getData()));
boolean majorChange =
self.processReconfig(qv, ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(), true);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
}
zk.processTxn(pif.hdr, pif.rec);
packetsNotCommitted.remove();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;

import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.admin.AdminServer.AdminServerException;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReconfigDuringLeaderSyncTest extends QuorumPeerTestBase {
protected static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class);
private static int SERVER_COUNT = 3;
private MainThread[] mt;

/**
* <pre>
* Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2172.
* Cluster crashes when reconfig a new node as a participant.
* </pre>
*
* This issue occurs when reconfig's PROPOSAL and COMMITANDACTIVATE come in
* between the snapshot and the UPTODATE. In this case processReconfig was
* not invoked on the newly added node, and zoo.cfg.dynamic.next wasn't
* deleted.
*/

@Test
public void testDuringLeaderSync() throws Exception {
final int clientPorts[] = new int[SERVER_COUNT + 1];
StringBuilder sb = new StringBuilder();
String[] serverConfig = new String[SERVER_COUNT + 1];

for (int i = 0; i < SERVER_COUNT; i++) {
clientPorts[i] = PortAssignment.unique();
serverConfig[i] = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+ ":participant;127.0.0.1:" + clientPorts[i];
sb.append(serverConfig[i] + "\n");
}
String currentQuorumCfgSection = sb.toString();
mt = new MainThread[SERVER_COUNT + 1];

// start 3 servers
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false);
mt[i].start();
}

// ensure all servers started
for (int i = 0; i < SERVER_COUNT; i++) {
Assert.assertTrue("waiting for server " + i + " being up",
ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT));
}
CountdownWatcher watch = new CountdownWatcher();
ZooKeeper preReconfigClient = new ZooKeeper("127.0.0.1:" + clientPorts[0], ClientBase.CONNECTION_TIMEOUT,
watch);
watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);

// new server joining
int joinerId = SERVER_COUNT;
clientPorts[joinerId] = PortAssignment.unique();
serverConfig[joinerId] = "server." + joinerId + "=127.0.0.1:" + PortAssignment.unique() + ":"
+ PortAssignment.unique() + ":participant;127.0.0.1:" + clientPorts[joinerId];

// Find leader id.
int leaderId = -1;
for (int i = 0; i < SERVER_COUNT; i++) {
if (mt[i].main.quorumPeer.leader != null) {
leaderId = i;
break;
}
}
assertFalse(leaderId == -1);

// Joiner initial config consists of itself and the leader.
sb = new StringBuilder();
sb.append(serverConfig[leaderId] + "\n").append(serverConfig[joinerId] + "\n");

/**
* This server will delay the response to a NEWLEADER message, and run
* reconfig command so that message at this processed in bellow order
*
* <pre>
* NEWLEADER
* reconfig's PROPOSAL
* reconfig's COMMITANDACTIVATE
* UPTODATE
* </pre>
*/
mt[joinerId] = new MainThread(joinerId, clientPorts[joinerId], sb.toString(), false) {
@Override
public TestQPMain getTestQPMain() {
return new MockTestQPMain();
}
};
mt[joinerId].start();
CustomQuorumPeer qp = getCustomQuorumPeer(mt[joinerId]);

// delete any already existing .next file
String nextDynamicConfigFilename = qp.getNextDynamicConfigFilename();
File nextDynaFile = new File(nextDynamicConfigFilename);
nextDynaFile.delete();

// call reconfig API when the new server has received
// Leader.NEWLEADER
while (true) {
if (qp.isNewLeaderMessage()) {
preReconfigClient.reconfig(serverConfig[joinerId], null, null, -1, null, null);
break;
} else {
// sleep for 10 millisecond and then again check
Thread.sleep(10);
}
}
watch = new CountdownWatcher();
ZooKeeper postReconfigClient = new ZooKeeper("127.0.0.1:" + clientPorts[joinerId],
ClientBase.CONNECTION_TIMEOUT, watch);
watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
// do one successful operation on the newly added node
postReconfigClient.create("/reconfigIssue", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
assertFalse("zoo.cfg.dynamic.next is not deleted.", nextDynaFile.exists());

// verify that joiner has up-to-date config, including all four servers.
for (long j = 0; j <= SERVER_COUNT; j++) {
assertNotNull("server " + j + " is not present in the new quorum",
qp.getQuorumVerifier().getVotingMembers().get(j));
}

// close clients
preReconfigClient.close();
postReconfigClient.close();
}

private static CustomQuorumPeer getCustomQuorumPeer(MainThread mt) {
while (true) {
QuorumPeer quorumPeer = mt.getQuorumPeer();
if (null != quorumPeer) {
return (CustomQuorumPeer) quorumPeer;
} else {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

@After
public void tearDown() {
// stop all severs
if (null != mt) {
for (int i = 0; i < mt.length; i++) {
try {
mt[i].shutdown();
} catch (InterruptedException e) {
LOG.warn("Quorum Peer interrupted while shutting it down", e);
}
}
}
}

private static class CustomQuorumPeer extends QuorumPeer {
private boolean newLeaderMessage = false;

public CustomQuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort,
int electionAlg, long myid, int tickTime, int initLimit, int syncLimit) throws IOException {
super(quorumPeers, snapDir, logDir, electionAlg, myid, tickTime, initLimit, syncLimit, false,
ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1), new QuorumMaj(quorumPeers));
}

/**
* If true, after 100 millisecond NEWLEADER response is send to leader
*
* @return
*/
public boolean isNewLeaderMessage() {
return newLeaderMessage;
}

@Override
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {

return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {

@Override
void writePacket(QuorumPacket pp, boolean flush) throws IOException {
if (pp != null && pp.getType() == Leader.ACK) {
newLeaderMessage = true;
try {
/**
* Delaying the ACK message, a follower sends as
* response to a NEWLEADER message, so that the
* leader has a chance to send the reconfig and only
* then the UPTODATE message.
*/
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
super.writePacket(pp, flush);
}
};
}
}

private static class MockTestQPMain extends TestQPMain {
@Override
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
quorumPeer = new CustomQuorumPeer(config.getQuorumVerifier().getAllMembers(), config.getDataDir(),
config.getDataLogDir(), config.getClientPortAddress().getPort(), config.getElectionAlg(),
config.getServerId(), config.getTickTime(), config.getInitLimit(), config.getSyncLimit());
quorumPeer.start();
try {
quorumPeer.join();
} catch (InterruptedException e) {
LOG.warn("Quorum Peer interrupted", e);
}
}
}
}

0 comments on commit c38787f

Please sign in to comment.