Skip to content

Commit

Permalink
Bubble Throwables when executing Runnables (awslabs#112)
Browse files Browse the repository at this point in the history
This change updates calls to ExecutorService.submit (mostly deamons), to
call ExecutorService.execute instead.  The submit variant returns a
Future which will contain any errors that are thrown from the Runnable.
However, nothing was checking these throwables for errors.  If an error
happens, for example an OutOfMemoryError or RuntimeException thrown from
a custom AWSCredentialsProvider, the deamon thread will die without ever
logging any error information.  Changing this to use execute allows the
Thread's UncaughtExceptionHandler or the default
UncaughtExceptionHandler to properly handle the failure.  At a minimum
this allows the error to be logged.  Some clients may wish to respond to
an OutOfMemoryError by taking a more severe action, such as restaring
the service.  Given that failure of these deamon threads will likely
wedge the KPL, some retry logic or a hard shutdown should probably be
implemented in a subsequent commit.
  • Loading branch information
jeremysears authored and pfifer committed Nov 13, 2017
1 parent 03483c9 commit fd6ee0c
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public Daemon(String pathToExecutable, MessageHandler handler, String workingDir
lenBuf.order(ByteOrder.BIG_ENDIAN);
rcvBuf.order(ByteOrder.BIG_ENDIAN);

executor.submit(new Runnable() {
executor.execute(new Runnable() {
@Override
public void run() {
try{
Expand Down Expand Up @@ -285,7 +285,7 @@ private void returnMessage() {
* from the child process.
*/
private void startLoops() {
executor.submit(new Runnable() {
executor.execute(new Runnable() {
@Override
public void run() {
while (!shutdown.get()) {
Expand All @@ -294,7 +294,7 @@ public void run() {
}
});

executor.submit(new Runnable() {
executor.execute(new Runnable() {
@Override
public void run() {
while (!shutdown.get()) {
Expand All @@ -303,7 +303,7 @@ public void run() {
}
});

executor.submit(new Runnable() {
executor.execute(new Runnable() {
@Override
public void run() {
while (!shutdown.get()) {
Expand All @@ -312,7 +312,7 @@ public void run() {
}
});

executor.submit(new Runnable() {
executor.execute(new Runnable() {
@Override
public void run() {
while (!shutdown.get()) {
Expand Down Expand Up @@ -438,7 +438,7 @@ private void startChildProcess() throws IOException, InterruptedException {
}


executor.submit(new Runnable() {
executor.execute(new Runnable() {
@Override
public void run() {
try {
Expand Down Expand Up @@ -468,8 +468,8 @@ public void apply(Logger logger, String message) {
}
});

executor.submit(stdOutReader);
executor.submit(stdErrReader);
executor.execute(stdOutReader);
executor.execute(stdErrReader);
try {
int code = process.waitFor();
fatalError("Child process exited with code " + code, code != 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
private class MessageHandler implements Daemon.MessageHandler {
@Override
public void onMessage(final Message m) {
callbackCompletionExecutor.submit(new Runnable() {
callbackCompletionExecutor.execute(new Runnable() {
@Override
public void run() {
if (m.hasPutRecordResult()) {
Expand All @@ -154,7 +154,7 @@ public void onError(final Throwable t) {

// Fail all outstanding futures
for (final Map.Entry<Long, SettableFuture<?>> entry : futures.entrySet()) {
callbackCompletionExecutor.submit(new Runnable() {
callbackCompletionExecutor.execute(new Runnable() {
@Override
public void run() {
entry.getValue().setException(t);
Expand Down

0 comments on commit fd6ee0c

Please sign in to comment.