From 682eaeff69d944dc1ed399db9e4ceeacdc72e710 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Tue, 12 Nov 2024 13:55:03 +0100 Subject: [PATCH] [KafkaIO] Fix potential data race in ReadFromKafkaDoFn.AverageRecordSize (#33073) * Add comments clarifying offsets and record size calculation --- .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 44 ++++++++++++++++--- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index 7c2064883488..add76c9682a0 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -27,6 +27,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors; import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg; @@ -338,13 +340,18 @@ public WatermarkEstimator newWatermarkEstimator( public double getSize( @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange offsetRange) throws Exception { + // If present, estimates the record size to offset gap ratio. Compacted topics may hold fewer + // records than the estimated offset range due to record deletion within a partition. final LoadingCache avgRecordSize = Preconditions.checkStateNotNull(this.avgRecordSize); - double numRecords = + // The tracker estimates the offset range by subtracting the last claimed position from the + // currently observed end offset for the partition belonging to this split. 
+ double estimatedOffsetRange = restrictionTracker(kafkaSourceDescriptor, offsetRange).getProgress().getWorkRemaining(); // Before processing elements, we don't have a good estimated size of records and offset gap. + // Return the estimated offset range without scaling by a size to gap ratio. if (!avgRecordSize.asMap().containsKey(kafkaSourceDescriptor.getTopicPartition())) { - return numRecords; + return estimatedOffsetRange; } if (offsetEstimatorCache != null) { for (Map.Entry tp : @@ -353,7 +360,12 @@ public double getSize( } } - return avgRecordSize.get(kafkaSourceDescriptor.getTopicPartition()).getTotalSize(numRecords); + // When processing elements, a moving average estimates the size of records and offset gap. + // Return the estimated offset range scaled by the estimated size to gap ratio. + return estimatedOffsetRange + * avgRecordSize + .get(kafkaSourceDescriptor.getTopicPartition()) + .estimateRecordByteSizeToOffsetCountRatio(); } @NewTracker @@ -665,8 +677,15 @@ private Map overrideBootstrapServersConfig( return config; } + // TODO: Collapse the two moving average trackers into a single accumulator using a single Guava + // AtomicDouble. Note that this requires that a single thread will call update and that while get + // may be called by multiple threads the method must only load the accumulator itself. 
+ @ThreadSafe private static class AverageRecordSize { + @GuardedBy("this") private MovingAvg avgRecordSize; + + @GuardedBy("this") private MovingAvg avgRecordGap; public AverageRecordSize() { @@ -674,13 +693,26 @@ public AverageRecordSize() { this.avgRecordGap = new MovingAvg(); } - public void update(int recordSize, long gap) { + public synchronized void update(int recordSize, long gap) { avgRecordSize.update(recordSize); avgRecordGap.update(gap); } - public double getTotalSize(double numRecords) { - return avgRecordSize.get() * numRecords / (1 + avgRecordGap.get()); + public double estimateRecordByteSizeToOffsetCountRatio() { + double avgRecordSize; + double avgRecordGap; + + synchronized (this) { + avgRecordSize = this.avgRecordSize.get(); + avgRecordGap = this.avgRecordGap.get(); + } + + // The offset increases between records in a batch fetched from a compacted topic may be + // greater than 1. Compacted topics only store records with the greatest offset per key per + // partition; the records in between are deleted and will not be observed by a consumer. + // The observed gap between offsets is used to estimate the number of records that are likely + // to be observed within a given offset range. + return avgRecordSize / (1 + avgRecordGap); } }