diff --git a/component/src/main/java/io/siddhi/extension/io/kafka/source/ConsumerKafkaGroup.java b/component/src/main/java/io/siddhi/extension/io/kafka/source/ConsumerKafkaGroup.java index 80ce59df..602bdfa1 100644 --- a/component/src/main/java/io/siddhi/extension/io/kafka/source/ConsumerKafkaGroup.java +++ b/component/src/main/java/io/siddhi/extension/io/kafka/source/ConsumerKafkaGroup.java @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -115,6 +116,14 @@ void run() { try { for (KafkaConsumerThread consumerThread : kafkaConsumerThreadList) { futureList.add(executorService.submit(consumerThread)); + futureList.add(CompletableFuture.runAsync(consumerThread, executorService).whenComplete( + (ignored, throwable) -> { + if (throwable != null) { + LOG.error("KafkaConsumerThread for topic(s):{} terminated unexpectedly!", + Arrays.toString(topics), throwable); + } + } + )); } } catch (Throwable t) { LOG.error("Error while creating KafkaConsumerThread for topic(s): " + Arrays.toString(topics), t);