Skip to content

Commit

Permalink
Merge pull request #159 from Allan-QLB/master
Browse files Browse the repository at this point in the history
Add error log when KafkaConsumerThread terminate unexpectedly
  • Loading branch information
AnuGayan authored Mar 2, 2023
2 parents 722f805 + 0b594af commit 8eb2d4a
Showing 1 changed file with 9 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -115,6 +116,14 @@ void run() {
try {
for (KafkaConsumerThread consumerThread : kafkaConsumerThreadList) {
futureList.add(executorService.submit(consumerThread));
futureList.add(CompletableFuture.runAsync(consumerThread, executorService).whenComplete(
(ignored, throwable) -> {
if (throwable != null) {
LOG.error("KafkaConsumerThread for topic(s):{} terminated unexpectedly!",
Arrays.toString(topics), throwable);
}
}
));
}
} catch (Throwable t) {
LOG.error("Error while creating KafkaConsumerThread for topic(s): " + Arrays.toString(topics), t);
Expand Down

0 comments on commit 8eb2d4a

Please sign in to comment.