From 0b594af5b9f6ba301b2f3b5051a32c008f3dfc8d Mon Sep 17 00:00:00 2001 From: Allan-QLB Date: Sat, 25 Feb 2023 10:25:14 +0800 Subject: [PATCH] Add error log when KafkaConsumerThread terminate unexpectedly --- .../extension/io/kafka/source/ConsumerKafkaGroup.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/component/src/main/java/io/siddhi/extension/io/kafka/source/ConsumerKafkaGroup.java b/component/src/main/java/io/siddhi/extension/io/kafka/source/ConsumerKafkaGroup.java index 80ce59df..602bdfa1 100644 --- a/component/src/main/java/io/siddhi/extension/io/kafka/source/ConsumerKafkaGroup.java +++ b/component/src/main/java/io/siddhi/extension/io/kafka/source/ConsumerKafkaGroup.java @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -115,6 +116,14 @@ void run() { try { for (KafkaConsumerThread consumerThread : kafkaConsumerThreadList) { futureList.add(executorService.submit(consumerThread)); + futureList.add(CompletableFuture.runAsync(consumerThread, executorService).whenComplete( + (ignored, throwable) -> { + if (throwable != null) { + LOG.error("KafkaConsumerThread for topic(s):{} terminated unexpectedly!", + Arrays.toString(topics), throwable); + } + } + )); } } catch (Throwable t) { LOG.error("Error while creating KafkaConsumerThread for topic(s): " + Arrays.toString(topics), t);