Skip to content

Commit

Permalink
Ensure TXNs are flushed before ACK of NEWLEADER, and ensure expected …
Browse files Browse the repository at this point in the history
…ACK order
  • Loading branch information
jonmv committed Oct 3, 2022
1 parent 4aab1fd commit 5e0c559
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -55,6 +56,55 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req

/** Local alias for the shared {@code Request.requestOfDeath} instance. */
private static final Request REQUEST_OF_DEATH = Request.requestOfDeath;

/**
 * Marker request that carries a one-shot latch.
 *
 * <p>{@code syncFlush()} enqueues an instance and waits on {@code latch}; the run loop
 * recognises the marker via {@code instanceof}, calls {@code flush()} and then counts
 * the latch down. The marker is built with placeholder {@link Request} constructor
 * arguments (nulls and zeros).
 */
private static class FlushRequest extends Request {

    /** Released (counted down to zero) by the run loop once its flush has returned. */
    private final CountDownLatch latch;

    public FlushRequest() {
        super(null, 0, 0, 0, null, null);
        this.latch = new CountDownLatch(1);
    }
}

/*
 * Sentinel requests enqueued by setDelayForwarding(boolean). The run loop compares the
 * dequeued request against these by identity (==) and, on a match, calls
 * nextProcessor.close() (delay on) or nextProcessor.open() (delay off) and moves on to
 * the next queue entry. Both are built with placeholder constructor arguments; only
 * their identity matters.
 */
private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null);
private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null);

/**
 * Wraps the downstream {@link RequestProcessor} and can temporarily hold requests back
 * instead of forwarding them.
 *
 * <p>In the default (open) state every request and every flush is passed straight
 * through to the wrapped processor. After {@link #close()}, requests are buffered in
 * arrival order and {@link #flush()} does nothing. {@link #open()} forwards the
 * buffered requests in order and returns to pass-through mode.
 *
 * <p>This class performs no synchronization of its own.
 * NOTE(review): it appears to be driven only from the SyncRequestProcessor thread — confirm.
 */
private static class DelayingProcessor implements RequestProcessor, Flushable {

    /** The wrapped downstream processor. */
    private final RequestProcessor next;

    /** Buffered requests while forwarding is delayed; {@code null} means pass-through mode. */
    private Queue<Request> delayed = null;

    private DelayingProcessor(RequestProcessor next) {
        this.next = next;
    }

    /**
     * Flushes the wrapped processor, but only in pass-through mode and only if the
     * wrapped processor is itself {@link Flushable}. While requests are being delayed
     * this is a no-op.
     *
     * @throws IOException if the wrapped processor's flush fails
     */
    @Override
    public void flush() throws IOException {
        if (delayed == null && next instanceof Flushable) {
            ((Flushable) next).flush();
        }
    }

    /**
     * Forwards the request to the wrapped processor, or buffers it while forwarding is delayed.
     *
     * @param request the request to forward or buffer
     * @throws RequestProcessorException if the wrapped processor rejects the request
     */
    @Override
    public void processRequest(Request request) throws RequestProcessorException {
        if (delayed == null) {
            next.processRequest(request);
        } else {
            delayed.add(request);
        }
    }

    /** Shuts down the wrapped processor. Buffered requests, if any, are not forwarded. */
    @Override
    public void shutdown() {
        next.shutdown();
    }

    /** Starts delaying: subsequent requests are buffered. Has no effect if already delaying. */
    private void close() {
        if (delayed == null) {
            delayed = new ArrayDeque<>();
        }
    }

    /**
     * Stops delaying: forwards all buffered requests, in arrival order, to the wrapped
     * processor and returns to pass-through mode. Has no effect if not delaying.
     *
     * <p>Each request is removed from the buffer only after it has been forwarded
     * successfully. If the wrapped processor throws, the requests already forwarded are
     * therefore not forwarded again by a later call; the failing request stays at the
     * head of the buffer and the processor remains in delaying mode.
     *
     * @throws RequestProcessorException if the wrapped processor rejects a buffered request
     */
    private void open() throws RequestProcessorException {
        if (delayed == null) {
            return;
        }
        for (Request pending = delayed.peek(); pending != null; pending = delayed.peek()) {
            next.processRequest(pending);
            // Drop the request only once it has been handed over without an exception.
            delayed.remove();
        }
        delayed = null;
    }
}

/** The number of log entries to log before starting a snapshot */
private static int snapCount = ZooKeeperServer.getSnapCount();

Expand All @@ -75,7 +125,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req

private final ZooKeeperServer zks;

private final RequestProcessor nextProcessor;
private final DelayingProcessor nextProcessor;

