
Commit

Fix configuration of ReadFromKafkaViaSDF which was always enabling redistribute and allowing duplicates instead of basing it on configuration. (#32134)

* Fix configuration of ReadFromKafkaViaSDF which was always enabling redistribute and allowing duplicates instead of basing it on configuration.
* Update known issues.
scwhittle authored Aug 15, 2024
1 parent 1eb416e commit a2f5ee2
Showing 3 changed files with 2 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -122,6 +122,7 @@

* Large Dataflow graphs using runner v2, or pipelines explicitly enabling the `upload_graph` experiment, will fail at construction time ([#32159](https://github.com/apache/beam/issues/32159)).
* Python pipelines that run with 2.53.0-2.58.0 SDKs and read data from GCS might be affected by a data corruption issue ([#32169](https://github.com/apache/beam/issues/32169)). The issue will be fixed in 2.59.0 ([#32135](https://github.com/apache/beam/pull/32135)). To work around this, update the google-cloud-storage package to version 2.18.2 or newer.
+* [KafkaIO] Records read with `ReadFromKafkaViaSDF` are redistributed and may contain duplicates regardless of the configuration. This affects Java pipelines with the Dataflow v2 runner and xlang pipelines reading from Kafka ([#32196](https://github.com/apache/beam/issues/32196)).

# [2.57.0] - 2024-06-26

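To make the known issue added above concrete, here is a minimal sketch of an affected pipeline: a plain `KafkaIO.read()` that never requests redistribution, running on the SDF read path (Dataflow runner v2, or read via xlang expansion). The class name, broker address, and topic are placeholders, not taken from this commit.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AffectedKafkaReadExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // No withRedistribute() or withAllowDuplicates() is requested here, yet on
    // the SDF read path in 2.58.0 both were enabled anyway -- the behavior this
    // commit fixes.
    PCollection<KafkaRecord<Long, String>> records =
        pipeline.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers("broker:9092") // placeholder broker
                .withTopic("my_topic")               // placeholder topic
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class));

    pipeline.run().waitUntilFinish();
  }
}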
2 changes: 0 additions & 2 deletions sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1730,8 +1730,6 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
        .withKeyDeserializerProvider(kafkaRead.getKeyDeserializerProvider())
        .withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
        .withManualWatermarkEstimator()
-       .withRedistribute()
-       .withAllowDuplicates() // must be set with withRedistribute option.
        .withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
        .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())
        .withConsumerPollingTimeout(kafkaRead.getConsumerPollingTimeout());
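The two lines removed above were applied unconditionally. Per the commit message, the corrected expansion enables them only when the user's `Read` configuration asks for them. Below is a minimal sketch of that gating, assuming the chain builds a `KafkaIO.ReadSourceDescriptors` (as the builder methods in the diff suggest) and assuming accessor names `isRedistributed()`/`isAllowDuplicates()`, which are not shown in this diff.

// Sketch only: enable redistribute/allow-duplicates from the Read's
// configuration instead of hard-coding them. Accessor names are assumptions.
KafkaIO.ReadSourceDescriptors<K, V> readTransform =
    KafkaIO.<K, V>readSourceDescriptors()
        .withKeyDeserializerProvider(kafkaRead.getKeyDeserializerProvider())
        .withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
        .withManualWatermarkEstimator()
        .withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
        .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())
        .withConsumerPollingTimeout(kafkaRead.getConsumerPollingTimeout());
if (kafkaRead.isRedistributed()) {
  readTransform = readTransform.withRedistribute();
  if (kafkaRead.isAllowDuplicates()) {
    // Only meaningful together with withRedistribute(), per the removed comment.
    readTransform = readTransform.withAllowDuplicates();
  }
}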
1 change: 1 addition & 0 deletions website/www/site/content/en/blog/beam-2.58.0.md
@@ -52,6 +52,7 @@ For more information about changes in 2.58.0, check out the [detailed release no
## Known Issues

* Python pipelines that run with 2.53.0-2.58.0 SDKs and read data from GCS might be affected by a data corruption issue ([#32169](https://github.com/apache/beam/issues/32169)). The issue will be fixed in 2.59.0 ([#32135](https://github.com/apache/beam/pull/32135)). To work around this, update the google-cloud-storage package to version 2.18.2 or newer.
+* [KafkaIO] Records read with `ReadFromKafkaViaSDF` are redistributed and may contain duplicates regardless of the configuration. This affects Java pipelines with the Dataflow v2 runner and xlang pipelines reading from Kafka ([#32196](https://github.com/apache/beam/issues/32196)).

For the most up to date list of known issues, see https://github.com/apache/beam/blob/master/CHANGES.md

