Skip to content

Commit

Permalink
Do not swallow exceptions in TimedRunnable (#39856)
Browse files Browse the repository at this point in the history
Executors of type fixed_auto_queue_size (i.e. search / search_throttled) wrap runnables into
TimedRunnable, which is an AbstractRunnable. This is dangerous as it might silently swallow
exceptions, and possibly miss calling a response listener. While this has not triggered any failures in
the tests I have run so far, it might help uncover future problems.

Follow-up to #36137
  • Loading branch information
ywelsch committed Mar 11, 2019
1 parent 292eb8b commit 4f941c6
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,20 +163,12 @@ protected void doRun() {

public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedException {
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
final boolean expectExceptionOnExecute =
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE;
checkExecutionException(getExecuteRunner(threadPool.executor(executor)), expectExceptionOnExecute);
checkExecutionException(getExecuteRunner(threadPool.executor(executor)), true);

// here, it's ok for the exception not to bubble up. Accessing the future will yield the exception
checkExecutionException(getSubmitRunner(threadPool.executor(executor)), false);

final boolean expectExceptionOnSchedule =
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE;
checkExecutionException(getScheduleRunner(executor), expectExceptionOnSchedule);
checkExecutionException(getScheduleRunner(executor), true);
}
}

Expand Down Expand Up @@ -213,8 +205,7 @@ public void testExecutionExceptionOnAutoQueueFixedESThreadPoolExecutor() throws
1, 1, 1, TimeValue.timeValueSeconds(10), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
try {
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
checkExecutionException(getExecuteRunner(autoQueueFixedExecutor), false);
checkExecutionException(getExecuteRunner(autoQueueFixedExecutor), true);
checkExecutionException(getSubmitRunner(autoQueueFixedExecutor), false);
} finally {
ThreadPool.terminate(autoQueueFixedExecutor, 10, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.ExceptionsHelper;

/**
* A class used to wrap a {@code Runnable} that allows capturing the time of the task since creation
* through execution as well as only execution time.
Expand Down Expand Up @@ -48,6 +50,8 @@ public void doRun() {
public void onRejection(final Exception e) {
if (original instanceof AbstractRunnable) {
((AbstractRunnable) original).onRejection(e);
} else {
ExceptionsHelper.reThrowIfNotNull(e);
}
}

Expand All @@ -62,6 +66,8 @@ public void onAfter() {
public void onFailure(final Exception e) {
if (original instanceof AbstractRunnable) {
((AbstractRunnable) original).onFailure(e);
} else {
ExceptionsHelper.reThrowIfNotNull(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,34 @@ protected void doRun() throws Exception {
assertTrue(onAfter.get());
}

public void testTimedRunnableRethrowsExceptionWhenNotAbstractRunnable() {
final AtomicBoolean hasRun = new AtomicBoolean();
final RuntimeException exception = new RuntimeException();

final Runnable runnable = () -> {
hasRun.set(true);
throw exception;
};

final TimedRunnable timedRunnable = new TimedRunnable(runnable);
final RuntimeException thrown = expectThrows(RuntimeException.class, () -> timedRunnable.run());
assertTrue(hasRun.get());
assertSame(exception, thrown);
}

public void testTimedRunnableRethrowsRejectionWhenNotAbstractRunnable() {
final AtomicBoolean hasRun = new AtomicBoolean();
final RuntimeException exception = new RuntimeException();

final Runnable runnable = () -> {
hasRun.set(true);
throw new AssertionError("should not run");
};

final TimedRunnable timedRunnable = new TimedRunnable(runnable);
final RuntimeException thrown = expectThrows(RuntimeException.class, () -> timedRunnable.onRejection(exception));
assertFalse(hasRun.get());
assertSame(exception, thrown);
}

}

0 comments on commit 4f941c6

Please sign in to comment.