Feature/add error handling for bqio (#30081)
* initial work on bqio dlq

* add test for exception handling on bq

* add tests

* add trigger file to force the postcommit

* wire error handling into storage write; update check to not log a warning if an error handler is used but the error output isn't consumed.

* wire batch loads with error handler

* wire in batch loads changes, update tests

* add BQ write with error handling test cases

* spotless

* add trigger files

* address comments, support error handling when reading directly from a table

* revert change to build.gradle

* address comments, improve splitting logic

* spotless
johnjcasey authored Feb 21, 2024
1 parent b78a3e1 commit 6a8c27e
Showing 22 changed files with 1,084 additions and 170 deletions.
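For orientation before the hunks below: the end-to-end behavior this commit enables is a dead-letter path for rows that fail serialization, instead of those failures crashing the bundle. The following sketch is illustrative only. It assumes Beam's Pipeline#registerBadRecordErrorHandler from the errorhandling package and a BigQueryIO.Write#withErrorHandler option matching what this change wires into BatchLoads and StorageApiLoads; the project, dataset, and table names are placeholders.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
import org.apache.beam.sdk.values.PCollection;

public class BigQueryDeadLetterSketch {

  /** Trivial dead-letter sink: a real pipeline would persist the bad records somewhere. */
  static class CountingSink extends PTransform<PCollection<BadRecord>, PCollection<Long>> {
    @Override
    public PCollection<Long> expand(PCollection<BadRecord> input) {
      return input.apply("CountBadRecords", Count.globally());
    }
  }

  public static void main(String[] args) throws Exception {
    Pipeline p = Pipeline.create();

    // Register the sink up front; the handler collects error outputs from any transform
    // that is given it.
    BadRecordErrorHandler<PCollection<Long>> badRecords =
        p.registerBadRecordErrorHandler(new CountingSink());

    p.apply(
            Create.of(new TableRow().set("name", "beam")).withCoder(TableRowJsonCoder.of()))
        .apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // placeholder table spec
                .withSchema(
                    new TableSchema()
                        .setFields(
                            Collections.singletonList(
                                new TableFieldSchema().setName("name").setType("STRING"))))
                // Rows that fail serialization are routed to the handler instead of failing
                // the bundle (option added by this change; exact method name assumed).
                .withErrorHandler(badRecords));

    // The handler must be closed before run() so the sink is applied to the collected errors.
    badRecords.close();
    p.run().waitUntilFinish();
  }
}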
@@ -1687,7 +1687,11 @@ public CompositeBehavior enterCompositeTransform(Node node) {
         String rootBigQueryTransform = "";
         if (transform.getClass().equals(StorageApiLoads.class)) {
           StorageApiLoads<?, ?> storageLoads = (StorageApiLoads<?, ?>) transform;
-          failedTag = storageLoads.getFailedRowsTag();
+          // If the storage load is directing exceptions to an error handler, we don't need to
+          // warn for unconsumed rows
+          if (!storageLoads.usesErrorHandler()) {
+            failedTag = storageLoads.getFailedRowsTag();
+          }
           // For storage API the transform that outputs failed rows is nested one layer below
           // BigQueryIO.
           rootBigQueryTransform = node.getEnclosingNode().getFullName();
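The check above only matters when failures are surfaced the pre-existing way, through the WriteResult outputs; with an error handler attached, the failed-rows tag is intentionally left unconsumed and the warning would be noise. A hedged sketch of the WriteResult route, assuming the existing WriteResult#getFailedStorageApiInserts accessor and a table that already exists:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

class ConsumeFailedRowsSketch {
  /** Writes with the Storage Write API and explicitly consumes the failed-rows output. */
  static PCollection<String> writeAndConsumeFailures(PCollection<TableRow> rows) {
    WriteResult result =
        rows.apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // placeholder table spec
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));
    // Consuming this output is what keeps the check above from warning when no error
    // handler is configured.
    return result
        .getFailedStorageApiInserts()
        .apply(
            "ExtractErrorMessages",
            MapElements.into(TypeDescriptors.strings())
                .via(BigQueryStorageApiInsertError::getErrorMessage));
  }
}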
@@ -45,4 +45,13 @@ public static class ErrorSinkTransform
       }
     }
   }
+
+  public static class EchoErrorTransform
+      extends PTransform<PCollection<BadRecord>, PCollection<BadRecord>> {
+
+    @Override
+    public PCollection<BadRecord> expand(PCollection<BadRecord> input) {
+      return input;
+    }
+  }
 }
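EchoErrorTransform gives tests a pass-through sink, so assertions can be made on the BadRecord elements themselves rather than on an aggregate. Below is a hedged, test-style sketch of how it might be used. The enclosing test-utils class name (ErrorHandlingTestUtils), the handler's getOutput() accessor, and the write transform's withErrorHandler option are all assumptions based on what this change appears to wire up.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
// Assumed location of the test-utils class containing the transforms shown above.
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class BigQueryErrorHandlerSketchTest {
  @Rule public final transient TestPipeline p = TestPipeline.create();

  @Test
  public void badRowsAreRoutedToTheHandler() throws Exception {
    // Pass-through sink: the handler's output is the PCollection<BadRecord> itself.
    BadRecordErrorHandler<PCollection<BadRecord>> handler =
        p.registerBadRecordErrorHandler(new ErrorHandlingTestUtils.EchoErrorTransform());

    p.apply(
            Create.of(new TableRow().set("unmappable", "value"))
                .withCoder(TableRowJsonCoder.of()))
        .apply(
            BigQueryIO.writeTableRows()
                .to("project:dataset.table_with_incompatible_schema") // placeholder
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withErrorHandler(handler)); // option assumed, as wired by this change

    handler.close();
    // One row that fails conversion, one BadRecord in the dead-letter output.
    PAssert.thatSingleton(handler.getOutput().apply(Count.globally())).isEqualTo(1L);
    p.run();
  }
}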
@@ -47,9 +47,15 @@ class AvroRowWriter<AvroT, T> extends BigQueryRowWriter<T> {
   }

   @Override
-  public void write(T element) throws IOException {
+  public void write(T element) throws IOException, BigQueryRowSerializationException {
     AvroWriteRequest<T> writeRequest = new AvroWriteRequest<>(element, schema);
-    writer.append(toAvroRecord.apply(writeRequest));
+    AvroT serializedRequest;
+    try {
+      serializedRequest = toAvroRecord.apply(writeRequest);
+    } catch (Exception e) {
+      throw new BigQueryRowSerializationException(e);
+    }
+    writer.append(serializedRequest);
   }

   public Schema getSchema() {
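BigQueryRowSerializationException separates "the user's conversion function failed on this element" from genuine I/O failures, so callers can route the former to the bad-record output while the latter still fail the bundle and get retried. The DoFn below is an illustrative sketch of that catch-and-route pattern, not the actual internal writer code; the BadRecordRouter#route signature is my recollection of Beam's errorhandling package and should be checked against the SDK.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.values.TupleTag;

/**
 * Illustrative only: a failing user format function no longer fails the bundle; the element is
 * handed to the BadRecordRouter instead.
 */
class RouteFormatFailuresFn<T> extends DoFn<T, TableRow> {

  static final TupleTag<TableRow> MAIN_OUTPUT = new TupleTag<TableRow>() {};

  private final SerializableFunction<T, TableRow> formatFunction;
  private final BadRecordRouter badRecordRouter;
  private final Coder<T> elementCoder;

  RouteFormatFailuresFn(
      SerializableFunction<T, TableRow> formatFunction,
      BadRecordRouter badRecordRouter,
      Coder<T> elementCoder) {
    this.formatFunction = formatFunction;
    this.badRecordRouter = badRecordRouter;
    this.elementCoder = elementCoder;
  }

  @ProcessElement
  public void processElement(@Element T element, MultiOutputReceiver receivers) throws Exception {
    TableRow row;
    try {
      row = formatFunction.apply(element);
    } catch (Exception e) {
      // With an error handler configured the router emits a BadRecord on BAD_RECORD_TAG;
      // without one it rethrows, preserving today's fail-the-bundle behavior.
      badRecordRouter.route(
          receivers, element, elementCoder, e, "Unable to convert element to a TableRow");
      return;
    }
    receivers.get(MAIN_OUTPUT).output(row);
  }
}

Such a DoFn would be applied with ParDo.of(...).withOutputTags(RouteFormatFailuresFn.MAIN_OUTPUT, TupleTagList.of(BAD_RECORD_TAG)), mirroring the wiring added to BatchLoads below.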
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.gcp.bigquery;

 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
+import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

@@ -57,6 +58,9 @@
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
 import org.apache.beam.sdk.transforms.windowing.AfterFirst;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
@@ -77,6 +81,7 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
@@ -161,6 +166,8 @@ class BatchLoads<DestinationT, ElementT>
   private final RowWriterFactory<ElementT, DestinationT> rowWriterFactory;
   private final @Nullable String kmsKey;
   private final String tempDataset;
+  private final BadRecordRouter badRecordRouter;
+  private final ErrorHandler<BadRecord, ?> badRecordErrorHandler;
   private Coder<TableDestination> tableDestinationCoder;

   // The maximum number of times to retry failed load or copy jobs.
@@ -180,7 +187,9 @@
       @Nullable String kmsKey,
       boolean clusteringEnabled,
       boolean useAvroLogicalTypes,
-      String tempDataset) {
+      String tempDataset,
+      BadRecordRouter badRecordRouter,
+      ErrorHandler<BadRecord, ?> badRecordErrorHandler) {
     bigQueryServices = new BigQueryServicesImpl();
     this.writeDisposition = writeDisposition;
     this.createDisposition = createDisposition;
@@ -207,6 +216,8 @@
     this.tempDataset = tempDataset;
     this.tableDestinationCoder =
         clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of();
+    this.badRecordRouter = badRecordRouter;
+    this.badRecordErrorHandler = badRecordErrorHandler;
   }

   void setSchemaUpdateOptions(Set<SchemaUpdateOption> schemaUpdateOptions) {
@@ -601,9 +612,13 @@ PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFil
                     unwrittedRecordsTag,
                     maxNumWritersPerBundle,
                     maxFileSize,
-                    rowWriterFactory))
+                    rowWriterFactory,
+                    input.getCoder(),
+                    badRecordRouter))
                 .withSideInputs(tempFilePrefix)
-                .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag)));
+                .withOutputTags(
+                    writtenFilesTag,
+                    TupleTagList.of(ImmutableList.of(unwrittedRecordsTag, BAD_RECORD_TAG))));
     PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFiles =
         writeBundlesTuple
             .get(writtenFilesTag)
@@ -612,6 +627,8 @@
         writeBundlesTuple
             .get(unwrittedRecordsTag)
             .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), elementCoder));
+    badRecordErrorHandler.addErrorCollection(
+        writeBundlesTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())));

     // If the bundles contain too many output tables to be written inline to files (due to memory
     // limits), any unwritten records will be spilled to the unwrittenRecordsTag PCollection.
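The wiring pattern added here, and repeated in the writeGroupedRecords and writeShardedRecords hunks below, is always the same two steps: declare BAD_RECORD_TAG as an extra output of the ParDo, then hand that output (with the BadRecord coder set) to the error handler. A condensed sketch of the pattern in isolation, using only the APIs already visible in this diff:

import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

class BadRecordWiringSketch {
  /** Applies a multi-output DoFn and hands its BAD_RECORD_TAG output to the error handler. */
  static <InT, OutT> PCollection<OutT> applyWithDeadLetterQueue(
      PCollection<InT> input,
      DoFn<InT, OutT> fn,
      TupleTag<OutT> mainTag,
      ErrorHandler<BadRecord, ?> errorHandler) {
    PCollectionTuple outputs =
        input.apply(
            "ProcessWithDeadLetterQueue",
            ParDo.of(fn).withOutputTags(mainTag, TupleTagList.of(BAD_RECORD_TAG)));
    // Step two of the pattern: the handler owns the bad-record collection, exactly as
    // badRecordErrorHandler.addErrorCollection(...) does in the hunks above and below.
    errorHandler.addErrorCollection(
        outputs.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())));
    return outputs.get(mainTag);
  }
}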
@@ -680,62 +697,92 @@ PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFil
     // parallelize properly. We also ensure that the files are written if a threshold number of
     // records are ready. Dynamic sharding is achieved via the withShardedKey() option provided by
     // GroupIntoBatches.
-    return input
-        .apply(
-            GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
-                .withByteSize(byteSize)
-                .withMaxBufferingDuration(maxBufferingDuration)
-                .withShardedKey())
-        .setCoder(
-            KvCoder.of(
-                org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
-                IterableCoder.of(elementCoder)))
-        .apply(
-            "StripShardId",
-            MapElements.via(
-                new SimpleFunction<
-                    KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>,
-                    KV<DestinationT, Iterable<ElementT>>>() {
-                  @Override
-                  public KV<DestinationT, Iterable<ElementT>> apply(
-                      KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>
-                          input) {
-                    return KV.of(input.getKey().getKey(), input.getValue());
-                  }
-                }))
-        .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
-        .apply(
-            "WriteGroupedRecords",
-            ParDo.of(
-                new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
-                    tempFilePrefix, maxFileSize, rowWriterFactory))
-                .withSideInputs(tempFilePrefix))
+    TupleTag<Result<DestinationT>> successfulResultsTag = new TupleTag<>();
+    PCollectionTuple writeResults =
+        input
+            .apply(
+                GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
+                    .withByteSize(byteSize)
+                    .withMaxBufferingDuration(maxBufferingDuration)
+                    .withShardedKey())
+            .setCoder(
+                KvCoder.of(
+                    org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
+                    IterableCoder.of(elementCoder)))
+            .apply(
+                "StripShardId",
+                MapElements.via(
+                    new SimpleFunction<
+                        KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>,
+                        KV<DestinationT, Iterable<ElementT>>>() {
+                      @Override
+                      public KV<DestinationT, Iterable<ElementT>> apply(
+                          KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>
+                              input) {
+                        return KV.of(input.getKey().getKey(), input.getValue());
+                      }
+                    }))
+            .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
+            .apply(
+                "WriteGroupedRecords",
+                ParDo.of(
+                        new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
+                            tempFilePrefix,
+                            maxFileSize,
+                            rowWriterFactory,
+                            badRecordRouter,
+                            successfulResultsTag,
+                            elementCoder))
+                    .withSideInputs(tempFilePrefix)
+                    .withOutputTags(successfulResultsTag, TupleTagList.of(BAD_RECORD_TAG)));
+    badRecordErrorHandler.addErrorCollection(
+        writeResults.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())));
+
+    return writeResults
+        .get(successfulResultsTag)
         .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
   }

   private PCollection<Result<DestinationT>> writeShardedRecords(
       PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords,
       PCollectionView<String> tempFilePrefix) {
-    return shardedRecords
-        .apply("GroupByDestination", GroupByKey.create())
-        .apply(
-            "StripShardId",
-            MapElements.via(
-                new SimpleFunction<
-                    KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
-                    KV<DestinationT, Iterable<ElementT>>>() {
-                  @Override
-                  public KV<DestinationT, Iterable<ElementT>> apply(
-                      KV<ShardedKey<DestinationT>, Iterable<ElementT>> input) {
-                    return KV.of(input.getKey().getKey(), input.getValue());
-                  }
-                }))
-        .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
-        .apply(
-            "WriteGroupedRecords",
-            ParDo.of(
-                new WriteGroupedRecordsToFiles<>(tempFilePrefix, maxFileSize, rowWriterFactory))
-                .withSideInputs(tempFilePrefix))
+    TupleTag<Result<DestinationT>> successfulResultsTag = new TupleTag<>();
+    PCollectionTuple writeResults =
+        shardedRecords
+            .apply("GroupByDestination", GroupByKey.create())
+            .apply(
+                "StripShardId",
+                MapElements.via(
+                    new SimpleFunction<
+                        KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
+                        KV<DestinationT, Iterable<ElementT>>>() {
+                      @Override
+                      public KV<DestinationT, Iterable<ElementT>> apply(
+                          KV<ShardedKey<DestinationT>, Iterable<ElementT>> input) {
+                        return KV.of(input.getKey().getKey(), input.getValue());
+                      }
+                    }))
+            .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
+            .apply(
+                "WriteGroupedRecords",
+                ParDo.of(
+                        new WriteGroupedRecordsToFiles<>(
+                            tempFilePrefix,
+                            maxFileSize,
+                            rowWriterFactory,
+                            badRecordRouter,
+                            successfulResultsTag,
+                            elementCoder))
+                    .withSideInputs(tempFilePrefix)
+                    .withOutputTags(successfulResultsTag, TupleTagList.of(BAD_RECORD_TAG)));
+
+    badRecordErrorHandler.addErrorCollection(
+        writeResults
+            .get(BAD_RECORD_TAG)
+            .setCoder(BadRecord.getCoder(shardedRecords.getPipeline())));
+
+    return writeResults
+        .get(successfulResultsTag)
        .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
   }
