From fd6ee0c1a29fa29fe5ab5c358d63cb2b043093d2 Mon Sep 17 00:00:00 2001 From: Jeremy Sears Date: Mon, 13 Nov 2017 10:13:19 -0500 Subject: [PATCH] Bubble Throwables when executing Runnables (#112) This change updates calls to ExecutorService.submit (mostly deamons), to call ExecutorService.execute instead. The submit variant returns a Future which will contain any errors that are thrown from the Runnable. However, nothing was checking these throwables for errors. If an error happens, for example an OutOfMemoryError or RuntimeException thrown from a custom AWSCredentialsProvider, the deamon thread will die without ever logging any error information. Changing this to use execute allows the Thread's UncaughtExceptionHandler or the default UncaughtExceptionHandler to properly handle the failure. At a minimum this allows the error to be logged. Some clients may wish to respond to an OutOfMemoryError by taking a more severe action, such as restaring the service. Given that failure of these deamon threads will likely wedge the KPL, some retry logic or a hard shutdown should probably be implemented in a subsequent commit. --- .../services/kinesis/producer/Daemon.java | 16 ++++++++-------- .../kinesis/producer/KinesisProducer.java | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/Daemon.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/Daemon.java index 8fe140e5..92e5ee9d 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/Daemon.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/Daemon.java @@ -125,7 +125,7 @@ public Daemon(String pathToExecutable, MessageHandler handler, String workingDir lenBuf.order(ByteOrder.BIG_ENDIAN); rcvBuf.order(ByteOrder.BIG_ENDIAN); - executor.submit(new Runnable() { + executor.execute(new Runnable() { @Override public void run() { try{ @@ -285,7 +285,7 @@ private void returnMessage() { * from the child process. */ private void startLoops() { - executor.submit(new Runnable() { + executor.execute(new Runnable() { @Override public void run() { while (!shutdown.get()) { @@ -294,7 +294,7 @@ public void run() { } }); - executor.submit(new Runnable() { + executor.execute(new Runnable() { @Override public void run() { while (!shutdown.get()) { @@ -303,7 +303,7 @@ public void run() { } }); - executor.submit(new Runnable() { + executor.execute(new Runnable() { @Override public void run() { while (!shutdown.get()) { @@ -312,7 +312,7 @@ public void run() { } }); - executor.submit(new Runnable() { + executor.execute(new Runnable() { @Override public void run() { while (!shutdown.get()) { @@ -438,7 +438,7 @@ private void startChildProcess() throws IOException, InterruptedException { } - executor.submit(new Runnable() { + executor.execute(new Runnable() { @Override public void run() { try { @@ -468,8 +468,8 @@ public void apply(Logger logger, String message) { } }); - executor.submit(stdOutReader); - executor.submit(stdErrReader); + executor.execute(stdOutReader); + executor.execute(stdErrReader); try { int code = process.waitFor(); fatalError("Child process exited with code " + code, code != 1); diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java index bce1ec49..69798591 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java @@ -131,7 +131,7 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { private class MessageHandler implements Daemon.MessageHandler { @Override public void onMessage(final Message m) { - callbackCompletionExecutor.submit(new Runnable() { + callbackCompletionExecutor.execute(new Runnable() { @Override public void run() { if (m.hasPutRecordResult()) { @@ -154,7 +154,7 @@ public void onError(final Throwable t) { // Fail all outstanding futures for (final Map.Entry> entry : futures.entrySet()) { - callbackCompletionExecutor.submit(new Runnable() { + callbackCompletionExecutor.execute(new Runnable() { @Override public void run() { entry.getValue().setException(t);