diff --git a/CHANGES.md b/CHANGES.md index 6c55e276f91f..aee9b96d9f4a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -59,6 +59,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Support for Bigtable Change Streams added in Java `BigtableIO.ReadChangeStream` ([#27183](https://github.com/apache/beam/issues/27183)) ## New Features / Improvements diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 319c31390217..06497458a66a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -48,6 +48,7 @@ import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableChangeStreamAccessor; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn; @@ -310,7 +311,6 @@ public static Write write() { * *
Does not modify this object. - */ - public ReadChangeStream withHeartbeatDuration(Duration interval) { - return toBuilder().setHeartbeatDuration(interval).build(); - } - /** * Returns a new {@link BigtableIO.ReadChangeStream} that uses changeStreamName as prefix for * the metadata table. @@ -2000,6 +1988,19 @@ public ReadChangeStream withMetadataTableAppProfileId(String appProfileId) { .build(); } + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that, if set to true, will create or update + * metadata table before launching pipeline. Otherwise, it is expected that a metadata table + * with correct schema exists. + * + *
Optional: defaults to true + * + *
Does not modify this object.
+ */
+ public ReadChangeStream withCreateOrUpdateMetadataTable(boolean shouldCreate) {
+ return toBuilder().setCreateOrUpdateMetadataTable(shouldCreate).build();
+ }
+
@Override
public PCollection