From 85ac64fdb04b4a15c8652f3e06b29fd14f947a9b Mon Sep 17 00:00:00 2001 From: Jack Dingilian Date: Tue, 27 Jun 2023 16:54:19 -0400 Subject: [PATCH] Cleanup for GA launch of ReadChangeStream (#27249) This includes a series of small changes to release the connector: - Remove option for users to override heartbeat duration - Add option to skip creating metadata table and standalone utility to create metadata table - Add hard timeout to mutateRow requests as workaround for hanging requests - Remove excessive RCSP logging for debugging during preview - Increase read rows timeout to better accomodate large tables - Remove side effects from InitializeDoFn - Lower ReadChangeStream deadline to closer align with checkpoint duration - Add release note to CHANGES.md --- CHANGES.md | 1 + .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 133 ++++++++++++++---- .../action/ChangeStreamAction.java | 41 +----- .../ReadChangeStreamPartitionAction.java | 31 +--- .../dao/BigtableChangeStreamAccessor.java | 44 ++++-- .../changestreams/dao/ChangeStreamDao.java | 14 +- .../changestreams/dao/DaoFactory.java | 15 +- .../dao/MetadataTableAdminDao.java | 5 + .../changestreams/dao/MetadataTableDao.java | 47 ++++++- .../changestreams/dofn/InitializeDoFn.java | 35 +---- .../dofn/ReadChangeStreamPartitionDoFn.java | 11 +- .../action/ChangeStreamActionTest.java | 12 +- .../ReadChangeStreamPartitionActionTest.java | 33 +++-- .../dao/MetadataTableDaoTest.java | 31 ++++ .../dofn/InitializeDoFnTest.java | 71 +--------- .../ReadChangeStreamPartitionDoFnTest.java | 5 +- 16 files changed, 271 insertions(+), 258 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 6c55e276f91fb..aee9b96d9f4a9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -59,6 +59,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Support for Bigtable Change Streams added in Java `BigtableIO.ReadChangeStream` ([#27183](https://github.com/apache/beam/issues/27183)) ## New Features / Improvements diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 319c31390217a..06497458a66a6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -48,6 +48,7 @@ import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableChangeStreamAccessor; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn; @@ -310,7 +311,6 @@ public static Write write() { * *