Skip to content

Commit

Permalink
[#10467] Bump spring-kafka from 2.9.11 to 3.0.12
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Nov 7, 2023
1 parent d8aa8c6 commit 8d353a4
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Repository;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/**
* @author intr3p1d
Expand All @@ -47,7 +47,7 @@ public class PinotExceptionTraceDao implements ExceptionTraceDao {

private final String topic;

private final ListenableFutureCallback<SendResult<String, ExceptionMetaDataEntity>> resultCallback
private final BiConsumer<SendResult<String, ExceptionMetaDataEntity>, Throwable> resultCallback
= KafkaCallbacks.loggingCallback("Kafka(ExceptionMetaDataEntity)", logger);


Expand All @@ -68,10 +68,10 @@ public void insert(List<ExceptionMetaData> exceptionMetaData) {

for (ExceptionMetaData e : exceptionMetaData) {
ExceptionMetaDataEntity dataEntity = mapper.toEntity(e);
ListenableFuture<SendResult<String, ExceptionMetaDataEntity>> response = this.kafkaExceptionMetaDataTemplate.send(
CompletableFuture<SendResult<String, ExceptionMetaDataEntity>> response = this.kafkaExceptionMetaDataTemplate.send(
topic, dataEntity
);
response.addCallback(resultCallback);
response.whenComplete(resultCallback);
}
}
}
2 changes: 1 addition & 1 deletion log/log-collector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.4</version>
<version>${spring.kafka.version}</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Repository;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/**
* @author minwoo.jung
Expand All @@ -50,7 +50,7 @@ public class PinotMetricTagDao implements MetricTagDao {
private final TagListTypeHandler tagListTypeHandler = new TagListTypeHandler();
private final String topic;

private final ListenableFutureCallback<SendResult<String, MetricJsonTag>> resultCallback
private final BiConsumer<SendResult<String, MetricJsonTag>, Throwable> resultCallback
= KafkaCallbacks.loggingCallback("Kafka(MetricJsonTag)", logger);

public PinotMetricTagDao(SqlSessionTemplate sqlPinotSessionTemplate,
Expand All @@ -64,8 +64,8 @@ public PinotMetricTagDao(SqlSessionTemplate sqlPinotSessionTemplate,
@Override
public void insertMetricTag(MetricTag metricTag) {
MetricJsonTag metricJsonTag = MetricJsonTag.covertMetricJsonTag(tagListTypeHandler, metricTag);
ListenableFuture<SendResult<String, MetricJsonTag>> callBack = kafkaTagTemplate.send(topic, metricTag.getHostGroupName(), metricJsonTag);
callBack.addCallback(resultCallback);
CompletableFuture<SendResult<String, MetricJsonTag>> callBack = kafkaTagTemplate.send(topic, metricTag.getHostGroupName(), metricJsonTag);
callBack.whenComplete(resultCallback);
}

public static class MetricJsonTag {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Repository;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/**
* @author minwoo.jung
Expand All @@ -47,7 +47,7 @@ public class PinotSystemMetricDataTypeDao implements SystemMetricDataTypeDao {
private final KafkaTemplate<String, MetricData> kafkaDataTypeTemplate;
private final String topic;

private final ListenableFutureCallback<SendResult<String, MetricData>> resultCallback
private final BiConsumer<SendResult<String, MetricData>, Throwable> resultCallback
= KafkaCallbacks.loggingCallback("Kafka(MetricData)", logger);

public PinotSystemMetricDataTypeDao(SqlSessionTemplate sqlPinotSessionTemplate,
Expand All @@ -71,7 +71,7 @@ public MetricData selectMetricDataType(MetricDataName metricDataName) {

@Override
public void updateMetricDataType(MetricData metricData) {
ListenableFuture<SendResult<String, MetricData>> callback = kafkaDataTypeTemplate.send(topic, metricData.getMetricName(), metricData);
callback.addCallback(resultCallback);
CompletableFuture<SendResult<String, MetricData>> callback = kafkaDataTypeTemplate.send(topic, metricData.getMetricName(), metricData);
callback.whenComplete(resultCallback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Repository;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/**
* @author Hyunjoon Cho
Expand All @@ -44,7 +44,7 @@ public class PinotSystemMetricDoubleDao implements SystemMetricDao<DoubleMetric>

private final String topic;

private final ListenableFutureCallback<SendResult<String, SystemMetricView>> resultCallback
private final BiConsumer<SendResult<String, SystemMetricView>, Throwable> resultCallback
= KafkaCallbacks.loggingCallback("Kafka(SystemMetricView)", logger);

public PinotSystemMetricDoubleDao(KafkaTemplate<String, SystemMetricView> kafkaDoubleTemplate,
Expand All @@ -63,8 +63,8 @@ public void insert(String tenantId, String hostGroupName, String hostName, List<
for (DoubleMetric doubleMetric : systemMetrics) {
String kafkaKey = generateKafkaKey(doubleMetric);
SystemMetricView systemMetricView = new SystemMetricView(tenantId, hostGroupName, doubleMetric);
ListenableFuture<SendResult<String, SystemMetricView>> callback = this.kafkaDoubleTemplate.send(topic, kafkaKey, systemMetricView);
callback.addCallback(resultCallback);
CompletableFuture<SendResult<String, SystemMetricView>> callback = this.kafkaDoubleTemplate.send(topic, kafkaKey, systemMetricView);
callback.whenComplete(resultCallback);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@

import org.apache.logging.log4j.Logger;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;

public final class KafkaCallbacks {
import java.util.function.BiConsumer;

public static <T> ListenableFutureCallback<SendResult<String, T>> loggingCallback(String name, Logger logger) {
return new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable ex) {
logger.warn("{} onFailure:{}", name, ex.getMessage(), ex);
}
public final class KafkaCallbacks {

public static <T> BiConsumer<SendResult<String, T>, Throwable> loggingCallback(String name, Logger logger) {
return new BiConsumer<>() {
@Override
public void onSuccess(SendResult<String, T> result) {
if (logger.isDebugEnabled()) {
logger.debug("{} onSuccess:{}", name, result);
public void accept(SendResult<String, T> result, Throwable throwable) {
if (throwable != null) {
logger.warn("{} onFailure:{}", name, throwable.getMessage(), throwable);
} else {
if (logger.isDebugEnabled()) {
logger.debug("{} onSuccess:{}", name, result);
}
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@
<!-- <spring.security.version>5.8.1</spring.security.version>-->
<spring.security.version>6.1.4</spring.security.version>
<spring.boot2.version>2.7.13</spring.boot2.version>
<spring.kafka.version>2.9.11</spring.kafka.version>
<spring.kafka.version>3.0.12</spring.kafka.version>

<spring.version>${spring6.version}</spring.version>
<spring.boot.version>${spring.boot3.version}</spring.boot.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Repository;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

@Repository
public class PinotUriStatDao implements UriStatDao {
Expand All @@ -24,7 +24,7 @@ public class PinotUriStatDao implements UriStatDao {

private final String topic;

private final ListenableFutureCallback<SendResult<String, UriStat>> resultCallback
private final BiConsumer<SendResult<String, UriStat>, Throwable> resultCallback
= KafkaCallbacks.loggingCallback("Kafka(UriStat)", logger);

public PinotUriStatDao(@Qualifier("kafkaUriStatTemplate") KafkaTemplate<String, UriStat> kafkaUriStatTemplate,
Expand All @@ -38,8 +38,8 @@ public void insert(List<UriStat> data) {
Objects.requireNonNull(data);

for (UriStat uriStat : data) {
ListenableFuture<SendResult<String, UriStat>> response = this.kafkaUriStatTemplate.send(topic, uriStat.getApplicationName(), uriStat);
response.addCallback(resultCallback);
CompletableFuture<SendResult<String, UriStat>> response = this.kafkaUriStatTemplate.send(topic, uriStat.getApplicationName(), uriStat);
response.whenComplete(resultCallback);
}

}
Expand Down

0 comments on commit 8d353a4

Please sign in to comment.