Skip to content
This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

Commit

Permalink
ConsumerRecord Logging Metadata Option
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell authored and artembilan committed Jul 13, 2020
1 parent a669555 commit 2ae732d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ ext {
jacksonVersion = '2.11.0'
junitJupiterVersion = '5.6.2'
log4jVersion = '2.13.2'
springIntegrationVersion = '5.3.0.RELEASE'
springKafkaVersion = '2.5.0.RELEASE'
springIntegrationVersion = '5.3.1.RELEASE'
springKafkaVersion = '2.5.4.BUILD-SNAPSHOT'

idPrefix = 'kafka'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.listener.LoggingCommitCallback;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
Expand Down Expand Up @@ -667,6 +668,8 @@ public static class KafkaAckCallback<K, V> implements AcknowledgmentCallback, Ac

private final boolean isSyncCommits;

private final boolean logOnlyMetadata;

private volatile boolean acknowledged;

private boolean autoAckEnabled = true;
Expand All @@ -690,6 +693,7 @@ public KafkaAckCallback(KafkaAckInfo<K, V> ackInfo, @Nullable ConsumerProperties
consumerProperties != null
? consumerProperties.getCommitLogLevel()
: LogIfLevelEnabled.Level.DEBUG);
this.logOnlyMetadata = consumerProperties.isOnlyLogRecordMetadata();
}

@Override
Expand Down Expand Up @@ -739,7 +743,8 @@ private void rollback(ConsumerRecord<K, V> record) {
})
.collect(Collectors.toList());
if (rewound.size() > 0 && this.logger.isWarnEnabled()) {
this.logger.warn("Rolled back " + record + " later in-flight offsets "
this.logger.warn("Rolled back " + ListenerUtils.recordToString(record, this.logOnlyMetadata)
+ " later in-flight offsets "
+ rewound + " will also be re-fetched");
}
}
Expand All @@ -749,7 +754,8 @@ private void rollback(ConsumerRecord<K, V> record) {
private void commitIfPossible(ConsumerRecord<K, V> record) {
if (this.ackInfo.isRolledBack()) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("Cannot commit offset for " + record
this.logger.warn("Cannot commit offset for "
+ ListenerUtils.recordToString(record, this.logOnlyMetadata)
+ "; an earlier offset was rolled back");
}
}
Expand All @@ -773,13 +779,17 @@ private void commitIfPossible(ConsumerRecord<K, V> record) {
if (toCommit.size() > 0) {
ackInformation = toCommit.get(toCommit.size() - 1);
KafkaAckInfo<K, V> ackInformationToLog = ackInformation;
this.commitLogger.log(() -> "Committing pending offsets for " + record
+ " and all deferred to " + ackInformationToLog.getRecord());
this.commitLogger.log(() -> "Committing pending offsets for "
+ ListenerUtils.recordToString(record, this.logOnlyMetadata)
+ " and all deferred to "
+ ListenerUtils.recordToString(ackInformationToLog.getRecord(),
this.logOnlyMetadata));
candidates.removeAll(toCommit);
}
else {
ackInformation = this.ackInfo;
this.commitLogger.log(() -> "Committing offset for " + record);
this.commitLogger.log(() -> "Committing offset for "
+ ListenerUtils.recordToString(record, this.logOnlyMetadata));
}
}
else { // earlier offsets present
Expand Down

0 comments on commit 2ae732d

Please sign in to comment.