Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZOOKEEPER-4541 Ephemeral znode owned by closed session visible in 1 of 3 servers #1925

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -55,6 +56,55 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req

private static final Request REQUEST_OF_DEATH = Request.requestOfDeath;

private static class FlushRequest extends Request {
private final CountDownLatch latch = new CountDownLatch(1);
public FlushRequest() {
super(null, 0, 0, 0, null, null);
}
}

private static final Request TURN_FORWARDING_DELAY_ON_REQUEST = new Request(null, 0, 0, 0, null, null);
private static final Request TURN_FORWARDING_DELAY_OFF_REQUEST = new Request(null, 0, 0, 0, null, null);

private static class DelayingProcessor implements RequestProcessor, Flushable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need to be a new processor? could we inline this into the SyncRequestProcessor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't need to, but I found the SyncRequestProcessor to be complicated enough already, and this was a separate set of concerns, so I felt it was cleaner to put it in a separate processor.

private final RequestProcessor next;
private Queue<Request> delayed = null;
private DelayingProcessor(RequestProcessor next) {
this.next = next;
}
@Override
public void flush() throws IOException {
if (delayed == null && next instanceof Flushable) {
((Flushable) next).flush();
}
}
@Override
public void processRequest(Request request) throws RequestProcessorException {
if (delayed == null) {
next.processRequest(request);
} else {
delayed.add(request);
}
}
@Override
public void shutdown() {
next.shutdown();
}
private void startDelaying() {
if (delayed == null) {
delayed = new ArrayDeque<>();
}
}
private void flushAndStopDelaying() throws RequestProcessorException {
if (delayed != null) {
for (Request request : delayed) {
next.processRequest(request);
}
delayed = null;
}
}
}

/** The number of log entries to log before starting a snapshot */
private static int snapCount = ZooKeeperServer.getSnapCount();

Expand All @@ -75,7 +125,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req

private final ZooKeeperServer zks;

private final RequestProcessor nextProcessor;
private final DelayingProcessor nextProcessor;

/**
* Transactions that have been written and are waiting to be flushed to
Expand All @@ -88,7 +138,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener());
this.zks = zks;
this.nextProcessor = nextProcessor;
this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor);
this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize());
}

Expand Down Expand Up @@ -174,6 +224,21 @@ public void run() {
break;
}

if (si == TURN_FORWARDING_DELAY_ON_REQUEST) {
nextProcessor.startDelaying();
continue;
}
if (si == TURN_FORWARDING_DELAY_OFF_REQUEST) {
nextProcessor.flushAndStopDelaying();
continue;
}

if (si instanceof FlushRequest) {
flush();
((FlushRequest) si).latch.countDown();
continue;
}

long startProcessTime = Time.currentElapsedTime();
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);

Expand Down Expand Up @@ -206,9 +271,7 @@ public void run() {
// and there are no pending flushes (writes), then just pass this to the next processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable) nextProcessor).flush();
}
nextProcessor.flush();
}
continue;
}
Expand All @@ -224,6 +287,17 @@ public void run() {
LOG.info("SyncRequestProcessor exited!");
}

/** Flushes all pending writes, and waits for this to complete. */
public void syncFlush() throws InterruptedException {
FlushRequest marker = new FlushRequest();
queuedRequests.add(marker);
marker.latch.await();
}

public void setDelayForwarding(boolean delayForwarding) {
queuedRequests.add(delayForwarding ? TURN_FORWARDING_DELAY_ON_REQUEST : TURN_FORWARDING_DELAY_OFF_REQUEST);
}

private void flush() throws IOException, RequestProcessorException {
if (this.toFlush.isEmpty()) {
return;
Expand All @@ -244,9 +318,7 @@ private void flush() throws IOException, RequestProcessorException {
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
this.nextProcessor.processRequest(i);
}
if (this.nextProcessor instanceof Flushable) {
((Flushable) this.nextProcessor).flush();
}
nextProcessor.flush();
}
lastFlushTime = Time.currentElapsedTime();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ public synchronized void shutdown(boolean fullyShutDown) {
// * If we fetch a new snapshot from leader, the zkDb will be
// cleared anyway before loading the snapshot
try {
//This will fast forward the database to the latest recorded transactions
// This will fast-forward the database to the latest recorded transactions
zkDb.fastForwardDataBase();
} catch (IOException e) {
LOG.error("Error updating DB", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,11 @@ protected void unregisterMetrics() {
}

@Override
public synchronized void shutdown() {
public synchronized void shutdown(boolean fullyShutDown) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little worried about the modification here has an impact on the invoking chain.

Before modification: Leader.shutdown(String) -> LeaderZooKeeperServer.shutdown() -> ZooKeeperServer.shutdown()
After modification: Leader.shutdown() -> ZooKeeperServer.shutdown()

LeaderZooKeeperServer.shutdown is skipped and containerManager does not stop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ZooKeeperServer.shutdown() only calls shutdown(false), which is implemented in LeaderZooKeeperServer, and which stops the containerManager. shutdown() isn't overridden anywhere anymore.

if (containerManager != null) {
containerManager.stop();
}
super.shutdown();
super.shutdown(fullyShutDown);
}

@Override
Expand Down
Loading