From fa0cd590f7b14615910779da878f83ec2549ca10 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Mon, 4 Feb 2019 11:24:54 +0100 Subject: [PATCH 1/8] Bubble-up exceptions from scheduler Instead of logging warnings we now rethrow exceptions thrown inside scheduled/submitted tasks. This will still log them as warnings in production but has the added benefit that if they are thrown during unit/integration test runs, the test will be flagged as an error. This is a continuation of #38014 --- .../threadpool/EvilThreadPoolTests.java | 68 ++++--------------- .../elasticsearch/threadpool/Scheduler.java | 19 +++--- 2 files changed, 22 insertions(+), 65 deletions(-) diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java index 02ba33f19c4f8..99bde679f79e9 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java @@ -19,11 +19,6 @@ package org.elasticsearch.threadpool; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.core.LogEvent; -import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -31,7 +26,6 @@ import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.MockLogAppender; import org.junit.After; import org.junit.Before; @@ -44,7 +38,6 @@ import java.util.function.Consumer; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; @@ -367,63 +360,28 @@ private void runExecutionTest( uncaughtExceptionHandlerLatch.countDown(); }); - final CountDownLatch supplierLatch = new CountDownLatch(1); - Runnable job = () -> { - try { - runnable.run(); - } finally { - supplierLatch.countDown(); - } - }; - - // snoop on logging to also handle the cases where exceptions are simply logged in Scheduler. - final Logger schedulerLogger = LogManager.getLogger(Scheduler.SafeScheduledThreadPoolExecutor.class); - final MockLogAppender appender = new MockLogAppender(); - appender.addExpectation( - new MockLogAppender.LoggingExpectation() { - @Override - public void match(LogEvent event) { - if (event.getLevel() == Level.WARN) { - assertThat("no other warnings than those expected", - event.getMessage().getFormattedMessage(), - equalTo("uncaught exception in scheduled thread [" + Thread.currentThread().getName() + "]")); - assertTrue(expectThrowable); - assertNotNull(event.getThrown()); - assertTrue("only one message allowed", throwableReference.compareAndSet(null, event.getThrown())); - uncaughtExceptionHandlerLatch.countDown(); - } - } - - @Override - public void assertMatched() { + try { + runner.accept(() -> { + try { + runnable.run(); + } finally { + supplierLatch.countDown(); } }); + } catch (Throwable t) { + consumer.accept(Optional.of(t)); + return; + } - appender.start(); - Loggers.addAppender(schedulerLogger, appender); - try { - try { - runner.accept(job); - } catch (Throwable t) { - consumer.accept(Optional.of(t)); - return; - } - - supplierLatch.await(); + supplierLatch.await(); - if (expectThrowable) { - uncaughtExceptionHandlerLatch.await(); - } - } finally { - Loggers.removeAppender(schedulerLogger, appender); - appender.stop(); + if (expectThrowable) { + uncaughtExceptionHandlerLatch.await(); } consumer.accept(Optional.ofNullable(throwableReference.get())); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); } finally { Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler); } diff --git a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java index 4c1ad6a3715c6..588495dd27d32 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java +++ b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java @@ -21,7 +21,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -47,8 +47,8 @@ public interface Scheduler { /** * Create a scheduler that can be used client side. Server side, please use ThreadPool.schedule instead. * - * Notice that if any scheduled jobs fail with an exception, they will be logged as a warning. This includes jobs started - * using execute, submit and schedule. + * Notice that if any scheduled jobs fail with an exception, these will bubble up to the uncaught exception handler where they will + * be logged as a warning. This includes jobs started using execute, submit and schedule. * @param settings the settings to use * @return executor */ @@ -250,7 +250,8 @@ public void onAfter() { } /** - * This subclass ensures to properly bubble up Throwable instances of type Error and logs exceptions thrown in submitted/scheduled tasks + * This subclass ensures to properly bubble up Throwable instances of both type Error and Exception thrown in submitted/scheduled + * tasks to the uncaught exception handler */ class SafeScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { private static final Logger logger = LogManager.getLogger(SafeScheduledThreadPoolExecutor.class); @@ -272,12 +273,10 @@ public SafeScheduledThreadPoolExecutor(int corePoolSize) { @Override protected void afterExecute(Runnable r, Throwable t) { - Throwable exception = EsExecutors.rethrowErrors(r); - if (exception != null) { - logger.warn(() -> - new ParameterizedMessage("uncaught exception in scheduled thread [{}]", Thread.currentThread().getName()), - exception); - } + if (t != null) return; + // Scheduler only allows Runnable's so we expect no checked exceptions here. If anyone uses submit directly on `this`, we + // accept the wrapped exception in the output. + ExceptionsHelper.reThrowIfNotNull(EsExecutors.rethrowErrors(r)); } } } From c75b7d1bae5dd88f81669a0bd83fcffebd90f2d5 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Mon, 4 Feb 2019 13:54:52 +0100 Subject: [PATCH 2/8] NPE fix in GlobalCheckPointListeners Fixed NPE that caused CCR tests (IndexFollowingIT and likely others) to fail. --- .../elasticsearch/index/shard/GlobalCheckpointListeners.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index eb9e36eeec0a8..d45a77ddb22ac 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -135,7 +135,7 @@ synchronized void add(final long waitingForGlobalCheckpoint, final GlobalCheckpo * before we could be cancelled by the notification. In this case, our listener here would * not be in the map and we should not fire the timeout logic. */ - removed = listeners.remove(listener).v2() != null; + removed = listeners.remove(listener) != null; } if (removed) { final TimeoutException e = new TimeoutException(timeout.getStringRep()); From 5f851e7ce636e9f0f4816b5843366b3f0ea55ccc Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Mon, 4 Feb 2019 15:04:46 +0100 Subject: [PATCH 3/8] Fix ThreadPool.scheduleUnlessShuttingDown scheduleUnlessShuttingDown could bubble rejected exception to uncaught exception handler when not using SAME executor. Now ignore rejected exception if executor is shutdown. --- .../elasticsearch/threadpool/ThreadPool.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index d42abf6b4e94b..46dafeb507d52 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -354,6 +354,9 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String e } public void scheduleUnlessShuttingDown(TimeValue delay, String executor, Runnable command) { + if (!Names.SAME.equals(executor)) { + command = new ThreadedRunnableAllowShutdown(command, executor(executor)); + } try { schedule(command, delay, executor); } catch (EsRejectedExecutionException e) { @@ -520,6 +523,36 @@ public String toString() { } } + class ThreadedRunnableAllowShutdown implements Runnable { + private final Runnable runnable; + + private final Executor executor; + + ThreadedRunnableAllowShutdown(Runnable runnable, Executor executor) { + this.runnable = runnable; + this.executor = executor; + } + + @Override + public void run() { + try { + executor.execute(runnable); + } catch (EsRejectedExecutionException e) { + if (e.isExecutorShutdown()) { + logger.debug(new ParameterizedMessage("could not schedule execution of [{}] on [{}] as executor is shut down", + runnable, executor), e); + } else { + throw e; + } + } + } + + @Override + public String toString() { + return "[threaded-allow-shutdown]" + runnable.toString(); + } + } + /** * A thread to cache millisecond time values from * {@link System#nanoTime()} and {@link System#currentTimeMillis()}. From 3e149926f1a143d7ef1a99f5d3bfd8b41ad382f5 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Mon, 4 Feb 2019 15:26:42 +0100 Subject: [PATCH 4/8] Fix ThreadPool.scheduleUnlessShuttingDown scheduleUnlessShuttingDown could bubble rejected exception to uncaught exception handler when not using SAME executor. Now ignore rejected exception if executor is shutdown. --- .../src/main/java/org/elasticsearch/threadpool/ThreadPool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 46dafeb507d52..d8141a6e59b01 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -358,7 +358,7 @@ public void scheduleUnlessShuttingDown(TimeValue delay, String executor, Runnabl command = new ThreadedRunnableAllowShutdown(command, executor(executor)); } try { - schedule(command, delay, executor); + scheduler.schedule(command, delay.millis(), TimeUnit.MILLISECONDS); } catch (EsRejectedExecutionException e) { if (e.isExecutorShutdown()) { logger.debug(new ParameterizedMessage("could not schedule execution of [{}] after [{}] on [{}] as executor is shut down", From 8ed0a92d35b7172a363285be1286c3042ecc3834 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Mon, 4 Feb 2019 16:46:09 +0100 Subject: [PATCH 5/8] Fix ThreadPool.scheduleUnlessShuttingDown Fixed test failure. --- .../cluster/coordination/DeterministicTaskQueue.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index f14afe3c36534..6294a9b7ed55f 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -22,9 +22,11 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.Counter; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolInfo; import org.elasticsearch.threadpool.ThreadPoolStats; @@ -388,6 +390,12 @@ public boolean isCancelled() { }; } + @Override + public void scheduleUnlessShuttingDown(TimeValue delay, String executor, Runnable command) { + // reject on shutdown not implemented for deterministic task queue, so this is equivalent to schedule here. + schedule(command, delay, executor); + } + @Override public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String executor) { return super.scheduleWithFixedDelay(command, interval, executor); From 7348731f1040c231cbcb6f12ae828763bba287fb Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Mon, 4 Feb 2019 16:51:21 +0100 Subject: [PATCH 6/8] Fix ThreadPool.scheduleUnlessShuttingDown Fixed test failure. --- .../org/elasticsearch/transport/TransportKeepAliveTests.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java b/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java index e400292873411..ceea4c0fc8680 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java @@ -215,5 +215,10 @@ public ScheduledCancellable schedule(Runnable task, TimeValue delay, String exec scheduledTasks.add(new Tuple<>(delay, task)); return null; } + + @Override + public void scheduleUnlessShuttingDown(TimeValue delay, String executor, Runnable command) { + schedule(command, delay, executor); + } } } From f44bdc850bd1a6afdccef6eefbebb12a951cfb5a Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Mon, 4 Feb 2019 17:02:34 +0100 Subject: [PATCH 7/8] Fix ThreadPool.scheduleUnlessShuttingDown Checkstyle fix. --- .../cluster/coordination/DeterministicTaskQueue.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index 6294a9b7ed55f..1227e678181b3 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -22,11 +22,9 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.Counter; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolInfo; import org.elasticsearch.threadpool.ThreadPoolStats; From 2518d7ee75c10bbe6f66421b00e13d5807fa6542 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Tue, 5 Feb 2019 12:37:34 +0100 Subject: [PATCH 8/8] ThreadPool.schedule disregard shutdown Like scheduleUnlessShuttingDown, we want to silently ignore exceptions thrown during execute on target executor. Once ThreadPool.terminate() has done shutdown of thread pools, all scheduled tasks not using SAME executor will not run. --- .../elasticsearch/threadpool/ThreadPool.java | 46 +++++-------------- .../transport/TransportKeepAliveTests.java | 5 -- .../coordination/DeterministicTaskQueue.java | 6 --- 3 files changed, 11 insertions(+), 46 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index d8141a6e59b01..5ca2b15d6ffe0 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -354,11 +354,8 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String e } public void scheduleUnlessShuttingDown(TimeValue delay, String executor, Runnable command) { - if (!Names.SAME.equals(executor)) { - command = new ThreadedRunnableAllowShutdown(command, executor(executor)); - } try { - scheduler.schedule(command, delay.millis(), TimeUnit.MILLISECONDS); + schedule(command, delay, executor); } catch (EsRejectedExecutionException e) { if (e.isExecutorShutdown()) { logger.debug(new ParameterizedMessage("could not schedule execution of [{}] after [{}] on [{}] as executor is shut down", @@ -504,7 +501,16 @@ class ThreadedRunnable implements Runnable { @Override public void run() { - executor.execute(runnable); + try { + executor.execute(runnable); + } catch (EsRejectedExecutionException e) { + if (e.isExecutorShutdown()) { + logger.debug(new ParameterizedMessage("could not schedule execution of [{}] on [{}] as executor is shut down", + runnable, executor), e); + } else { + throw e; + } + } } @Override @@ -523,36 +529,6 @@ public String toString() { } } - class ThreadedRunnableAllowShutdown implements Runnable { - private final Runnable runnable; - - private final Executor executor; - - ThreadedRunnableAllowShutdown(Runnable runnable, Executor executor) { - this.runnable = runnable; - this.executor = executor; - } - - @Override - public void run() { - try { - executor.execute(runnable); - } catch (EsRejectedExecutionException e) { - if (e.isExecutorShutdown()) { - logger.debug(new ParameterizedMessage("could not schedule execution of [{}] on [{}] as executor is shut down", - runnable, executor), e); - } else { - throw e; - } - } - } - - @Override - public String toString() { - return "[threaded-allow-shutdown]" + runnable.toString(); - } - } - /** * A thread to cache millisecond time values from * {@link System#nanoTime()} and {@link System#currentTimeMillis()}. diff --git a/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java b/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java index ceea4c0fc8680..e400292873411 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java @@ -215,10 +215,5 @@ public ScheduledCancellable schedule(Runnable task, TimeValue delay, String exec scheduledTasks.add(new Tuple<>(delay, task)); return null; } - - @Override - public void scheduleUnlessShuttingDown(TimeValue delay, String executor, Runnable command) { - schedule(command, delay, executor); - } } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index 1227e678181b3..f14afe3c36534 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -388,12 +388,6 @@ public boolean isCancelled() { }; } - @Override - public void scheduleUnlessShuttingDown(TimeValue delay, String executor, Runnable command) { - // reject on shutdown not implemented for deterministic task queue, so this is equivalent to schedule here. - schedule(command, delay, executor); - } - @Override public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String executor) { return super.scheduleWithFixedDelay(command, interval, executor);