Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bubble-up exceptions from scheduler #38317

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,13 @@

package org.elasticsearch.threadpool;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.junit.After;
import org.junit.Before;

Expand All @@ -44,7 +38,6 @@
import java.util.function.Consumer;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;

Expand Down Expand Up @@ -367,63 +360,28 @@ private void runExecutionTest(
uncaughtExceptionHandlerLatch.countDown();
});


final CountDownLatch supplierLatch = new CountDownLatch(1);

Runnable job = () -> {
try {
runnable.run();
} finally {
supplierLatch.countDown();
}
};

// snoop on logging to also handle the cases where exceptions are simply logged in Scheduler.
final Logger schedulerLogger = LogManager.getLogger(Scheduler.SafeScheduledThreadPoolExecutor.class);
final MockLogAppender appender = new MockLogAppender();
appender.addExpectation(
new MockLogAppender.LoggingExpectation() {
@Override
public void match(LogEvent event) {
if (event.getLevel() == Level.WARN) {
assertThat("no other warnings than those expected",
event.getMessage().getFormattedMessage(),
equalTo("uncaught exception in scheduled thread [" + Thread.currentThread().getName() + "]"));
assertTrue(expectThrowable);
assertNotNull(event.getThrown());
assertTrue("only one message allowed", throwableReference.compareAndSet(null, event.getThrown()));
uncaughtExceptionHandlerLatch.countDown();
}
}

@Override
public void assertMatched() {
try {
runner.accept(() -> {
try {
runnable.run();
} finally {
supplierLatch.countDown();
}
});
} catch (Throwable t) {
consumer.accept(Optional.of(t));
return;
}

appender.start();
Loggers.addAppender(schedulerLogger, appender);
try {
try {
runner.accept(job);
} catch (Throwable t) {
consumer.accept(Optional.of(t));
return;
}

supplierLatch.await();
supplierLatch.await();

if (expectThrowable) {
uncaughtExceptionHandlerLatch.await();
}
} finally {
Loggers.removeAppender(schedulerLogger, appender);
appender.stop();
if (expectThrowable) {
uncaughtExceptionHandlerLatch.await();
}

consumer.accept(Optional.ofNullable(throwableReference.get()));
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} finally {
Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ synchronized void add(final long waitingForGlobalCheckpoint, final GlobalCheckpo
* before we could be cancelled by the notification. In this case, our listener here would
* not be in the map and we should not fire the timeout logic.
*/
removed = listeners.remove(listener).v2() != null;
removed = listeners.remove(listener) != null;
}
if (removed) {
final TimeoutException e = new TimeoutException(timeout.getStringRep());
Expand Down
19 changes: 9 additions & 10 deletions server/src/main/java/org/elasticsearch/threadpool/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -47,8 +47,8 @@ public interface Scheduler {
/**
* Create a scheduler that can be used client side. Server side, please use <code>ThreadPool.schedule</code> instead.
*
* Notice that if any scheduled jobs fail with an exception, they will be logged as a warning. This includes jobs started
* using execute, submit and schedule.
* Notice that if any scheduled jobs fail with an exception, these will bubble up to the uncaught exception handler where they will
* be logged as a warning. This includes jobs started using execute, submit and schedule.
* @param settings the settings to use
* @return executor
*/
Expand Down Expand Up @@ -250,7 +250,8 @@ public void onAfter() {
}

/**
* This subclass ensures to properly bubble up Throwable instances of type Error and logs exceptions thrown in submitted/scheduled tasks
* This subclass ensures to properly bubble up Throwable instances of both type Error and Exception thrown in submitted/scheduled
* tasks to the uncaught exception handler
*/
class SafeScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
private static final Logger logger = LogManager.getLogger(SafeScheduledThreadPoolExecutor.class);
Expand All @@ -272,12 +273,10 @@ public SafeScheduledThreadPoolExecutor(int corePoolSize) {

@Override
protected void afterExecute(Runnable r, Throwable t) {
Throwable exception = EsExecutors.rethrowErrors(r);
if (exception != null) {
logger.warn(() ->
new ParameterizedMessage("uncaught exception in scheduled thread [{}]", Thread.currentThread().getName()),
exception);
}
if (t != null) return;
// Scheduler only allows Runnable's so we expect no checked exceptions here. If anyone uses submit directly on `this`, we
// accept the wrapped exception in the output.
ExceptionsHelper.reThrowIfNotNull(EsExecutors.rethrowErrors(r));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,11 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String e
}

public void scheduleUnlessShuttingDown(TimeValue delay, String executor, Runnable command) {
if (!Names.SAME.equals(executor)) {
command = new ThreadedRunnableAllowShutdown(command, executor(executor));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the reason for this change? It's unrelated to this PR, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is related to this PR. If you schedule something on a non-SAME executor and then shutdown the threadpool afterwards (before the delay is passed), the execute on the executor will fail, causing an exception. This exception is thrown on the scheduler thread. So far this caused no issues, since it was ignored (and recently logged). But with the changes in this PR, the exception was bubbled to the uncaught exception handler causing tests to fail.

Above fixes that such that scheduleUnlessShuttingDown allows both the schedule call itself to pass and the subsequent execute on the executor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the regular .schedule method not suffer from the same issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it will potentially, though none of the CI runs ran into it.

I guess we could argue that before the warning/exception changes, this would go unnoticed for both schedule and scheduleUnlessShuttingDown and therefore we should silently ignore this in both cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the shutdown procedure is relevant to this question. ThreadPool.terminate does a regular shutdown first on both scheduler and pool executors. This allows any scheduled tasks on the ScheduledThreadPoolExecutor to run to completion but does not allow executing them on the pool executor.

So scheduling jobs using SAME executor with delay until after shutdown will complete, using any other executor will not.

Given that we have both methods, I think the right solution is to make sure that tasks scheduled to run within the terminate timeout will run. I think this belongs in a follow-up PR. Two options:

  1. During terminate, we can do awaitTermination of the scheduler before calling shutdown on the rest of the executors.
  2. If a scheduled job fails to call execute on the executor, we simply call it in current thread (ie. one of the scheduler threads).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed this on another channel. Conclusion is to silently ignore scheduled tasks that fail to re-execute on the target executor if the target executor is shutdown (ie. like it was before all our changes).

scheduleUnlessShuttingDown was primarily added to avoid having to catch and handle the rejected exception everywhere. Down the road we should likely merge the two methods into one after analyzing all usages.

}
try {
schedule(command, delay, executor);
scheduler.schedule(command, delay.millis(), TimeUnit.MILLISECONDS);
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
logger.debug(new ParameterizedMessage("could not schedule execution of [{}] after [{}] on [{}] as executor is shut down",
Expand Down Expand Up @@ -520,6 +523,36 @@ public String toString() {
}
}

class ThreadedRunnableAllowShutdown implements Runnable {
private final Runnable runnable;

private final Executor executor;

ThreadedRunnableAllowShutdown(Runnable runnable, Executor executor) {
this.runnable = runnable;
this.executor = executor;
}

@Override
public void run() {
try {
executor.execute(runnable);
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
logger.debug(new ParameterizedMessage("could not schedule execution of [{}] on [{}] as executor is shut down",
runnable, executor), e);
} else {
throw e;
}
}
}

@Override
public String toString() {
return "[threaded-allow-shutdown]" + runnable.toString();
}
}

/**
* A thread to cache millisecond time values from
* {@link System#nanoTime()} and {@link System#currentTimeMillis()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,5 +215,10 @@ public ScheduledCancellable schedule(Runnable task, TimeValue delay, String exec
scheduledTasks.add(new Tuple<>(delay, task));
return null;
}

@Override
public void scheduleUnlessShuttingDown(TimeValue delay, String executor, Runnable command) {
schedule(command, delay, executor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,12 @@ public boolean isCancelled() {
};
}

@Override
public void scheduleUnlessShuttingDown(TimeValue delay, String executor, Runnable command) {
// reject on shutdown not implemented for deterministic task queue, so this is equivalent to schedule here.
schedule(command, delay, executor);
}

@Override
public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String executor) {
return super.scheduleWithFixedDelay(command, interval, executor);
Expand Down