Skip to content

Commit

Permalink
Ensure TXNs are flushed before ACK of NEWLEADER, and ensure expected …
Browse files Browse the repository at this point in the history
…ACK order
  • Loading branch information
jonmv committed Oct 3, 2022
1 parent 4aab1fd commit d30c2f5
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -55,6 +56,56 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req

// Local alias for the shared Request.requestOfDeath instance.
// NOTE(review): presumably the sentinel that makes run() leave its loop; the comparison is not visible here — confirm.
private static final Request REQUEST_OF_DEATH = Request.requestOfDeath;

/**
 * Marker request used to ask the sync thread for a flush. It carries a latch
 * that is counted down once the flush has been performed, so the submitting
 * thread can wait for completion.
 */
private static class FlushRequest extends Request {

    /** Counted down exactly once, after the flush triggered by this marker. */
    private final CountDownLatch latch;

    public FlushRequest() {
        super(null, 0, 0, 0, null, null);
        this.latch = new CountDownLatch(1);
    }
}

// Sentinel requests enqueued by setDelayForwarding(boolean). run() recognises them by
// reference identity (==) and calls nextProcessor.close() / nextProcessor.open()
// respectively instead of treating them as ordinary requests.
private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null);
private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null);

/**
 * Wraps the next processor in the chain so that forwarding of requests can be
 * held back temporarily.
 *
 * <p>While "closed" (see {@link #close()}), requests passed to
 * {@link #processRequest(Request)} are buffered instead of forwarded, and
 * {@link #flush()} is a no-op. {@link #open()} forwards everything that was
 * buffered, in arrival order, and resumes direct forwarding.
 *
 * <p>NOTE(review): {@code processRequest} and {@code flush} read {@code delayed}
 * without synchronization; presumably every call is made from the
 * SyncRequestProcessor thread — confirm against callers.
 */
private static class DelayingProcessor implements RequestProcessor, Flushable {

    /** The wrapped downstream processor. */
    private final RequestProcessor next;

    /** Buffer of held-back requests; {@code null} means forwarding is not delayed. */
    private Queue<Request> delayed = null;

    private DelayingProcessor(RequestProcessor next) {
        this.next = next;
    }

    /**
     * Flushes the wrapped processor, but only when forwarding is not delayed and
     * the wrapped processor is itself {@link Flushable}.
     */
    @Override
    public void flush() throws IOException {
        if (delayed == null && next instanceof Flushable) {
            ((Flushable) next).flush();
        }
    }

    /**
     * Forwards the request downstream immediately, or buffers it while
     * forwarding is delayed.
     */
    @Override
    public void processRequest(Request request) throws RequestProcessorException {
        if (delayed == null) {
            next.processRequest(request);
        } else {
            delayed.add(request);
        }
    }

    @Override
    public void shutdown() {
        next.shutdown();
    }

    /**
     * Starts delaying: subsequent requests are buffered until {@link #open()}.
     * Calling this while already delaying keeps the existing buffer, so no
     * held-back request is lost.
     */
    private synchronized void close() {
        // Only create the buffer when not already delaying. The previous
        // "!= null" test never enabled delaying from the initial state and
        // would have discarded buffered requests on a repeated close().
        if (delayed == null) {
            delayed = new ArrayDeque<>();
        }
    }

    /**
     * Stops delaying: forwards every buffered request downstream in arrival
     * order, then resumes direct forwarding. Does nothing if not delaying.
     *
     * @throws RequestProcessorException if the wrapped processor rejects a request;
     *         in that case the buffer is left in place and delaying stays on
     */
    private synchronized void open() throws RequestProcessorException {
        if (delayed == null) {
            return;
        }
        for (Request request : delayed) {
            next.processRequest(request);
        }
        delayed = null;
    }
}

/** The number of log entries to log before starting a snapshot */
private static int snapCount = ZooKeeperServer.getSnapCount();

Expand All @@ -75,7 +126,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req

private final ZooKeeperServer zks;

private final RequestProcessor nextProcessor;
private final DelayingProcessor nextProcessor;

/**
* Transactions that have been written and are waiting to be flushed to
Expand All @@ -88,7 +139,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener());
this.zks = zks;
this.nextProcessor = nextProcessor;
this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor);
this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize());
}

Expand Down Expand Up @@ -174,6 +225,21 @@ public void run() {
break;
}

if (si == turnForwardingDelayOn) {
nextProcessor.close();
continue;
}
if (si == turnForwardingDelayOff) {
nextProcessor.open();
continue;
}

if (si instanceof FlushRequest) {
flush();
((FlushRequest) si).latch.countDown();
continue;
}

long startProcessTime = Time.currentElapsedTime();
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);

Expand Down Expand Up @@ -224,6 +290,17 @@ public void run() {
LOG.info("SyncRequestProcessor exited!");
}

/**
 * Enqueues a flush marker behind everything already queued and blocks the
 * calling thread until the sync thread has handled that marker, i.e. has
 * flushed all pending writes.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public void syncFlush() throws InterruptedException {
    final FlushRequest barrier = new FlushRequest();
    queuedRequests.add(barrier);
    barrier.latch.await();
}

/**
 * Enqueues a marker that switches delayed forwarding to the next processor on
 * or off. The switch takes effect when the sync thread dequeues the marker,
 * so it is ordered relative to the requests already in the queue.
 *
 * @param delayForwarding {@code true} to start holding back forwarded requests,
 *                        {@code false} to release them and resume forwarding
 */
