Cleanup for GA launch of ReadChangeStream (apache#27249)
This includes a series of small changes to release the connector (a usage sketch follows the list):
- Remove the option for users to override the heartbeat duration
- Add an option to skip creating the metadata table, and a standalone utility to create it
- Add a hard timeout to mutateRow requests as a workaround for hanging requests
- Remove excessive RCSP logging used for debugging during preview
- Increase the read rows timeout to better accommodate large tables
- Remove side effects from InitializeDoFn
- Lower the ReadChangeStream deadline to align more closely with the checkpoint duration
- Add release note to CHANGES.md
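For orientation, here is a minimal pipeline sketch against the post-cleanup API (illustrative only: the project, instance, table, and app profile IDs are hypothetical; withHeartbeatDuration is gone because the heartbeat interval is now service-controlled; and the new withCreateOrUpdateMetadataTable option is shown in its non-default form):

    import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
    import com.google.protobuf.ByteString;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Instant;

    Pipeline p = Pipeline.create();
    PCollection<KV<ByteString, ChangeStreamMutation>> changes =
        p.apply(
            BigtableIO.readChangeStream()
                .withProjectId("my-project")
                .withInstanceId("my-instance")
                .withTableId("my-table")
                .withAppProfileId("my-app-profile")
                .withStartTime(Instant.now())
                // Skip table creation; the metadata table must already exist.
                .withCreateOrUpdateMetadataTable(false));
    p.run().waitUntilFinish();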
jackdingilian authored and bullet03 committed Aug 11, 2023
1 parent b20a8d8 commit 85ac64f
Showing 16 changed files with 271 additions and 258 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -59,6 +59,7 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Support for Bigtable Change Streams added in Java `BigtableIO.ReadChangeStream` ([#27183](https://github.com/apache/beam/issues/27183))

## New Features / Improvements

BigtableIO.java
@@ -48,6 +48,7 @@
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableChangeStreamAccessor;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn;
@@ -310,7 +311,6 @@ public static Write write() {
*
* <ul>
* <li>{@link BigtableIO.ReadChangeStream#withStartTime} which defaults to now.
* <li>{@link BigtableIO.ReadChangeStream#withHeartbeatDuration} which defaults to 1 second.
* <li>{@link BigtableIO.ReadChangeStream#withMetadataTableProjectId} which defaults to value
* from {@link BigtableIO.ReadChangeStream#withProjectId}
* <li>{@link BigtableIO.ReadChangeStream#withMetadataTableInstanceId} which defaults to value
@@ -1797,8 +1797,6 @@ public enum ExistingPipelineOptions {
RESUME_OR_NEW,
// Same as RESUME_OR_NEW, except that if the previous pipeline doesn't exist, don't start.
RESUME_OR_FAIL,
// Start a new pipeline, overriding any existing pipeline with the same name.
NEW,
// This skips cleaning up previous pipeline metadata and starts a new pipeline. This should
// only be used to skip cleanup in tests
@VisibleForTesting
@@ -1827,8 +1825,6 @@ static ReadChangeStream create() {

abstract @Nullable Instant getEndTime();

abstract @Nullable Duration getHeartbeatDuration();

abstract @Nullable String getChangeStreamName();

abstract @Nullable ExistingPipelineOptions getExistingPipelineOptions();
@@ -1837,6 +1833,8 @@ static ReadChangeStream create() {

abstract @Nullable String getMetadataTableId();

abstract @Nullable Boolean getCreateOrUpdateMetadataTable();

abstract ReadChangeStream.Builder toBuilder();

/**
@@ -1909,16 +1907,6 @@ ReadChangeStream withEndTime(Instant endTime) {
return toBuilder().setEndTime(endTime).build();
}

/**
* Returns a new {@link BigtableIO.ReadChangeStream} that will send heartbeat messages at
* specified interval.
*
* <p>Does not modify this object.
*/
public ReadChangeStream withHeartbeatDuration(Duration interval) {
return toBuilder().setHeartbeatDuration(interval).build();
}

/**
* Returns a new {@link BigtableIO.ReadChangeStream} that uses changeStreamName as prefix for
* the metadata table.
@@ -2000,6 +1988,19 @@ public ReadChangeStream withMetadataTableAppProfileId(String appProfileId) {
.build();
}

/**
* Returns a new {@link BigtableIO.ReadChangeStream} that, if set to true, will create or update
* the metadata table before launching the pipeline. Otherwise, a metadata table with the
* correct schema is expected to already exist.
*
* <p>Optional: defaults to true
*
* <p>Does not modify this object.
*/
public ReadChangeStream withCreateOrUpdateMetadataTable(boolean shouldCreate) {
return toBuilder().setCreateOrUpdateMetadataTable(shouldCreate).build();
}
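// Usage sketch (hypothetical IDs; assumes the metadata table was
// pre-created, e.g. with the standalone utility further below):
//   BigtableIO.readChangeStream()
//       .withProjectId("my-project")
//       .withInstanceId("my-instance")
//       .withTableId("my-table")
//       .withCreateOrUpdateMetadataTable(false)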

@Override
public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
checkArgument(
@@ -2040,10 +2041,6 @@ public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
if (startTime == null) {
startTime = Instant.now();
}
Duration heartbeatDuration = getHeartbeatDuration();
if (heartbeatDuration == null) {
heartbeatDuration = Duration.standardSeconds(1);
}
String changeStreamName = getChangeStreamName();
if (changeStreamName == null || changeStreamName.isEmpty()) {
changeStreamName = UniqueIdGenerator.generateRowKeyPrefix();
@@ -2053,21 +2050,55 @@ public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
existingPipelineOptions = ExistingPipelineOptions.FAIL_IF_EXISTS;
}

boolean shouldCreateOrUpdateMetadataTable = true;
if (getCreateOrUpdateMetadataTable() != null) {
shouldCreateOrUpdateMetadataTable = getCreateOrUpdateMetadataTable();
}

ActionFactory actionFactory = new ActionFactory();
ChangeStreamMetrics metrics = new ChangeStreamMetrics();
DaoFactory daoFactory =
new DaoFactory(
bigtableConfig, metadataTableConfig, getTableId(), metadataTableId, changeStreamName);

try {
MetadataTableAdminDao metadataTableAdminDao = daoFactory.getMetadataTableAdminDao();
checkArgument(metadataTableAdminDao != null);
checkArgument(
metadataTableAdminDao.isAppProfileSingleClusterAndTransactional(
metadataTableConfig.getAppProfileId().get()),
"App profile id '"
+ metadataTableConfig.getAppProfileId().get()
+ "' provided to access metadata table needs to use single-cluster routing policy"
+ " and allow single-row transactions.");

// Only try to create or update the metadata table if the option is set to true. Otherwise,
// just check that the table exists.
if (shouldCreateOrUpdateMetadataTable && metadataTableAdminDao.createMetadataTable()) {
LOG.info("Created metadata table: " + metadataTableAdminDao.getTableId());
}
checkArgument(
metadataTableAdminDao.doesMetadataTableExist(),
"Metadata table does not exist: " + metadataTableAdminDao.getTableId());

try (BigtableChangeStreamAccessor bigtableChangeStreamAccessor =
BigtableChangeStreamAccessor.getOrCreate(bigtableConfig)) {
checkArgument(
bigtableChangeStreamAccessor.getTableAdminClient().exists(getTableId()),
"Change Stream table does not exist");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
daoFactory.close();
}

InitializeDoFn initializeDoFn =
    new InitializeDoFn(daoFactory, startTime, existingPipelineOptions);
DetectNewPartitionsDoFn detectNewPartitionsDoFn =
new DetectNewPartitionsDoFn(getEndTime(), actionFactory, daoFactory, metrics);
ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
new ReadChangeStreamPartitionDoFn(daoFactory, actionFactory, metrics);

PCollection<KV<ByteString, ChangeStreamRecord>> readChangeStreamOutput =
input
@@ -2101,14 +2132,60 @@ abstract ReadChangeStream.Builder setMetadataTableBigtableConfig(

abstract ReadChangeStream.Builder setEndTime(Instant endTime);

abstract ReadChangeStream.Builder setHeartbeatDuration(Duration interval);

abstract ReadChangeStream.Builder setChangeStreamName(String changeStreamName);

abstract ReadChangeStream.Builder setExistingPipelineOptions(
ExistingPipelineOptions existingPipelineOptions);

abstract ReadChangeStream.Builder setCreateOrUpdateMetadataTable(boolean shouldCreate);

abstract ReadChangeStream build();
}
}

/**
* Utility method to create or update the Read Change Stream metadata table. This requires
* Bigtable table creation permissions, so it is useful when the pipeline itself isn't granted
* permission to create Bigtable tables. Run this method once with the correct permissions to
* create the metadata table, which is required to read Bigtable change streams; the resulting
* table can be reused by all pipelines.
*
* @param projectId project id of the metadata table, usually the same as the project of the table
* being streamed
* @param instanceId instance id of the metadata table, usually the same as the instance of the
* table being streamed
* @param tableId name of the metadata table; leave it null or empty to use the default.
* @return true if the metadata table exists after the call, whether newly created or
*     pre-existing. Otherwise, false.
*/
public static boolean createOrUpdateReadChangeStreamMetadataTable(
String projectId, String instanceId, @Nullable String tableId) throws IOException {
BigtableConfig bigtableConfig =
BigtableConfig.builder()
.setValidate(true)
.setProjectId(StaticValueProvider.of(projectId))
.setInstanceId(StaticValueProvider.of(instanceId))
.setAppProfileId(
StaticValueProvider.of(
"default")) // App profile is not used. It's only required for data API.
.build();

if (tableId == null || tableId.isEmpty()) {
tableId = MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME;
}

DaoFactory daoFactory = new DaoFactory(null, bigtableConfig, null, tableId, null);

try {
MetadataTableAdminDao metadataTableAdminDao = daoFactory.getMetadataTableAdminDao();

// Create the metadata table if it does not already exist, then confirm that it exists.
if (metadataTableAdminDao.createMetadataTable()) {
LOG.info("Created metadata table: " + metadataTableAdminDao.getTableId());
}
return metadataTableAdminDao.doesMetadataTableExist();
} finally {
daoFactory.close();
}
}
}
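A one-time setup call with this utility might look like the following (a minimal sketch; the project and instance IDs are hypothetical, and passing null selects the default metadata table name):

    // Run once with credentials that can create Bigtable tables.
    boolean ready =
        BigtableIO.createOrUpdateReadChangeStreamMetadataTable(
            "my-project", "my-instance", /* tableId= */ null); // throws IOException
    if (!ready) {
      throw new IllegalStateException("Read Change Stream metadata table is unavailable");
    }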
ChangeStreamAction.java
@@ -28,7 +28,6 @@
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.protobuf.ByteString;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator;
@@ -108,8 +107,7 @@ public Optional<DoFn.ProcessContinuation> run(
RestrictionTracker<StreamProgress, StreamProgress> tracker,
DoFn.OutputReceiver<KV<ByteString, ChangeStreamRecord>> receiver,
ManualWatermarkEstimator<Instant> watermarkEstimator,
BytesThroughputEstimator<KV<ByteString, ChangeStreamRecord>> throughputEstimator) {
if (record instanceof Heartbeat) {
Heartbeat heartbeat = (Heartbeat) record;
final Instant watermark = toJodaTime(heartbeat.getEstimatedLowWatermark());
@@ -129,24 +127,11 @@ public Optional<DoFn.ProcessContinuation> run(
true);
watermarkEstimator.setWatermark(watermark);

if (shouldDebug) {
LOG.info(
"RCSP {}: Heartbeat partition: {} token: {} watermark: {}",
formatByteStringRange(partitionRecord.getPartition()),
formatByteStringRange(heartbeat.getChangeStreamContinuationToken().getPartition()),
heartbeat.getChangeStreamContinuationToken().getToken(),
heartbeat.getEstimatedLowWatermark());
}
// If the tracker fails to claim the streamProgress, it most likely means the runner initiated
// a checkpoint. See {@link
// org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker}
// for more information regarding runner initiated checkpoints.
if (!tracker.tryClaim(streamProgress)) {
if (shouldDebug) {
LOG.info(
"RCSP {}: Checkpoint heart beat tracker",
formatByteStringRange(partitionRecord.getPartition()));
}
return Optional.of(DoFn.ProcessContinuation.stop());
}
metrics.incHeartbeatCount();
@@ -163,30 +148,11 @@ public Optional<DoFn.ProcessContinuation> run(
CloseStream closeStream = (CloseStream) record;
StreamProgress streamProgress = new StreamProgress(closeStream);

if (shouldDebug) {
LOG.info(
"RCSP {}: CloseStream: {}",
formatByteStringRange(partitionRecord.getPartition()),
closeStream.getChangeStreamContinuationTokens().stream()
.map(
c ->
"{partition: "
+ formatByteStringRange(c.getPartition())
+ " token: "
+ c.getToken()
+ "}")
.collect(Collectors.joining(", ", "[", "]")));
}
// If the tracker fails to claim the streamProgress, it most likely means the runner initiated
// a checkpoint. See {@link
// org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker}
// for more information regarding runner initiated checkpoints.
if (!tracker.tryClaim(streamProgress)) {
if (shouldDebug) {
LOG.info(
"RCSP {}: Checkpoint close stream tracker",
formatByteStringRange(partitionRecord.getPartition()));
}
return Optional.of(DoFn.ProcessContinuation.stop());
}
metrics.incClosestreamCount();
@@ -217,11 +183,6 @@ public Optional<DoFn.ProcessContinuation> run(
// a checkpoint. See ReadChangeStreamPartitionProgressTracker for more information regarding
// runner initiated checkpoints.
if (!tracker.tryClaim(streamProgress)) {
if (shouldDebug) {
LOG.info(
"RCSP {}: Checkpoint data change tracker",
formatByteStringRange(partitionRecord.getPartition()));
}
return Optional.of(DoFn.ProcessContinuation.stop());
}
if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.GARBAGE_COLLECTION) {
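Throughout these branches, tryClaim returning false is the standard splittable DoFn checkpoint handshake. A stripped-down sketch of that contract (schematic, not from this codebase; an offset counter stands in for the connector's StreamProgress):

    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

    class StreamFn extends DoFn<String, Long> {
      @GetInitialRestriction
      public OffsetRange initialRestriction(@Element String element) {
        return new OffsetRange(0, Long.MAX_VALUE);
      }

      @ProcessElement
      public ProcessContinuation process(
          RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<Long> out) {
        for (long pos = tracker.currentRestriction().getFrom(); ; pos++) {
          // A false return means the runner initiated a checkpoint: stop now
          // and let the runner resume from the last successfully claimed position.
          if (!tracker.tryClaim(pos)) {
            return ProcessContinuation.stop();
          }
          out.output(pos);
        }
      }
    }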
ReadChangeStreamPartitionAction.java
@@ -138,23 +138,8 @@ public ProcessContinuation run(
OutputReceiver<KV<ByteString, ChangeStreamRecord>> receiver,
ManualWatermarkEstimator<Instant> watermarkEstimator)
throws IOException {
// Watermark being delayed beyond 5 minutes signals a possible problem.
boolean shouldDebug =
watermarkEstimator.getState().plus(Duration.standardMinutes(5)).isBeforeNow();
BytesThroughputEstimator<KV<ByteString, ChangeStreamRecord>> throughputEstimator =
new BytesThroughputEstimator<>(sizeEstimator, Instant.now());

if (shouldDebug) {
LOG.info(
"RCSP {}: Partition: "
+ partitionRecord
+ "\n Watermark: "
+ watermarkEstimator.getState()
+ "\n RestrictionTracker: "
+ tracker.currentRestriction(),
formatByteStringRange(partitionRecord.getPartition()));
}

// Lock the partition
if (tracker.currentRestriction().isEmpty()) {
boolean lockedPartition = metadataTableDao.lockAndRecordPartition(partitionRecord);
@@ -266,12 +251,10 @@ public ProcessContinuation run(
new NewPartition(
childPartition, Collections.singletonList(token), watermarkEstimator.getState()));
}
LOG.info(
    "RCSP {}: Split/Merge into {}",
    formatByteStringRange(partitionRecord.getPartition()),
    partitionsToString(childPartitions));
if (!coverSameKeySpace(tokenPartitions, partitionRecord.getPartition())) {
LOG.warn(
"RCSP {}: CloseStream has tokens {} that don't cover the entire keyspace",
@@ -299,8 +282,7 @@ public ProcessContinuation run(
partitionRecord,
tracker.currentRestriction(),
partitionRecord.getEndTime(),
heartbeatDuration);
for (ChangeStreamRecord record : stream) {
Optional<ProcessContinuation> result =
changeStreamAction.run(
Expand All @@ -309,8 +291,7 @@ public ProcessContinuation run(
tracker,
receiver,
watermarkEstimator,
throughputEstimator);
// changeStreamAction will usually return Optional.empty(), except when a checkpoint
// (either runner- or pipeline-initiated) is required.
if (result.isPresent()) {