/**
* Transactions that have been written and are waiting to be flushed to
Expand All @@ -88,7 +138,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener());
this.zks = zks;
this.nextProcessor = nextProcessor;
this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor);
this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize());
}

Expand Down Expand Up @@ -174,6 +224,21 @@ public void run() {
break;
}

if (si == turnForwardingDelayOn) {
nextProcessor.close();
continue;
}
if (si == turnForwardingDelayOff) {
nextProcessor.open();
continue;
}

if (si instanceof FlushRequest) {
flush();
((FlushRequest) si).latch.countDown();
continue;
}

long startProcessTime = Time.currentElapsedTime();
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);

Expand Down Expand Up @@ -224,6 +289,17 @@ public void run() {
LOG.info("SyncRequestProcessor exited!");
}

/**
 * Flushes all pending writes and blocks the caller until that flush has completed.
 *
 * <p>A {@code FlushRequest} marker is appended to the request queue; the run loop
 * flushes when it reaches the marker and then releases the marker's latch, which is
 * what this method waits on.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public void syncFlush() throws InterruptedException {
    final FlushRequest flushMarker = new FlushRequest();
    final CountDownLatch flushed = flushMarker.latch;
    queuedRequests.add(flushMarker);
    flushed.await();
}

/**
 * Asks the processing loop to start or stop delaying the forwarding of requests to the
 * next processor. The change is requested by enqueuing a sentinel request, so it takes
 * effect only when the run loop reaches that sentinel in the queue.
 *
 * @param delayForwarding {@code true} to start holding requests back, {@code false}
 *                        to release the held requests and resume forwarding
 */
public void setDelayForwarding(boolean delayForwarding) {
    final Request toggle;
    if (delayForwarding) {
        toggle = turnForwardingDelayOn;
    } else {
        toggle = turnForwardingDelayOff;
    }
    queuedRequests.add(toggle);
}

private void flush() throws IOException, RequestProcessorException {
if (this.toFlush.isEmpty()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,13 +760,21 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
zk.startupWithoutServing();
if (zk instanceof FollowerZooKeeperServer) {
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
fzk.syncProcessor.setDelayForwarding(true);
for (PacketInFlight p : packetsNotLogged) {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
packetsNotLogged.clear();
fzk.syncProcessor.syncFlush();
}

writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);

if (zk instanceof FollowerZooKeeperServer) {
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
fzk.syncProcessor.setDelayForwarding(false);
fzk.syncProcessor.syncFlush();
}
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class SendAckRequestProcessor implements RequestProcessor, Flushable {

private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class);

Learner learner;
final Learner learner;

SendAckRequestProcessor(Learner peer) {
this.learner = peer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class DIFFSyncConsistencyTest extends QuorumPeerTestBase {
private MainThread[] mt = new MainThread[SERVER_COUNT];

@Test
@Timeout(value = 120)
@Timeout(value = 20)
public void testInconsistentDueToUncommittedLog() throws Exception {
final int LEADER_TIMEOUT_MS = 10_000;
final int[] clientPorts = new int[SERVER_COUNT];
Expand Down Expand Up @@ -251,7 +251,7 @@ void readPacket(QuorumPacket pp) throws IOException {
super.readPacket(pp);
if (injectError && pp.getType() == Leader.UPTODATE) {
String type = LearnerHandler.packetToString(pp);
throw new SocketTimeoutException("Socket timeout while reading the packet for operation "
throw new SocketTimeoutException("TEST CODE Socket timeout while reading the packet for operation "
+ type);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ public void testFollowerWithPendingSyncsOnLeaderReElection() throws Exception {
CountDownLatch followerSetUp = new CountDownLatch(1);

class BlockingRequestProcessor implements RequestProcessor, Flushable {
final Phaser phaser = new Phaser(2); // SyncRequestProcessor and test thread
final Phaser phaser = new Phaser(1); // SyncRequestProcessor; test thread will register later.

final SendAckRequestProcessor nextProcessor; // SendAckRequestProcessor

Expand Down Expand Up @@ -896,20 +896,27 @@ protected void setupRequestProcessors() {
qp.setZxid(0);
oa.writeRecord(qp, null);

// Read the UPTODATE ack.
// Get the ACK of the new leader.
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(1, follower.self.getAcceptedEpoch());
assertEquals(1, follower.self.getCurrentEpoch());

// Get the ACK of the new leader.
// Read the PROPOSAL ack.
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(createZxid0, qp.getZxid());

// Read the UPTODATE ack.
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(1, follower.self.getAcceptedEpoch());
assertEquals(1, follower.self.getCurrentEpoch());

// The follower now starts following the leader.
// We send a PROPOSAL and a COMMIT, and wait for the transaction to be flushed by SyncRequestProcessor.
blocker = ((BlockingFollowerZooKeeperServer) follower.zk).blocker;
blocker.phaser.register();
long createZxid1 = ZxidUtils.makeZxid(1, 3);
qp.setType(Leader.PROPOSAL);
qp.setZxid(createZxid1);
Expand All @@ -928,7 +935,6 @@ protected void setupRequestProcessors() {

// Wait for "fsync" to begin.
assertTrue(followerSetUp.await(10, TimeUnit.SECONDS));
blocker = ((BlockingFollowerZooKeeperServer) follower.zk).blocker;
blocker.phaser.arriveAndAwaitAdvance();

// Now we send another PROPOSAL and COMMIT, and wait for them to be applied to the data tree.
Expand Down Expand Up @@ -1075,17 +1081,22 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f)
qp.setZxid(0);
oa.writeRecord(qp, null);

// Read the uptodate ack
// Get the ack of the new leader
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(1, f.self.getAcceptedEpoch());
assertEquals(1, f.self.getCurrentEpoch());

// Get the ack of the new leader
// Read the create session ack
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(createSessionZxid, qp.getZxid());

// Read the uptodate ack
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(1, f.self.getAcceptedEpoch());
assertEquals(1, f.self.getCurrentEpoch());

//Wait for the transactions to be written out. The thread that writes them out
// does not send anything back when it is done.
Expand Down

0 comments on commit 5e0c559

Please sign in to comment.