public void setDelayForwarding(boolean delayForwarding) {
    final Request toggle;
    if (delayForwarding) {
        toggle = turnForwardingDelayOn;
    } else {
        toggle = turnForwardingDelayOff;
    }
    queuedRequests.add(toggle);
}

private void flush() throws IOException, RequestProcessorException {
if (this.toFlush.isEmpty()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ public Follower getFollower() {
return self.follower;
}

/**
 * Runs the superclass start-up, then asks the sync processor to start delaying
 * forwarding of requests to its next processor.
 * NOTE(review): presumably this holds back ACKs until startServing() is called — confirm against the sync processor.
 */
public synchronized void startupWithoutServing() {
super.startupWithoutServing();
// Must come after super's start-up; assumes syncProcessor has been created by then — TODO confirm.
syncProcessor.setDelayForwarding(true);
}

/**
 * Asks the sync processor to stop delaying forwarding, moves this server to
 * State.RUNNING, and wakes every thread waiting on this server's monitor.
 */
public synchronized void startServing() {
// Release held-back requests before announcing the RUNNING state.
syncProcessor.setDelayForwarding(false);
setState(State.RUNNING);
notifyAll();
}


@Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.io.Flushable;
import java.io.IOException;
import java.net.Socket;
import java.util.ArrayDeque;
import java.util.Queue;

import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
Expand All @@ -32,7 +35,8 @@ public class SendAckRequestProcessor implements RequestProcessor, Flushable {

private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class);

Learner learner;
final Queue<Request> toAck = new ArrayDeque<>();
final Learner learner;

SendAckRequestProcessor(Learner peer) {
this.learner = peer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ void readPacket(QuorumPacket pp) throws IOException {
super.readPacket(pp);
if (injectError && pp.getType() == Leader.UPTODATE) {
String type = LearnerHandler.packetToString(pp);
throw new SocketTimeoutException("Socket timeout while reading the packet for operation "
throw new SocketTimeoutException("TEST CODE Socket timeout while reading the packet for operation "
+ type);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ public void testFollowerWithPendingSyncsOnLeaderReElection() throws Exception {
CountDownLatch followerSetUp = new CountDownLatch(1);

class BlockingRequestProcessor implements RequestProcessor, Flushable {
final Phaser phaser = new Phaser(2); // SyncRequestProcessor and test thread
final Phaser phaser = new Phaser(1); // SyncRequestProcessor; test thread will register later.

final SendAckRequestProcessor nextProcessor; // SendAckRequestProcessor

Expand Down Expand Up @@ -883,7 +883,7 @@ protected void setupRequestProcessors() {

// Required for the ZK server to start up.
qp.setType(Leader.NEWLEADER);
qp.setZxid(ZxidUtils.makeZxid(1, 0));
qp.setZxid(0);
qp.setData(null);
oa.writeRecord(qp, null);

Expand All @@ -896,20 +896,27 @@ protected void setupRequestProcessors() {
qp.setZxid(0);
oa.writeRecord(qp, null);

// Read the UPTODATE ack.
// Get the ACK of the new leader.
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(1, follower.self.getAcceptedEpoch());
assertEquals(1, follower.self.getCurrentEpoch());

// Get the ACK of the new leader.
// Read the UPTODATE ack.
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(1, follower.self.getAcceptedEpoch());
assertEquals(1, follower.self.getCurrentEpoch());

// Read the PROPOSAL ack.
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(createZxid0, qp.getZxid());

// The follower now starts following the leader.
// We send a PROPOSAL and a COMMIT, and wait for the transaction to be flushed by SyncRequestProcessor.
blocker = ((BlockingFollowerZooKeeperServer) follower.zk).blocker;
blocker.phaser.register();
long createZxid1 = ZxidUtils.makeZxid(1, 3);
qp.setType(Leader.PROPOSAL);
qp.setZxid(createZxid1);
Expand All @@ -928,7 +935,6 @@ protected void setupRequestProcessors() {

// Wait for "fsync" to begin.
assertTrue(followerSetUp.await(10, TimeUnit.SECONDS));
blocker = ((BlockingFollowerZooKeeperServer) follower.zk).blocker;
blocker.phaser.arriveAndAwaitAdvance();

// Now we send another PROPOSAL and COMMIT, and wait for them to be applied to the data tree.
Expand Down Expand Up @@ -1075,17 +1081,22 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f)
qp.setZxid(0);
oa.writeRecord(qp, null);

// Read the uptodate ack
// Get the ack of the new leader
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(1, f.self.getAcceptedEpoch());
assertEquals(1, f.self.getCurrentEpoch());

// Get the ack of the new leader
// Read the uptodate ack
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(1, f.self.getAcceptedEpoch());
assertEquals(1, f.self.getCurrentEpoch());

// Read the create session ack
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(createSessionZxid, qp.getZxid());

//Wait for the transactions to be written out. The thread that writes them out
// does not send anything back when it is done.
Expand Down

0 comments on commit d30c2f5

Please sign in to comment.