ZOOKEEPER-4541 Ephemeral znode owned by closed session visible in 1 of 3 servers #1925

Closed
wants to merge 16 commits into from
@@ -20,6 +20,8 @@

import java.io.Flushable;
import java.io.IOException;
import java.net.Socket;

import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
@@ -64,7 +66,8 @@ public void flush() throws IOException {
} catch (IOException e) {
LOG.warn("Closing connection to leader, exception during packet send", e);
try {
if (!learner.sock.isClosed()) {
Socket socket = learner.sock;
if ( socket != null && ! learner.sock.isClosed()) {
Contributor

Should probably use socket in the second condition too? In case it changes after the first check.

Contributor Author

Thanks, that was of course the intention :) Fixed!
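
The point raised in this thread (read the field once, then use the local copy for both the check and the close) can be sketched as follows. This is a minimal illustration, not the PR's code: `SocketHolder` and `closeIfOpen` are hypothetical names standing in for the learner and its `sock` field.

```java
import java.io.IOException;
import java.net.Socket;

/** Hypothetical stand-in for an object whose socket field may be nulled or replaced by another thread. */
class SocketHolder {
    volatile Socket sock;

    /** Closes the current socket if one exists and is open; returns true if this call closed it. */
    boolean closeIfOpen() {
        Socket socket = sock; // snapshot the field exactly once
        // Both the null check and the isClosed() check use the snapshot, so a
        // concurrent write to the field between the two checks cannot cause a
        // NullPointerException or make us inspect a different socket.
        if (socket != null && !socket.isClosed()) {
            try {
                socket.close();
                return true;
            } catch (IOException e) {
                return false; // closing failed; nothing more to do in this sketch
            }
        }
        return false;
    }
}
```

Re-reading `sock` in the second condition would reintroduce the window the reviewer describes, since the field could become `null` after the first check passes.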

Hi jonmv, I have read the Jira issue ZOOKEEPER-4541 and I am confused. ZK1 does not send an ACK to the leader, yet ZK1 receives a COMMIT from the leader. That does not seem to conform to the ZAB protocol. Could you help me figure this out? Thanks.

learner.sock.close();
}
} catch (IOException e1) {
@@ -23,6 +23,7 @@
import static org.apache.zookeeper.server.quorum.ZabUtils.createMockLeader;
import static org.apache.zookeeper.server.quorum.ZabUtils.createQuorumPeer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -35,13 +36,18 @@
import java.io.EOFException;
import java.io.File;
import java.io.FileReader;
import java.io.Flushable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.InputArchive;
@@ -51,11 +57,16 @@
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.ByteBufferOutputStream;
import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
@@ -701,6 +712,271 @@ private void proposeSetData(QuorumPacket qp, long zxid, String data, int version
});
}

/**
* Tests a follower whose SyncRequestProcessor holds queued transactions that are already
* committed (the leader got quorum for them elsewhere in the ensemble) at the moment the
* leader shuts down, triggering a new leader election and a partial reset of state in
* the follower.
* In particular, this test was written to reproduce a bug where LearnerZooKeeperServer was
* not shut down, because shutdown() was erroneously called on the super class ZooKeeperServer.
* As a result, its SyncRequestProcessor was not flushed during shutdown, and any queued
* transactions were lost. This would only happen if the SyncRequestProcessor also crashed,
* which it did as a consequence of the leader going down: the SendAckRequestProcessor
* would throw and kill the sync thread.
* In the subsequent leader election, the quorum peer would use the committed state, even though
* this was not yet flushed to persistent storage, and never would be, after the sync thread died.
* If the correct server had been shut down, the queued transactions would instead either be
* flushed to persistent storage when the quorum peer shut down the old follower, or this would
* fail, causing state to be recreated from whatever state was already flushed, which again would
* be corrected in a DIFF from the new leader.
*
* @author jonmv
*/
@Test
public void testFollowerWithPendingSyncsOnLeaderReElection() throws Exception {
CountDownLatch followerSetUp = new CountDownLatch(1);
class BlockingRequestProcessor implements RequestProcessor, Flushable {
final Phaser phaser = new Phaser(2); // SyncRequestProcessor and test thread
final SendAckRequestProcessor nextProcessor; // SendAckRequestProcessor
BlockingRequestProcessor(SendAckRequestProcessor nextProcessor) { this.nextProcessor = nextProcessor; }
@Override public void processRequest(Request request) throws RequestProcessorException {
nextProcessor.processRequest(request);
}
@Override public void shutdown() {
phaser.forceTermination();
nextProcessor.shutdown();
}
@Override public void flush() throws IOException {
phaser.arriveAndAwaitAdvance(); // Let test thread know we're flushing.
phaser.arriveAndAwaitAdvance(); // Let test thread do more stuff while we wait here, simulating a slow fsync, etc.
nextProcessor.flush();
}

}

class BlockingFollowerZooKeeperServer extends FollowerZooKeeperServer {
BlockingRequestProcessor blocker;
BlockingFollowerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
super(logFactory, self, zkDb);
}
@Override protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
blocker = new BlockingRequestProcessor(new SendAckRequestProcessor(getFollower()));
syncProcessor = new SyncRequestProcessor(this, blocker);
syncProcessor.start();
followerSetUp.countDown();
}
}

File followerDir = File.createTempFile("test", "dir", testData);
assertTrue(followerDir.delete());
assertTrue(followerDir.mkdir());

File leaderDir = File.createTempFile("test", "dir", testData);
assertTrue(leaderDir.delete());
assertTrue(leaderDir.mkdir());

Thread followerThread = null;
ConversableFollower follower = null;
QuorumPeer peer = null;
BlockingRequestProcessor blocker = null;

try (ServerSocket ss = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"))) {
peer = createQuorumPeer(followerDir);

FileTxnSnapLog logFactory = new FileTxnSnapLog(followerDir, followerDir);
peer.setTxnFactory(logFactory);
ZKDatabase zkDb = new ZKDatabase(logFactory);
BlockingFollowerZooKeeperServer zk = new BlockingFollowerZooKeeperServer(logFactory, peer, zkDb);
peer.setZKDatabase(zkDb);
follower = new ConversableFollower(peer, zk);
follower.setLeaderQuorumServer(new QuorumServer(1, (InetSocketAddress) ss.getLocalSocketAddress()));
peer.follower = follower;

CompletableFuture<Exception> followerExit = new CompletableFuture<>();
final Follower followerForThread = follower;
followerThread = new Thread(() -> {
try {
followerForThread.followLeader();
followerExit.complete(null);
} catch (Exception e) {
LOG.warn("Unexpected exception in follower thread", e);
followerExit.complete(e);
}
});
followerThread.start();

Socket leaderSocket = ss.accept();
InputArchive ia = BinaryInputArchive.getArchive(leaderSocket.getInputStream());
OutputArchive oa = BinaryOutputArchive.getArchive(leaderSocket.getOutputStream());

assertEquals(0, follower.self.getAcceptedEpoch());
assertEquals(0, follower.self.getCurrentEpoch());

// Set up a database with a single /foo node, on the leader
final long firstZxid = ZxidUtils.makeZxid(1, 1);
ZKDatabase leaderZkDb = new ZKDatabase(new FileTxnSnapLog(leaderDir, leaderDir));
leaderZkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, OpCode.create), new CreateTxn("/foo", "data1".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1), null);
Stat stat = new Stat();
assertEquals("data1", new String(leaderZkDb.getData("/foo", stat, null)));

QuorumPacket qp = new QuorumPacket();
readPacketSkippingPing(ia, qp);
assertEquals(Leader.FOLLOWERINFO, qp.getType());
assertEquals(0, qp.getZxid());
LearnerInfo learnInfo = new LearnerInfo();
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
assertEquals(0x10000, learnInfo.getProtocolVersion());
assertEquals(0, learnInfo.getServerid());

// We are simulating an established leader, so the epoch is 1
qp.setType(Leader.LEADERINFO);
qp.setZxid(ZxidUtils.makeZxid(1, 0));
byte[] protoBytes = new byte[4];
ByteBuffer.wrap(protoBytes).putInt(0x10000);
qp.setData(protoBytes);
oa.writeRecord(qp, null);

readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACKEPOCH, qp.getType());
assertEquals(0, qp.getZxid());
assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt());
assertEquals(1, follower.self.getAcceptedEpoch());
assertEquals(0, follower.self.getCurrentEpoch());

// Send an empty diff
qp.setType(Leader.DIFF);
qp.setData(new byte[0]);
qp.setZxid(leaderZkDb.getDataTreeLastProcessedZxid());
oa.writeRecord(qp, null);

// Required for the ZK server to start up.
qp.setType(Leader.NEWLEADER);
qp.setZxid(ZxidUtils.makeZxid(1, 0));
qp.setData(null);
oa.writeRecord(qp, null);

qp.setType(Leader.UPTODATE);
qp.setZxid(0);
oa.writeRecord(qp, null);

// Read the UPTODATE ack.
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());

// Get the ACK of the new leader.
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(1, follower.self.getAcceptedEpoch());
assertEquals(1, follower.self.getCurrentEpoch());

// The follower now starts following the leader.
// We send a PROPOSAL and a COMMIT, and wait for the transaction to be flushed by SyncRequestProcessor.
long createZxid = ZxidUtils.makeZxid(1, 2);
qp.setType(Leader.PROPOSAL);
qp.setZxid(createZxid);
TxnHeader hdr = new TxnHeader(13, 1313, createZxid, 33, OpCode.create);
CreateTxn ct = new CreateTxn("/bar", "hi".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputArchive boa = BinaryOutputArchive.getArchive(baos);
boa.writeRecord(hdr, null);
boa.writeRecord(ct, null);
qp.setData(baos.toByteArray());
oa.writeRecord(qp, null);

qp.setType(Leader.COMMIT);
qp.setZxid(createZxid);
oa.writeRecord(qp, null);

// Wait for "fsync" to begin.
assertTrue(followerSetUp.await(10, TimeUnit.SECONDS));
blocker = ((BlockingFollowerZooKeeperServer) follower.zk).blocker;
blocker.phaser.arriveAndAwaitAdvance();

// Now we send another PROPOSAL and COMMIT, and wait for them to be applied to the data tree.
// No flush will be attempted for them yet, because the ongoing "fsync" is slow (waiting on the phaser).
long createZxid2 = ZxidUtils.makeZxid(1, 3);
qp.setType(Leader.PROPOSAL);
qp.setZxid(createZxid2);
hdr = new TxnHeader(13, 1314, createZxid2, 34, OpCode.create);
ct = new CreateTxn("/baz", "bye".getBytes(), Ids.OPEN_ACL_UNSAFE, false, 1);
baos = new ByteArrayOutputStream();
boa = BinaryOutputArchive.getArchive(baos);
boa.writeRecord(hdr, null);
boa.writeRecord(ct, null);
qp.setData(baos.toByteArray());
oa.writeRecord(qp, null);

qp.setType(Leader.COMMIT);
qp.setZxid(createZxid2);
oa.writeRecord(qp, null);

// Wait for the follower to observe the COMMIT, and apply the PROPOSAL to its data tree. Unfortunately,
// there's nothing to do but sleep here, as watches are triggered before the last processed id is updated.
long doom = System.currentTimeMillis() + 1000;
while (createZxid != follower.fzk.getLastProcessedZxid() && System.currentTimeMillis() < doom) {
Thread.sleep(1);
}
assertEquals(createZxid, follower.fzk.getLastProcessedZxid());

// State recap: first create is flushing to disk, second is queued for flush;
// first and second creates are both applied to data tree.
// Leader now goes down, and signals this by closing its socket. The follower will then initiate a new
// leader election, where it is critical that its "last seen transaction" is indeed written;
// otherwise, any transactions queued for flushing to disk will be lost if the follower restarts again
// before taking a new snapshot.

// Additionally, any writes in-flight should be allowed to complete _before_ the fast-forward-from-edits,
// done when partially shutting down the learner zoo keeper server to prepare for a new leader election,
// to avoid _also_ getting the transactions for those writes in a DIFF from the new leader, appending them
// twice (or more) to the transaction log, which would also give digest mismatches when restoring state.
// This is not tested here, but fixing ZOOKEEPER-4541 also fixes this problem, by flushing writes first.

// Kill the leader
leaderSocket.close();
followerExit.get(); // This closes the socket SendAckRequestProcessor uses, and crashes the SyncRequestProcessor.
blocker.phaser.awaitAdvance(blocker.phaser.arriveAndDeregister()); // Let the in-flight "fsync" complete, and crash (above).

// A real QuorumPeer would now shut down the follower, and proceed to a new leader election.
follower.shutdown();

// The sync processor _should_ be dead now. Prior to the resolution of ZOOKEEPER-4541, it would only be
// dead because it had crashed, which was a bug in itself.
follower.fzk.syncProcessor.join(1000);
assertFalse(follower.fzk.syncProcessor.isAlive());

// Make sure the recorded data matches what we'll use for leader election.
File logDir = follower.fzk.getTxnLogFactory().getDataDir().getParentFile();
File snapDir = follower.fzk.getTxnLogFactory().getSnapDir().getParentFile();
ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
zkDb2.loadDataBase();
assertEquals(createZxid2, zkDb.getDataTreeLastProcessedZxid(), "last create zxid is used for leader election");
assertEquals(createZxid2, zkDb2.getDataTreeLastProcessedZxid(), "last create zxid is written to persistent storage");
} finally {
if (blocker != null) {
blocker.phaser.forceTermination();
}
if (follower != null) {
follower.shutdown();
}
if (followerThread != null) {
followerThread.interrupt();
followerThread.join();
}
if (peer != null) {
peer.shutdown();
}
TestUtils.deleteFileRecursively(leaderDir);
TestUtils.deleteFileRecursively(followerDir);
}
}
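
The two-step `Phaser` handshake that `BlockingRequestProcessor.flush()` uses to hold the sync thread inside a simulated slow fsync can be shown in isolation. The sketch below is a simplified illustration under stated assumptions, not the test's code: `PhaserHandshake`, `run`, and the `flushed` counter are hypothetical names, and the controller releases the worker with a plain `arriveAndDeregister()`.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: park a worker in the middle of a simulated flush using a two-party Phaser. */
class PhaserHandshake {
    /** Returns {flush count seen while the worker was parked, flush count after release}. */
    static int[] run() {
        Phaser phaser = new Phaser(2); // two parties: the worker and the controlling thread
        AtomicInteger flushed = new AtomicInteger();

        Thread worker = new Thread(() -> {
            phaser.arriveAndAwaitAdvance(); // phase 0: signal that the "flush" has begun
            phaser.arriveAndAwaitAdvance(); // phase 1: park until the controller arrives
            flushed.incrementAndGet();      // the "flush" only completes after release
        });
        worker.start();

        phaser.arriveAndAwaitAdvance();       // wait until the worker is inside the flush
        int seenWhileParked = flushed.get();  // phase 1 cannot advance without us
        phaser.arriveAndDeregister();         // release the worker and leave the phaser
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {seenWhileParked, flushed.get()};
    }
}
```

Between the two controller calls the worker is guaranteed not to have finished its flush, which is the window the test uses to send the second PROPOSAL and COMMIT and then close the leader socket.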

@Test
public void testNormalFollowerRunWithDiff() throws Exception {
testFollowerConversation(new FollowerConversation() {