From 8d353a43e8b247390803695fd9d226f9b31b3b57 Mon Sep 17 00:00:00 2001 From: emeroad Date: Tue, 7 Nov 2023 10:46:00 +0900 Subject: [PATCH] [#10467] Bump spring-kafka from 2.9.11 to 3.0.12 --- .../collector/dao/PinotExceptionTraceDao.java | 10 ++++----- log/log-collector/pom.xml | 2 +- .../dao/pinot/PinotMetricTagDao.java | 10 ++++----- .../pinot/PinotSystemMetricDataTypeDao.java | 10 ++++----- .../dao/pinot/PinotSystemMetricDoubleDao.java | 10 ++++----- .../pinot/kafka/util/KafkaCallbacks.java | 22 +++++++++---------- pom.xml | 2 +- .../collector/dao/PinotUriStatDao.java | 10 ++++----- 8 files changed, 38 insertions(+), 38 deletions(-) diff --git a/exceptiontrace/exceptiontrace-collector/src/main/java/com/navercorp/pinpoint/exceptiontrace/collector/dao/PinotExceptionTraceDao.java b/exceptiontrace/exceptiontrace-collector/src/main/java/com/navercorp/pinpoint/exceptiontrace/collector/dao/PinotExceptionTraceDao.java index b38fec720ad65..fe54917572386 100644 --- a/exceptiontrace/exceptiontrace-collector/src/main/java/com/navercorp/pinpoint/exceptiontrace/collector/dao/PinotExceptionTraceDao.java +++ b/exceptiontrace/exceptiontrace-collector/src/main/java/com/navercorp/pinpoint/exceptiontrace/collector/dao/PinotExceptionTraceDao.java @@ -28,11 +28,11 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Repository; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; import java.util.List; import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; /** * @author intr3p1d @@ -47,7 +47,7 @@ public class PinotExceptionTraceDao implements ExceptionTraceDao { private final String topic; - private final ListenableFutureCallback> resultCallback + private final BiConsumer, Throwable> resultCallback = KafkaCallbacks.loggingCallback("Kafka(ExceptionMetaDataEntity)", logger); @@ -68,10 +68,10 @@ public void insert(List exceptionMetaData) { for (ExceptionMetaData e : exceptionMetaData) { ExceptionMetaDataEntity dataEntity = mapper.toEntity(e); - ListenableFuture> response = this.kafkaExceptionMetaDataTemplate.send( + CompletableFuture> response = this.kafkaExceptionMetaDataTemplate.send( topic, dataEntity ); - response.addCallback(resultCallback); + response.whenComplete(resultCallback); } } } diff --git a/log/log-collector/pom.xml b/log/log-collector/pom.xml index faa08e42523ec..5014bb2522334 100644 --- a/log/log-collector/pom.xml +++ b/log/log-collector/pom.xml @@ -74,7 +74,7 @@ org.springframework.kafka spring-kafka - 2.9.4 + ${spring.kafka.version} diff --git a/metric-module/metric/src/main/java/com/navercorp/pinpoint/metric/collector/dao/pinot/PinotMetricTagDao.java b/metric-module/metric/src/main/java/com/navercorp/pinpoint/metric/collector/dao/pinot/PinotMetricTagDao.java index 31a094ab30797..7e30f06e6e489 100644 --- a/metric-module/metric/src/main/java/com/navercorp/pinpoint/metric/collector/dao/pinot/PinotMetricTagDao.java +++ b/metric-module/metric/src/main/java/com/navercorp/pinpoint/metric/collector/dao/pinot/PinotMetricTagDao.java @@ -30,11 +30,11 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Repository; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; import java.util.List; import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; /** * @author minwoo.jung @@ -50,7 +50,7 @@ public class PinotMetricTagDao implements MetricTagDao { private final TagListTypeHandler tagListTypeHandler = new TagListTypeHandler(); private final String topic; - private final ListenableFutureCallback> resultCallback + private final BiConsumer, Throwable> resultCallback = KafkaCallbacks.loggingCallback("Kafka(MetricJsonTag)", logger); public PinotMetricTagDao(SqlSessionTemplate sqlPinotSessionTemplate, @@ -64,8 +64,8 @@ public PinotMetricTagDao(SqlSessionTemplate sqlPinotSessionTemplate, @Override public void insertMetricTag(MetricTag metricTag) { MetricJsonTag metricJsonTag = MetricJsonTag.covertMetricJsonTag(tagListTypeHandler, metricTag); - ListenableFuture> callBack = kafkaTagTemplate.send(topic, metricTag.getHostGroupName(), metricJsonTag); - callBack.addCallback(resultCallback); + CompletableFuture> callBack = kafkaTagTemplate.send(topic, metricTag.getHostGroupName(), metricJsonTag); + callBack.whenComplete(resultCallback); } public static class MetricJsonTag { diff --git a/metric-module/metric/src/main/java/com/navercorp/pinpoint/metric/collector/dao/pinot/PinotSystemMetricDataTypeDao.java b/metric-module/metric/src/main/java/com/navercorp/pinpoint/metric/collector/dao/pinot/PinotSystemMetricDataTypeDao.java index a975dfba7da22..a7413a0726350 100644 --- a/metric-module/metric/src/main/java/com/navercorp/pinpoint/metric/collector/dao/pinot/PinotSystemMetricDataTypeDao.java +++ b/metric-module/metric/src/main/java/com/navercorp/pinpoint/metric/collector/dao/pinot/PinotSystemMetricDataTypeDao.java @@ -27,11 +27,11 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Repository; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; import java.util.List; import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; /** * @author minwoo.jung @@ -47,7 +47,7 @@ public class PinotSystemMetricDataTypeDao implements SystemMetricDataTypeDao { private final KafkaTemplate kafkaDataTypeTemplate; private final String topic; - private final ListenableFutureCallback> resultCallback + private final BiConsumer, Throwable> resultCallback = KafkaCallbacks.loggingCallback("Kafka(MetricData)", logger); public PinotSystemMetricDataTypeDao(SqlSessionTemplate sqlPinotSessionTemplate, @@ -71,7 +71,7 @@ public MetricData selectMetricDataType(MetricDataName metricDataName) { @Override public void updateMetricDataType(MetricData metricData) { - ListenableFuture> callback = kafkaDataTypeTemplate.send(topic, metricData.getMetricName(), metricData); - callback.addCallback(resultCallback); + CompletableFuture> callback = kafkaDataTypeTemplate.send(topic, metricData.getMetricName(), metricData); + callback.whenComplete(resultCallback); } } diff --git a/metric-module/metric/src/main/java/com/navercorp/pinpoint/metric/collector/dao/pinot/PinotSystemMetricDoubleDao.java b/metric-module/metric/src/main/java/com/navercorp/pinpoint/metric/collector/dao/pinot/PinotSystemMetricDoubleDao.java index 29f9bcff8c44b..3a5932f7eb1d1 100644 --- a/metric-module/metric/src/main/java/com/navercorp/pinpoint/metric/collector/dao/pinot/PinotSystemMetricDoubleDao.java +++ b/metric-module/metric/src/main/java/com/navercorp/pinpoint/metric/collector/dao/pinot/PinotSystemMetricDoubleDao.java @@ -26,11 +26,11 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Repository; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; import java.util.List; import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; /** * @author Hyunjoon Cho @@ -44,7 +44,7 @@ public class PinotSystemMetricDoubleDao implements SystemMetricDao private final String topic; - private final ListenableFutureCallback> resultCallback + private final BiConsumer, Throwable> resultCallback = KafkaCallbacks.loggingCallback("Kafka(SystemMetricView)", logger); public PinotSystemMetricDoubleDao(KafkaTemplate kafkaDoubleTemplate, @@ -63,8 +63,8 @@ public void insert(String tenantId, String hostGroupName, String hostName, List< for (DoubleMetric doubleMetric : systemMetrics) { String kafkaKey = generateKafkaKey(doubleMetric); SystemMetricView systemMetricView = new SystemMetricView(tenantId, hostGroupName, doubleMetric); - ListenableFuture> callback = this.kafkaDoubleTemplate.send(topic, kafkaKey, systemMetricView); - callback.addCallback(resultCallback); + CompletableFuture> callback = this.kafkaDoubleTemplate.send(topic, kafkaKey, systemMetricView); + callback.whenComplete(resultCallback); } } diff --git a/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/util/KafkaCallbacks.java b/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/util/KafkaCallbacks.java index f640b83357db1..e0a3d6d7aa57e 100644 --- a/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/util/KafkaCallbacks.java +++ b/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/util/KafkaCallbacks.java @@ -2,21 +2,21 @@ import org.apache.logging.log4j.Logger; import org.springframework.kafka.support.SendResult; -import org.springframework.util.concurrent.ListenableFutureCallback; -public final class KafkaCallbacks { +import java.util.function.BiConsumer; - public static ListenableFutureCallback> loggingCallback(String name, Logger logger) { - return new ListenableFutureCallback<>() { - @Override - public void onFailure(Throwable ex) { - logger.warn("{} onFailure:{}", name, ex.getMessage(), ex); - } +public final class KafkaCallbacks { + public static BiConsumer, Throwable> loggingCallback(String name, Logger logger) { + return new BiConsumer<>() { @Override - public void onSuccess(SendResult result) { - if (logger.isDebugEnabled()) { - logger.debug("{} onSuccess:{}", name, result); + public void accept(SendResult result, Throwable throwable) { + if (throwable != null) { + logger.warn("{} onFailure:{}", name, throwable.getMessage(), throwable); + } else { + if (logger.isDebugEnabled()) { + logger.debug("{} onSuccess:{}", name, result); + } } } }; diff --git a/pom.xml b/pom.xml index cb80e11016051..1278ff5ea3926 100644 --- a/pom.xml +++ b/pom.xml @@ -181,7 +181,7 @@ 6.1.4 2.7.13 - 2.9.11 + 3.0.12 ${spring6.version} ${spring.boot3.version} diff --git a/uristat/uristat-collector/src/main/java/com/navercorp/pinpoint/uristat/collector/dao/PinotUriStatDao.java b/uristat/uristat-collector/src/main/java/com/navercorp/pinpoint/uristat/collector/dao/PinotUriStatDao.java index 59aa32dafedee..dd713f30507fb 100644 --- a/uristat/uristat-collector/src/main/java/com/navercorp/pinpoint/uristat/collector/dao/PinotUriStatDao.java +++ b/uristat/uristat-collector/src/main/java/com/navercorp/pinpoint/uristat/collector/dao/PinotUriStatDao.java @@ -10,11 +10,11 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Repository; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; import java.util.List; import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; @Repository public class PinotUriStatDao implements UriStatDao { @@ -24,7 +24,7 @@ public class PinotUriStatDao implements UriStatDao { private final String topic; - private final ListenableFutureCallback> resultCallback + private final BiConsumer, Throwable> resultCallback = KafkaCallbacks.loggingCallback("Kafka(UriStat)", logger); public PinotUriStatDao(@Qualifier("kafkaUriStatTemplate") KafkaTemplate kafkaUriStatTemplate, @@ -38,8 +38,8 @@ public void insert(List data) { Objects.requireNonNull(data); for (UriStat uriStat : data) { - ListenableFuture> response = this.kafkaUriStatTemplate.send(topic, uriStat.getApplicationName(), uriStat); - response.addCallback(resultCallback); + CompletableFuture> response = this.kafkaUriStatTemplate.send(topic, uriStat.getApplicationName(), uriStat); + response.whenComplete(resultCallback); } }