diff --git a/.github/trigger_files/beam_PostCommit_Java.json b/.github/trigger_files/beam_PostCommit_Java.json new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index a454945c3d014..138aa22ff473e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1687,7 +1687,11 @@ public CompositeBehavior enterCompositeTransform(Node node) { String rootBigQueryTransform = ""; if (transform.getClass().equals(StorageApiLoads.class)) { StorageApiLoads storageLoads = (StorageApiLoads) transform; - failedTag = storageLoads.getFailedRowsTag(); + // If the storage load is directing exceptions to an error handler, we don't need to + // warn for unconsumed rows + if (!storageLoads.usesErrorHandler()) { + failedTag = storageLoads.getFailedRowsTag(); + } // For storage API the transform that outputs failed rows is nested one layer below // BigQueryIO. rootBigQueryTransform = node.getEnclosingNode().getFullName(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java index 41367765b9208..b4db4867cfc7b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java @@ -45,4 +45,13 @@ public static class ErrorSinkTransform } } } + + public static class EchoErrorTransform + extends PTransform, PCollection> { + + @Override + public PCollection expand(PCollection input) { + return input; + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java index 0b64a65c4503e..1f45371b19ff9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java @@ -47,9 +47,15 @@ class AvroRowWriter extends BigQueryRowWriter { } @Override - public void write(T element) throws IOException { + public void write(T element) throws IOException, BigQueryRowSerializationException { AvroWriteRequest writeRequest = new AvroWriteRequest<>(element, schema); - writer.append(toAvroRecord.apply(writeRequest)); + AvroT serializedRequest; + try { + serializedRequest = toAvroRecord.apply(writeRequest); + } catch (Exception e) { + throw new BigQueryRowSerializationException(e); + } + writer.append(serializedRequest); } public Schema getSchema() { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 23acd8e01f7fe..56bd14318be44 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; +import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; @@ -57,6 +58,9 @@ import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; import org.apache.beam.sdk.transforms.windowing.AfterFirst; import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; @@ -77,6 +81,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -161,6 +166,8 @@ class BatchLoads private final RowWriterFactory rowWriterFactory; private final @Nullable String kmsKey; private final String tempDataset; + private final BadRecordRouter badRecordRouter; + private final ErrorHandler badRecordErrorHandler; private Coder tableDestinationCoder; // The maximum number of times to retry failed load or copy jobs. @@ -180,7 +187,9 @@ class BatchLoads @Nullable String kmsKey, boolean clusteringEnabled, boolean useAvroLogicalTypes, - String tempDataset) { + String tempDataset, + BadRecordRouter badRecordRouter, + ErrorHandler badRecordErrorHandler) { bigQueryServices = new BigQueryServicesImpl(); this.writeDisposition = writeDisposition; this.createDisposition = createDisposition; @@ -207,6 +216,8 @@ class BatchLoads this.tempDataset = tempDataset; this.tableDestinationCoder = clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of(); + this.badRecordRouter = badRecordRouter; + this.badRecordErrorHandler = badRecordErrorHandler; } void setSchemaUpdateOptions(Set schemaUpdateOptions) { @@ -601,9 +612,13 @@ PCollection> writeDynamicallyShardedFil unwrittedRecordsTag, maxNumWritersPerBundle, maxFileSize, - rowWriterFactory)) + rowWriterFactory, + input.getCoder(), + badRecordRouter)) .withSideInputs(tempFilePrefix) - .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag))); + .withOutputTags( + writtenFilesTag, + TupleTagList.of(ImmutableList.of(unwrittedRecordsTag, BAD_RECORD_TAG)))); PCollection> writtenFiles = writeBundlesTuple .get(writtenFilesTag) @@ -612,6 +627,8 @@ PCollection> writeDynamicallyShardedFil writeBundlesTuple .get(unwrittedRecordsTag) .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), elementCoder)); + badRecordErrorHandler.addErrorCollection( + writeBundlesTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()))); // If the bundles contain too many output tables to be written inline to files (due to memory // limits), any unwritten records will be spilled to the unwrittenRecordsTag PCollection. @@ -680,62 +697,92 @@ PCollection> writeDynamicallyShardedFil // parallelize properly. We also ensure that the files are written if a threshold number of // records are ready. Dynamic sharding is achieved via the withShardedKey() option provided by // GroupIntoBatches. - return input - .apply( - GroupIntoBatches.ofSize(FILE_TRIGGERING_RECORD_COUNT) - .withByteSize(byteSize) - .withMaxBufferingDuration(maxBufferingDuration) - .withShardedKey()) - .setCoder( - KvCoder.of( - org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder), - IterableCoder.of(elementCoder))) - .apply( - "StripShardId", - MapElements.via( - new SimpleFunction< - KV, Iterable>, - KV>>() { - @Override - public KV> apply( - KV, Iterable> - input) { - return KV.of(input.getKey().getKey(), input.getValue()); - } - })) - .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder))) - .apply( - "WriteGroupedRecords", - ParDo.of( - new WriteGroupedRecordsToFiles( - tempFilePrefix, maxFileSize, rowWriterFactory)) - .withSideInputs(tempFilePrefix)) + TupleTag> successfulResultsTag = new TupleTag<>(); + PCollectionTuple writeResults = + input + .apply( + GroupIntoBatches.ofSize(FILE_TRIGGERING_RECORD_COUNT) + .withByteSize(byteSize) + .withMaxBufferingDuration(maxBufferingDuration) + .withShardedKey()) + .setCoder( + KvCoder.of( + org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder), + IterableCoder.of(elementCoder))) + .apply( + "StripShardId", + MapElements.via( + new SimpleFunction< + KV, Iterable>, + KV>>() { + @Override + public KV> apply( + KV, Iterable> + input) { + return KV.of(input.getKey().getKey(), input.getValue()); + } + })) + .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder))) + .apply( + "WriteGroupedRecords", + ParDo.of( + new WriteGroupedRecordsToFiles( + tempFilePrefix, + maxFileSize, + rowWriterFactory, + badRecordRouter, + successfulResultsTag, + elementCoder)) + .withSideInputs(tempFilePrefix) + .withOutputTags(successfulResultsTag, TupleTagList.of(BAD_RECORD_TAG))); + badRecordErrorHandler.addErrorCollection( + writeResults.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()))); + + return writeResults + .get(successfulResultsTag) .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); } private PCollection> writeShardedRecords( PCollection, ElementT>> shardedRecords, PCollectionView tempFilePrefix) { - return shardedRecords - .apply("GroupByDestination", GroupByKey.create()) - .apply( - "StripShardId", - MapElements.via( - new SimpleFunction< - KV, Iterable>, - KV>>() { - @Override - public KV> apply( - KV, Iterable> input) { - return KV.of(input.getKey().getKey(), input.getValue()); - } - })) - .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder))) - .apply( - "WriteGroupedRecords", - ParDo.of( - new WriteGroupedRecordsToFiles<>(tempFilePrefix, maxFileSize, rowWriterFactory)) - .withSideInputs(tempFilePrefix)) + TupleTag> successfulResultsTag = new TupleTag<>(); + PCollectionTuple writeResults = + shardedRecords + .apply("GroupByDestination", GroupByKey.create()) + .apply( + "StripShardId", + MapElements.via( + new SimpleFunction< + KV, Iterable>, + KV>>() { + @Override + public KV> apply( + KV, Iterable> input) { + return KV.of(input.getKey().getKey(), input.getValue()); + } + })) + .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder))) + .apply( + "WriteGroupedRecords", + ParDo.of( + new WriteGroupedRecordsToFiles<>( + tempFilePrefix, + maxFileSize, + rowWriterFactory, + badRecordRouter, + successfulResultsTag, + elementCoder)) + .withSideInputs(tempFilePrefix) + .withOutputTags(successfulResultsTag, TupleTagList.of(BAD_RECORD_TAG))); + + badRecordErrorHandler.addErrorCollection( + writeResults + .get(BAD_RECORD_TAG) + .setCoder(BadRecord.getCoder(shardedRecords.getPipeline()))); + + return writeResults + .get(successfulResultsTag) .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index cd62c5810d81f..43c5af1631909 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -19,6 +19,8 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference; +import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG; +import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.RECORDING_ROUTER; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; @@ -74,13 +76,13 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.extensions.avro.io.AvroSource; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.extensions.gcp.util.Transport; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.io.fs.ResolveOptions; @@ -111,6 +113,9 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -120,6 +125,11 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.ThrowingBadRecordRouter; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.DefaultErrorHandler; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -742,6 +752,8 @@ public static TypedRead read(SerializableFunction par .setUseAvroLogicalTypes(false) .setFormat(DataFormat.AVRO) .setProjectionPushdownApplied(false) + .setBadRecordErrorHandler(new DefaultErrorHandler<>()) + .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER) .build(); } @@ -770,6 +782,8 @@ public static TypedRead readWithDatumReader( .setUseAvroLogicalTypes(false) .setFormat(DataFormat.AVRO) .setProjectionPushdownApplied(false) + .setBadRecordErrorHandler(new DefaultErrorHandler<>()) + .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER) .build(); } @@ -985,6 +999,11 @@ abstract Builder setDatumReaderFactory( abstract Builder setUseAvroLogicalTypes(Boolean useAvroLogicalTypes); + abstract Builder setBadRecordErrorHandler( + ErrorHandler badRecordErrorHandler); + + abstract Builder setBadRecordRouter(BadRecordRouter badRecordRouter); + abstract Builder setProjectionPushdownApplied(boolean projectionPushdownApplied); } @@ -1033,6 +1052,10 @@ abstract Builder setDatumReaderFactory( abstract Boolean getUseAvroLogicalTypes(); + abstract ErrorHandler getBadRecordErrorHandler(); + + abstract BadRecordRouter getBadRecordRouter(); + abstract boolean getProjectionPushdownApplied(); /** @@ -1138,6 +1161,9 @@ public void validate(PipelineOptions options) { e); } } + checkArgument( + getBadRecordRouter().equals(BadRecordRouter.THROWING_ROUTER), + "BigQueryIO Read with Error Handling is only available when DIRECT_READ is used"); } ValueProvider table = getTableProvider(); @@ -1429,27 +1455,75 @@ private PCollection expandForDirectRead( ValueProvider tableProvider = getTableProvider(); Pipeline p = input.getPipeline(); if (tableProvider != null) { - // No job ID is required. Read directly from BigQuery storage. - PCollection rows = - p.apply( - org.apache.beam.sdk.io.Read.from( - BigQueryStorageTableSource.create( - tableProvider, - getFormat(), - getSelectedFields(), - getRowRestriction(), - getParseFn(), - outputCoder, - getBigQueryServices(), - getProjectionPushdownApplied()))); - if (beamSchema != null) { - rows.setSchema( - beamSchema, - getTypeDescriptor(), - getToBeamRowFn().apply(beamSchema), - getFromBeamRowFn().apply(beamSchema)); + // ThrowingBadRecordRouter is the default value, and is what is used if the user hasn't + // specified any particular error handling. + if (getBadRecordRouter() instanceof ThrowingBadRecordRouter) { + // No job ID is required. Read directly from BigQuery storage. + PCollection rows = + p.apply( + org.apache.beam.sdk.io.Read.from( + BigQueryStorageTableSource.create( + tableProvider, + getFormat(), + getSelectedFields(), + getRowRestriction(), + getParseFn(), + outputCoder, + getBigQueryServices(), + getProjectionPushdownApplied()))); + if (beamSchema != null) { + rows.setSchema( + beamSchema, + getTypeDescriptor(), + getToBeamRowFn().apply(beamSchema), + getFromBeamRowFn().apply(beamSchema)); + } + return rows; + } else { + // We need to manually execute the table source, so as to be able to capture exceptions + // to pipe to error handling + BigQueryStorageTableSource source = + BigQueryStorageTableSource.create( + tableProvider, + getFormat(), + getSelectedFields(), + getRowRestriction(), + getParseFn(), + outputCoder, + getBigQueryServices(), + getProjectionPushdownApplied()); + List> sources; + try { + // This splitting logic taken from the SDF implementation of Read + long estimatedSize = source.getEstimatedSizeBytes(bqOptions); + // Split into pieces as close to the default desired bundle size but if that would cause + // too few splits then prefer to split up to the default desired number of splits. + long desiredChunkSize; + if (estimatedSize <= 0) { + desiredChunkSize = 64 << 20; // 64mb + } else { + // 1mb --> 1 shard; 1gb --> 32 shards; 1tb --> 1000 shards, 1pb --> 32k shards + desiredChunkSize = Math.max(1 << 20, (long) (1000 * Math.sqrt(estimatedSize))); + } + sources = source.split(desiredChunkSize, bqOptions); + } catch (Exception e) { + throw new RuntimeException("Unable to split TableSource", e); + } + TupleTag rowTag = new TupleTag<>(); + PCollectionTuple resultTuple = + p.apply(Create.of(sources)) + .apply( + "Read Storage Table Source", + ParDo.of(new ReadTableSource(rowTag, getParseFn(), getBadRecordRouter())) + .withOutputTags(rowTag, TupleTagList.of(BAD_RECORD_TAG))); + getBadRecordErrorHandler() + .addErrorCollection( + resultTuple + .get(BAD_RECORD_TAG) + .setCoder(BadRecord.getCoder(input.getPipeline()))); + + return resultTuple.get(rowTag).setCoder(outputCoder); } - return rows; } checkArgument( @@ -1475,7 +1549,8 @@ private PCollection expandForDirectRead( PCollectionTuple tuple; PCollection rows; - if (!getWithTemplateCompatibility()) { + if (!getWithTemplateCompatibility() + && getBadRecordRouter() instanceof ThrowingBadRecordRouter) { // Create a singleton job ID token at pipeline construction time. String staticJobUuid = BigQueryHelpers.randomUUIDString(); jobIdTokenView = @@ -1588,6 +1663,52 @@ void cleanup(ContextContainer c) throws Exception { return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView)); } + private static class ReadTableSource extends DoFn, T> { + + private final TupleTag rowTag; + + private final SerializableFunction parseFn; + + private final BadRecordRouter badRecordRouter; + + public ReadTableSource( + TupleTag rowTag, + SerializableFunction parseFn, + BadRecordRouter badRecordRouter) { + this.rowTag = rowTag; + this.parseFn = parseFn; + this.badRecordRouter = badRecordRouter; + } + + @ProcessElement + public void processElement( + @Element BoundedSource boundedSource, + MultiOutputReceiver outputReceiver, + PipelineOptions options) + throws Exception { + ErrorHandlingParseFn errorHandlingParseFn = new ErrorHandlingParseFn(parseFn); + BoundedSource sourceWithErrorHandlingParseFn; + if (boundedSource instanceof BigQueryStorageStreamSource) { + sourceWithErrorHandlingParseFn = + ((BigQueryStorageStreamSource) boundedSource).fromExisting(errorHandlingParseFn); + } else if (boundedSource instanceof BigQueryStorageStreamBundleSource) { + sourceWithErrorHandlingParseFn = + ((BigQueryStorageStreamBundleSource) boundedSource) + .fromExisting(errorHandlingParseFn); + } else { + throw new RuntimeException( + "Bounded Source is not BigQueryStorageStreamSource or BigQueryStorageStreamBundleSource, unable to read"); + } + readSource( + options, + rowTag, + outputReceiver, + sourceWithErrorHandlingParseFn, + errorHandlingParseFn, + badRecordRouter); + } + } + private PCollectionTuple createTupleForDirectRead( PCollection jobIdTokenCollection, Coder outputCoder, @@ -1724,13 +1845,45 @@ public void processElement(ProcessContext c) throws Exception { return tuple; } + private static class ErrorHandlingParseFn + implements SerializableFunction { + private final SerializableFunction parseFn; + + private transient SchemaAndRecord schemaAndRecord = null; + + private ErrorHandlingParseFn(SerializableFunction parseFn) { + this.parseFn = parseFn; + } + + @Override + public T apply(SchemaAndRecord input) { + schemaAndRecord = input; + try { + return parseFn.apply(input); + } catch (Exception e) { + throw new ParseException(e); + } + } + + public SchemaAndRecord getSchemaAndRecord() { + return schemaAndRecord; + } + } + + private static class ParseException extends RuntimeException { + public ParseException(Exception e) { + super(e); + } + } + private PCollection createPCollectionForDirectRead( PCollectionTuple tuple, Coder outputCoder, TupleTag readStreamsTag, PCollectionView readSessionView, PCollectionView tableSchemaView) { - PCollection rows = + TupleTag rowTag = new TupleTag<>(); + PCollectionTuple resultTuple = tuple .get(readStreamsTag) .apply(Reshuffle.viaRandomKey()) @@ -1738,36 +1891,44 @@ private PCollection createPCollectionForDirectRead( ParDo.of( new DoFn() { @ProcessElement - public void processElement(ProcessContext c) throws Exception { + public void processElement( + ProcessContext c, MultiOutputReceiver outputReceiver) + throws Exception { ReadSession readSession = c.sideInput(readSessionView); TableSchema tableSchema = BigQueryHelpers.fromJsonString( c.sideInput(tableSchemaView), TableSchema.class); ReadStream readStream = c.element(); + ErrorHandlingParseFn errorHandlingParseFn = + new ErrorHandlingParseFn(getParseFn()); + BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create( readSession, readStream, tableSchema, - getParseFn(), + errorHandlingParseFn, outputCoder, getBigQueryServices()); - // Read all of the data from the stream. In the event that this work - // item fails and is rescheduled, the same rows will be returned in - // the same order. - BoundedSource.BoundedReader reader = - streamSource.createReader(c.getPipelineOptions()); - for (boolean more = reader.start(); more; more = reader.advance()) { - c.output(reader.getCurrent()); - } + readSource( + c.getPipelineOptions(), + rowTag, + outputReceiver, + streamSource, + errorHandlingParseFn, + getBadRecordRouter()); } }) - .withSideInputs(readSessionView, tableSchemaView)) - .setCoder(outputCoder); + .withSideInputs(readSessionView, tableSchemaView) + .withOutputTags(rowTag, TupleTagList.of(BAD_RECORD_TAG))); - return rows; + getBadRecordErrorHandler() + .addErrorCollection( + resultTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(tuple.getPipeline()))); + + return resultTuple.get(rowTag).setCoder(outputCoder); } private PCollection createPCollectionForDirectReadWithStreamBundle( @@ -1776,7 +1937,8 @@ private PCollection createPCollectionForDirectReadWithStreamBundle( TupleTag> listReadStreamsTag, PCollectionView readSessionView, PCollectionView tableSchemaView) { - PCollection rows = + TupleTag rowTag = new TupleTag<>(); + PCollectionTuple resultTuple = tuple .get(listReadStreamsTag) .apply(Reshuffle.viaRandomKey()) @@ -1784,37 +1946,93 @@ private PCollection createPCollectionForDirectReadWithStreamBundle( ParDo.of( new DoFn, T>() { @ProcessElement - public void processElement(ProcessContext c) throws Exception { + public void processElement( + ProcessContext c, MultiOutputReceiver outputReceiver) + throws Exception { ReadSession readSession = c.sideInput(readSessionView); TableSchema tableSchema = BigQueryHelpers.fromJsonString( c.sideInput(tableSchemaView), TableSchema.class); List streamBundle = c.element(); + ErrorHandlingParseFn errorHandlingParseFn = + new ErrorHandlingParseFn(getParseFn()); + BigQueryStorageStreamBundleSource streamSource = BigQueryStorageStreamBundleSource.create( readSession, streamBundle, tableSchema, - getParseFn(), + errorHandlingParseFn, outputCoder, getBigQueryServices(), 1L); - // Read all of the data from the stream. In the event that this work - // item fails and is rescheduled, the same rows will be returned in - // the same order. - BoundedReader reader = - streamSource.createReader(c.getPipelineOptions()); - for (boolean more = reader.start(); more; more = reader.advance()) { - c.output(reader.getCurrent()); - } + readSource( + c.getPipelineOptions(), + rowTag, + outputReceiver, + streamSource, + errorHandlingParseFn, + getBadRecordRouter()); } }) - .withSideInputs(readSessionView, tableSchemaView)) - .setCoder(outputCoder); + .withSideInputs(readSessionView, tableSchemaView) + .withOutputTags(rowTag, TupleTagList.of(BAD_RECORD_TAG))); - return rows; + getBadRecordErrorHandler() + .addErrorCollection( + resultTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(tuple.getPipeline()))); + + return resultTuple.get(rowTag).setCoder(outputCoder); + } + + public static void readSource( + PipelineOptions options, + TupleTag rowTag, + MultiOutputReceiver outputReceiver, + BoundedSource streamSource, + ErrorHandlingParseFn errorHandlingParseFn, + BadRecordRouter badRecordRouter) + throws Exception { + // Read all the data from the stream. In the event that this work + // item fails and is rescheduled, the same rows will be returned in + // the same order. + BoundedSource.BoundedReader reader = streamSource.createReader(options); + + try { + if (reader.start()) { + outputReceiver.get(rowTag).output(reader.getCurrent()); + } else { + return; + } + } catch (ParseException e) { + GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord(); + badRecordRouter.route( + outputReceiver, + record, + AvroCoder.of(record.getSchema()), + (Exception) e.getCause(), + "Unable to parse record reading from BigQuery"); + } + + while (true) { + try { + if (reader.advance()) { + outputReceiver.get(rowTag).output(reader.getCurrent()); + } else { + return; + } + } catch (ParseException e) { + GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord(); + badRecordRouter.route( + outputReceiver, + record, + AvroCoder.of(record.getSchema()), + (Exception) e.getCause(), + "Unable to parse record reading from BigQuery"); + } + } } @Override @@ -2014,6 +2232,13 @@ public TypedRead withRowRestriction(ValueProvider rowRestriction) { return toBuilder().setRowRestriction(rowRestriction).build(); } + public TypedRead withErrorHandler(ErrorHandler badRecordErrorHandler) { + return toBuilder() + .setBadRecordErrorHandler(badRecordErrorHandler) + .setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER) + .build(); + } + public TypedRead withTemplateCompatibility() { return toBuilder().setWithTemplateCompatibility(true).build(); } @@ -2151,6 +2376,8 @@ public static Write write() { .setDirectWriteProtos(true) .setDefaultMissingValueInterpretation( AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE) + .setBadRecordErrorHandler(new DefaultErrorHandler<>()) + .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER) .build(); } @@ -2357,6 +2584,10 @@ public enum Method { abstract @Nullable SerializableFunction getRowMutationInformationFn(); + abstract ErrorHandler getBadRecordErrorHandler(); + + abstract BadRecordRouter getBadRecordRouter(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -2465,6 +2696,11 @@ abstract Builder setDeterministicRecordIdFn( abstract Builder setRowMutationInformationFn( SerializableFunction rowMutationFn); + abstract Builder setBadRecordErrorHandler( + ErrorHandler badRecordErrorHandler); + + abstract Builder setBadRecordRouter(BadRecordRouter badRecordRouter); + abstract Write build(); } @@ -3131,6 +3367,13 @@ public Write withWriteTempDataset(String writeTempDataset) { return toBuilder().setWriteTempDataset(writeTempDataset).build(); } + public Write withErrorHandler(ErrorHandler errorHandler) { + return toBuilder() + .setBadRecordErrorHandler(errorHandler) + .setBadRecordRouter(RECORDING_ROUTER) + .build(); + } + @Override public void validate(PipelineOptions pipelineOptions) { BigQueryOptions options = pipelineOptions.as(BigQueryOptions.class); @@ -3538,6 +3781,9 @@ private WriteResult continueExpandTyped( checkArgument( !getPropagateSuccessfulStorageApiWrites(), "withPropagateSuccessfulStorageApiWrites only supported when using storage api writes."); + checkArgument( + getBadRecordRouter() instanceof ThrowingBadRecordRouter, + "Error Handling is not supported with STREAMING_INSERTS"); RowWriterFactory.TableRowWriterFactory tableRowWriterFactory = (RowWriterFactory.TableRowWriterFactory) rowWriterFactory; @@ -3572,6 +3818,10 @@ private WriteResult continueExpandTyped( checkArgument( !getPropagateSuccessfulStorageApiWrites(), "withPropagateSuccessfulStorageApiWrites only supported when using storage api writes."); + if (!(getBadRecordRouter() instanceof ThrowingBadRecordRouter)) { + LOG.warn( + "Error Handling is partially supported when using FILE_LOADS. Consider using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE"); + } // Batch load handles wrapped json string value differently than the other methods. Raise a // warning when applies. @@ -3610,7 +3860,9 @@ private WriteResult continueExpandTyped( getKmsKey(), getClustering() != null, getUseAvroLogicalTypes(), - getWriteTempDataset()); + getWriteTempDataset(), + getBadRecordRouter(), + getBadRecordErrorHandler()); batchLoads.setTestServices(getBigQueryServices()); if (getSchemaUpdateOptions() != null) { batchLoads.setSchemaUpdateOptions(getSchemaUpdateOptions()); @@ -3730,7 +3982,9 @@ private WriteResult continueExpandTyped( getIgnoreUnknownValues(), getPropagateSuccessfulStorageApiWrites(), getRowMutationInformationFn() != null, - getDefaultMissingValueInterpretation()); + getDefaultMissingValueInterpretation(), + getBadRecordRouter(), + getBadRecordErrorHandler()); return input.apply("StorageApiLoads", storageApiLoads); } else { throw new RuntimeException("Unexpected write method " + method); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java index f659cc0668291..2fa5bdf25a102 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java @@ -56,6 +56,9 @@ import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.util.construction.SdkComponents; import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; @@ -101,6 +104,8 @@ static class BigQueryIOReadTranslator implements TransformPayloadTranslator transform) { fieldValues.put("use_avro_logical_types", transform.getUseAvroLogicalTypes()); } fieldValues.put("projection_pushdown_applied", transform.getProjectionPushdownApplied()); + fieldValues.put("bad_record_router", toByteArray(transform.getBadRecordRouter())); + fieldValues.put( + "bad_record_error_handler", toByteArray(transform.getBadRecordErrorHandler())); return Row.withSchema(schema).withFieldValues(fieldValues).build(); } @@ -304,6 +312,11 @@ public TypedRead fromConfigRow(Row configRow, PipelineOptions options) { if (projectionPushdownApplied != null) { builder = builder.setProjectionPushdownApplied(projectionPushdownApplied); } + byte[] badRecordRouter = configRow.getBytes("bad_record_router"); + builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter)); + byte[] badRecordErrorHandler = configRow.getBytes("bad_record_error_handler"); + builder.setBadRecordErrorHandler( + (ErrorHandler) fromByteArray(badRecordErrorHandler)); return builder.build(); } catch (InvalidClassException e) { @@ -378,6 +391,8 @@ static class BigQueryIOWriteTranslator implements TransformPayloadTranslator transform) { fieldValues.put( "row_mutation_information_fn", toByteArray(transform.getRowMutationInformationFn())); } + fieldValues.put("bad_record_router", toByteArray(transform.getBadRecordRouter())); + fieldValues.put( + "bad_record_error_handler", toByteArray(transform.getBadRecordErrorHandler())); return Row.withSchema(schema).withFieldValues(fieldValues).build(); } @@ -822,6 +840,11 @@ public Write fromConfigRow(Row configRow, PipelineOptions options) { builder.setRowMutationInformationFn( (SerializableFunction) fromByteArray(rowMutationInformationFnBytes)); } + byte[] badRecordRouter = configRow.getBytes("bad_record_router"); + builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter)); + byte[] badRecordErrorHandler = configRow.getBytes("bad_record_error_handler"); + builder.setBadRecordErrorHandler( + (ErrorHandler) fromByteArray(badRecordErrorHandler)); return builder.build(); } catch (InvalidClassException e) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryRowWriter.java index a442144e16103..b846a06af580e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryRowWriter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryRowWriter.java @@ -63,7 +63,7 @@ protected OutputStream getOutputStream() { return out; } - abstract void write(T value) throws Exception; + abstract void write(T value) throws IOException, BigQueryRowSerializationException; long getByteSize() { return out.getCount(); @@ -80,4 +80,11 @@ Result getResult() { checkState(isClosed, "Not yet closed"); return new Result(resourceId, out.getCount()); } + + public static class BigQueryRowSerializationException extends Exception { + + public BigQueryRowSerializationException(Exception e) { + super(e); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java index a2df86af1ee64..eeb747f21ea54 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java @@ -110,6 +110,18 @@ public BigQueryStorageStreamBundleSource fromExisting(List newStr getMinBundleSize()); } + public BigQueryStorageStreamBundleSource fromExisting( + SerializableFunction parseFn) { + return new BigQueryStorageStreamBundleSource<>( + readSession, + streamBundle, + jsonTableSchema, + parseFn, + outputCoder, + bqServices, + getMinBundleSize()); + } + private final ReadSession readSession; private final List streamBundle; private final String jsonTableSchema; @@ -334,10 +346,6 @@ private boolean readNextRecord() throws IOException { reader.processReadRowsResponse(response); } - SchemaAndRecord schemaAndRecord = new SchemaAndRecord(reader.readSingleRecord(), tableSchema); - - current = parseFn.apply(schemaAndRecord); - // Calculates the fraction of the current stream that has been consumed. This value is // calculated by interpolating between the fraction consumed value from the previous server // response (or zero if we're consuming the first response) and the fractional value in the @@ -355,6 +363,11 @@ private boolean readNextRecord() throws IOException { // progress made in the current Stream gives us the overall StreamBundle progress. fractionOfStreamBundleConsumed = (currentStreamBundleIndex + fractionOfCurrentStreamConsumed) / source.streamBundle.size(); + + SchemaAndRecord schemaAndRecord = new SchemaAndRecord(reader.readSingleRecord(), tableSchema); + + current = parseFn.apply(schemaAndRecord); + return true; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java index a4336cd48f944..8f7f50febaf41 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java @@ -82,6 +82,12 @@ public BigQueryStorageStreamSource fromExisting(ReadStream newReadStream) { readSession, newReadStream, jsonTableSchema, parseFn, outputCoder, bqServices); } + public BigQueryStorageStreamSource fromExisting( + SerializableFunction parseFn) { + return new BigQueryStorageStreamSource<>( + readSession, readStream, jsonTableSchema, parseFn, outputCoder, bqServices); + } + private final ReadSession readSession; private final ReadStream readStream; private final String jsonTableSchema; @@ -274,10 +280,6 @@ private synchronized boolean readNextRecord() throws IOException { reader.processReadRowsResponse(response); } - SchemaAndRecord schemaAndRecord = new SchemaAndRecord(reader.readSingleRecord(), tableSchema); - - current = parseFn.apply(schemaAndRecord); - // Updates the fraction consumed value. This value is calculated by interpolating between // the fraction consumed value from the previous server response (or zero if we're consuming // the first response) and the fractional value in the current response based on how many of @@ -291,6 +293,10 @@ private synchronized boolean readNextRecord() throws IOException { * 1.0 / totalRowsInCurrentResponse; + SchemaAndRecord schemaAndRecord = new SchemaAndRecord(reader.readSingleRecord(), tableSchema); + + current = parseFn.apply(schemaAndRecord); + return true; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java index 23fe0250b7d9e..aefdb79c535c8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG; + import com.google.api.services.bigquery.model.TableRow; import java.io.IOException; import org.apache.beam.sdk.coders.Coder; @@ -27,12 +29,15 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -50,6 +55,7 @@ public class StorageApiConvertMessages private final Coder> successCoder; private final @Nullable SerializableFunction rowMutationFn; + private final BadRecordRouter badRecordRouter; public StorageApiConvertMessages( StorageApiDynamicDestinations dynamicDestinations, @@ -58,7 +64,8 @@ public StorageApiConvertMessages( TupleTag> successfulWritesTag, Coder errorCoder, Coder> successCoder, - @Nullable SerializableFunction rowMutationFn) { + @Nullable SerializableFunction rowMutationFn, + BadRecordRouter badRecordRouter) { this.dynamicDestinations = dynamicDestinations; this.bqServices = bqServices; this.failedWritesTag = failedWritesTag; @@ -66,6 +73,7 @@ public StorageApiConvertMessages( this.errorCoder = errorCoder; this.successCoder = successCoder; this.rowMutationFn = rowMutationFn; + this.badRecordRouter = badRecordRouter; } @Override @@ -82,11 +90,16 @@ public PCollectionTuple expand(PCollection> input) { operationName, failedWritesTag, successfulWritesTag, - rowMutationFn)) - .withOutputTags(successfulWritesTag, TupleTagList.of(failedWritesTag)) + rowMutationFn, + badRecordRouter, + input.getCoder())) + .withOutputTags( + successfulWritesTag, + TupleTagList.of(ImmutableList.of(failedWritesTag, BAD_RECORD_TAG))) .withSideInputs(dynamicDestinations.getSideInputs())); result.get(successfulWritesTag).setCoder(successCoder); result.get(failedWritesTag).setCoder(errorCoder); + result.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())); return result; } @@ -98,6 +111,8 @@ public static class ConvertMessagesDoFn failedWritesTag; private final TupleTag> successfulWritesTag; private final @Nullable SerializableFunction rowMutationFn; + private final BadRecordRouter badRecordRouter; + Coder> elementCoder; private transient @Nullable DatasetService datasetServiceInternal = null; ConvertMessagesDoFn( @@ -106,13 +121,17 @@ public static class ConvertMessagesDoFn failedWritesTag, TupleTag> successfulWritesTag, - @Nullable SerializableFunction rowMutationFn) { + @Nullable SerializableFunction rowMutationFn, + BadRecordRouter badRecordRouter, + Coder> elementCoder) { this.dynamicDestinations = dynamicDestinations; this.messageConverters = new TwoLevelMessageConverterCache<>(operationName); this.bqServices = bqServices; this.failedWritesTag = failedWritesTag; this.successfulWritesTag = successfulWritesTag; this.rowMutationFn = rowMutationFn; + this.badRecordRouter = badRecordRouter; + this.elementCoder = elementCoder; } private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException { @@ -159,9 +178,19 @@ public void processElement( .toMessage(element.getValue(), rowMutationInformation) .withTimestamp(timestamp); o.get(successfulWritesTag).output(KV.of(element.getKey(), payload)); - } catch (TableRowToStorageApiProto.SchemaConversionException e) { - TableRow tableRow = messageConverter.toTableRow(element.getValue()); - o.get(failedWritesTag).output(new BigQueryStorageApiInsertError(tableRow, e.toString())); + } catch (TableRowToStorageApiProto.SchemaConversionException conversionException) { + TableRow tableRow; + try { + tableRow = messageConverter.toTableRow(element.getValue()); + } catch (Exception e) { + badRecordRouter.route(o, element, elementCoder, e, "Unable to convert value to TableRow"); + return; + } + o.get(failedWritesTag) + .output(new BigQueryStorageApiInsertError(tableRow, conversionException.toString())); + } catch (Exception e) { + badRecordRouter.route( + o, element, elementCoder, e, "Unable to convert value to StorageWriteApiPayload"); } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java index 0227b80201292..62174b5c917a8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java @@ -17,8 +17,11 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG; + import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; @@ -32,6 +35,10 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.ThrowingBadRecordRouter; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.ShardedKey; @@ -68,6 +75,10 @@ public class StorageApiLoads private final AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation; + private final BadRecordRouter badRecordRouter; + + private final ErrorHandler badRecordErrorHandler; + public StorageApiLoads( Coder destinationCoder, StorageApiDynamicDestinations dynamicDestinations, @@ -83,7 +94,9 @@ public StorageApiLoads( boolean ignoreUnknownValues, boolean propagateSuccessfulStorageApiWrites, boolean usesCdc, - AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) { + AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation, + BadRecordRouter badRecordRouter, + ErrorHandler badRecordErrorHandler) { this.destinationCoder = destinationCoder; this.dynamicDestinations = dynamicDestinations; this.rowUpdateFn = rowUpdateFn; @@ -101,12 +114,18 @@ public StorageApiLoads( } this.usesCdc = usesCdc; this.defaultMissingValueInterpretation = defaultMissingValueInterpretation; + this.badRecordRouter = badRecordRouter; + this.badRecordErrorHandler = badRecordErrorHandler; } public TupleTag getFailedRowsTag() { return failedRowsTag; } + public boolean usesErrorHandler() { + return !(badRecordRouter instanceof ThrowingBadRecordRouter); + } + @Override public WriteResult expand(PCollection> input) { Coder payloadCoder; @@ -143,7 +162,8 @@ public WriteResult expandInconsistent( successfulConvertedRowsTag, BigQueryStorageApiInsertErrorCoder.of(), successCoder, - rowUpdateFn)); + rowUpdateFn, + badRecordRouter)); PCollectionTuple writeRecordsResult = convertMessagesResult .get(successfulConvertedRowsTag) @@ -171,6 +191,9 @@ public WriteResult expandInconsistent( if (successfulWrittenRowsTag != null) { successfulWrittenRows = writeRecordsResult.get(successfulWrittenRowsTag); } + + addErrorCollections(convertMessagesResult, writeRecordsResult); + return WriteResult.in( input.getPipeline(), null, @@ -201,7 +224,8 @@ public WriteResult expandTriggered( successfulConvertedRowsTag, BigQueryStorageApiInsertErrorCoder.of(), successCoder, - rowUpdateFn)); + rowUpdateFn, + badRecordRouter)); PCollection, Iterable>> groupedRecords; @@ -261,6 +285,8 @@ public WriteResult expandTriggered( successfulWrittenRows = writeRecordsResult.get(successfulWrittenRowsTag); } + addErrorCollections(convertMessagesResult, writeRecordsResult); + return WriteResult.in( input.getPipeline(), null, @@ -319,7 +345,8 @@ public WriteResult expandUntriggered( successfulConvertedRowsTag, BigQueryStorageApiInsertErrorCoder.of(), successCoder, - rowUpdateFn)); + rowUpdateFn, + badRecordRouter)); PCollectionTuple writeRecordsResult = convertMessagesResult @@ -350,6 +377,8 @@ public WriteResult expandUntriggered( successfulWrittenRows = writeRecordsResult.get(successfulWrittenRowsTag); } + addErrorCollections(convertMessagesResult, writeRecordsResult); + return WriteResult.in( input.getPipeline(), null, @@ -362,4 +391,53 @@ public WriteResult expandUntriggered( successfulWrittenRowsTag, successfulWrittenRows); } + + private void addErrorCollections( + PCollectionTuple convertMessagesResult, PCollectionTuple writeRecordsResult) { + if (usesErrorHandler()) { + PCollection badRecords = + PCollectionList.of( + convertMessagesResult + .get(failedRowsTag) + .apply( + "ConvertMessageFailuresToBadRecord", + ParDo.of( + new ConvertInsertErrorToBadRecord( + "Failed to Convert to Storage API Message")))) + .and(convertMessagesResult.get(BAD_RECORD_TAG)) + .and( + writeRecordsResult + .get(failedRowsTag) + .apply( + "WriteRecordFailuresToBadRecord", + ParDo.of( + new ConvertInsertErrorToBadRecord( + "Failed to Write Message to Storage API")))) + .apply("flattenBadRecords", Flatten.pCollections()); + badRecordErrorHandler.addErrorCollection(badRecords); + } + } + + private static class ConvertInsertErrorToBadRecord + extends DoFn { + + private final String errorMessage; + + public ConvertInsertErrorToBadRecord(String errorMessage) { + this.errorMessage = errorMessage; + } + + @ProcessElement + public void processElement( + @Element BigQueryStorageApiInsertError bigQueryStorageApiInsertError, + OutputReceiver outputReceiver) + throws IOException { + outputReceiver.output( + BadRecord.fromExceptionInformation( + bigQueryStorageApiInsertError, + BigQueryStorageApiInsertErrorCoder.of(), + null, + errorMessage)); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java index 6cbeb61f624f9..4d5fb1b3d7465 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.TableRow; +import java.io.IOException; import java.nio.charset.StandardCharsets; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; @@ -37,8 +38,13 @@ class TableRowWriter extends BigQueryRowWriter { } @Override - void write(T value) throws Exception { - TableRow tableRow = toRow.apply(value); + void write(T value) throws IOException, BigQueryRowSerializationException { + TableRow tableRow; + try { + tableRow = toRow.apply(value); + } catch (Exception e) { + throw new BigQueryRowSerializationException(e); + } CODER.encode(tableRow, getOutputStream(), Context.OUTER); getOutputStream().write(NEWLINE); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index 894983ab664f6..9d84abbbbf1ae 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -32,8 +32,10 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryRowWriter.BigQueryRowSerializationException; import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.KV; @@ -69,6 +71,8 @@ class WriteBundlesToFiles private final int maxNumWritersPerBundle; private final long maxFileSize; private final RowWriterFactory rowWriterFactory; + private final Coder> coder; + private final BadRecordRouter badRecordRouter; private int spilledShardNumber; /** @@ -165,12 +169,16 @@ public void verifyDeterministic() {} TupleTag, ElementT>> unwrittenRecordsTag, int maxNumWritersPerBundle, long maxFileSize, - RowWriterFactory rowWriterFactory) { + RowWriterFactory rowWriterFactory, + Coder> coder, + BadRecordRouter badRecordRouter) { this.tempFilePrefixView = tempFilePrefixView; this.unwrittenRecordsTag = unwrittenRecordsTag; this.maxNumWritersPerBundle = maxNumWritersPerBundle; this.maxFileSize = maxFileSize; this.rowWriterFactory = rowWriterFactory; + this.coder = coder; + this.badRecordRouter = badRecordRouter; } @StartBundle @@ -197,7 +205,10 @@ BigQueryRowWriter createAndInsertWriter( @ProcessElement public void processElement( - ProcessContext c, @Element KV element, BoundedWindow window) + ProcessContext c, + @Element KV element, + BoundedWindow window, + MultiOutputReceiver outputReceiver) throws Exception { Map> writers = Preconditions.checkStateNotNull(this.writers); @@ -234,17 +245,32 @@ public void processElement( try { writer.write(element.getValue()); - } catch (Exception e) { - // Discard write result and close the write. + } catch (BigQueryRowSerializationException e) { try { - writer.close(); - // The writer does not need to be reset, as this DoFn cannot be reused. - } catch (Exception closeException) { - // Do not mask the exception that caused the write to fail. - e.addSuppressed(closeException); + badRecordRouter.route( + outputReceiver, + element, + coder, + e, + "Unable to Write BQ Record to File because serialization to TableRow failed"); + } catch (Exception e2) { + cleanupWriter(writer, e2); } - throw e; + } catch (Exception e) { + cleanupWriter(writer, e); + } + } + + private void cleanupWriter(BigQueryRowWriter writer, Exception e) throws Exception { + // Discard write result and close the write. + try { + writer.close(); + // The writer does not need to be reset, as this DoFn cannot be reused. + } catch (Exception closeException) { + // Do not mask the exception that caused the write to fail. + e.addSuppressed(closeException); } + throw e; } @FinishBundle diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java index 236b07d74756b..3a4f377ce2b8d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java @@ -17,9 +17,14 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryRowWriter.BigQueryRowSerializationException; +import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; /** * Receives elements grouped by their destination, and writes them out to a file. Since all the @@ -31,21 +36,30 @@ class WriteGroupedRecordsToFiles private final PCollectionView tempFilePrefix; private final long maxFileSize; private final RowWriterFactory rowWriterFactory; + private final BadRecordRouter badRecordRouter; + private final TupleTag> successfulResultsTag; + private final Coder elementCoder; WriteGroupedRecordsToFiles( PCollectionView tempFilePrefix, long maxFileSize, - RowWriterFactory rowWriterFactory) { + RowWriterFactory rowWriterFactory, + BadRecordRouter badRecordRouter, + TupleTag> successfulResultsTag, + Coder elementCoder) { this.tempFilePrefix = tempFilePrefix; this.maxFileSize = maxFileSize; this.rowWriterFactory = rowWriterFactory; + this.badRecordRouter = badRecordRouter; + this.successfulResultsTag = successfulResultsTag; + this.elementCoder = elementCoder; } @ProcessElement public void processElement( ProcessContext c, @Element KV> element, - OutputReceiver> o) + MultiOutputReceiver outputReceiver) throws Exception { String tempFilePrefix = c.sideInput(this.tempFilePrefix); @@ -58,20 +72,29 @@ public void processElement( if (writer.getByteSize() > maxFileSize) { writer.close(); BigQueryRowWriter.Result result = writer.getResult(); - o.output( - new WriteBundlesToFiles.Result<>( - result.resourceId.toString(), result.byteSize, c.element().getKey())); + outputReceiver + .get(successfulResultsTag) + .output( + new WriteBundlesToFiles.Result<>( + result.resourceId.toString(), result.byteSize, c.element().getKey())); writer = rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey()); } - writer.write(tableRow); + try { + writer.write(tableRow); + } catch (BigQueryRowSerializationException e) { + badRecordRouter.route( + outputReceiver, tableRow, elementCoder, e, "Unable to Write BQ Record to File"); + } } } finally { writer.close(); } BigQueryRowWriter.Result result = writer.getResult(); - o.output( - new WriteBundlesToFiles.Result<>( - result.resourceId.toString(), result.byteSize, c.element().getKey())); + outputReceiver + .get(successfulResultsTag) + .output( + new WriteBundlesToFiles.Result<>( + result.resourceId.toString(), result.byteSize, c.element().getKey())); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java index d355d6bb93366..2b1c111269df9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions.BIGQUERY_EARLY_ROLLOUT_REGION; +import com.google.api.services.bigquery.model.TableRow; import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; @@ -33,6 +34,9 @@ import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.junit.Test; @@ -107,4 +111,46 @@ public void testBigQueryStorageQuery1G() throws Exception { setUpTestEnvironment("1G"); runBigQueryIOStorageQueryPipeline(); } + + static class FailingTableRowParser implements SerializableFunction { + + public static final BigQueryIOStorageReadIT.FailingTableRowParser INSTANCE = + new BigQueryIOStorageReadIT.FailingTableRowParser(); + + private int parseCount = 0; + + @Override + public TableRow apply(SchemaAndRecord schemaAndRecord) { + parseCount++; + if (parseCount % 50 == 0) { + throw new RuntimeException("ExpectedException"); + } + return TableRowParser.INSTANCE.apply(schemaAndRecord); + } + } + + @Test + public void testBigQueryStorageQueryWithErrorHandling1M() throws Exception { + setUpTestEnvironment("1M"); + Pipeline p = Pipeline.create(options); + ErrorHandler> errorHandler = + p.registerBadRecordErrorHandler(new ErrorSinkTransform()); + PCollection count = + p.apply( + "Read", + BigQueryIO.read(FailingTableRowParser.INSTANCE) + .fromQuery("SELECT * FROM `" + options.getInputTable() + "`") + .usingStandardSql() + .withMethod(Method.DIRECT_READ) + .withErrorHandler(errorHandler)) + .apply("Count", Count.globally()); + + errorHandler.close(); + + // When 1/50 elements fail sequentially, this is the expected success count + PAssert.thatSingleton(count).isEqualTo(10381L); + // this is the total elements, less the successful elements + PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(10592L - 10381L); + p.run().waitUntilFinish(); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java index af6dd505b916f..0c5325286dd78 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java @@ -78,6 +78,9 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -769,18 +772,8 @@ public void testQuerySourceCreateReader() throws Exception { querySource.createReader(options); } - @Test - public void testReadFromBigQueryIO() throws Exception { - doReadFromBigQueryIO(false); - } - - @Test - public void testReadFromBigQueryIOWithTemplateCompatibility() throws Exception { - doReadFromBigQueryIO(true); - } - - private void doReadFromBigQueryIO(boolean templateCompatibility) throws Exception { - + public TypedRead> configureTypedRead( + SerializableFunction> parseFn) throws Exception { TableReference sourceTableRef = BigQueryHelpers.parseTableSpec("project:dataset.table"); fakeDatasetService.createDataset( @@ -840,15 +833,29 @@ private void doReadFromBigQueryIO(boolean templateCompatibility) throws Exceptio when(fakeStorageClient.readRows(expectedReadRowsRequest, "")) .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses)); - BigQueryIO.TypedRead> typedRead = - BigQueryIO.read(new ParseKeyValue()) - .fromQuery(encodedQuery) - .withMethod(Method.DIRECT_READ) - .withTestServices( - new FakeBigQueryServices() - .withDatasetService(fakeDatasetService) - .withJobService(fakeJobService) - .withStorageClient(fakeStorageClient)); + return BigQueryIO.read(parseFn) + .fromQuery(encodedQuery) + .withMethod(Method.DIRECT_READ) + .withTestServices( + new FakeBigQueryServices() + .withDatasetService(fakeDatasetService) + .withJobService(fakeJobService) + .withStorageClient(fakeStorageClient)); + } + + @Test + public void testReadFromBigQueryIO() throws Exception { + doReadFromBigQueryIO(false); + } + + @Test + public void testReadFromBigQueryIOWithTemplateCompatibility() throws Exception { + doReadFromBigQueryIO(true); + } + + private void doReadFromBigQueryIO(boolean templateCompatibility) throws Exception { + + BigQueryIO.TypedRead> typedRead = configureTypedRead(new ParseKeyValue()); if (templateCompatibility) { typedRead = typedRead.withTemplateCompatibility(); @@ -862,4 +869,35 @@ private void doReadFromBigQueryIO(boolean templateCompatibility) throws Exceptio p.run(); } + + private static final class FailingParseKeyValue + implements SerializableFunction> { + @Override + public KV apply(SchemaAndRecord input) { + if (input.getRecord().get("name").toString().equals("B")) { + throw new RuntimeException("ExpectedException"); + } + return KV.of( + input.getRecord().get("name").toString(), (Long) input.getRecord().get("number")); + } + } + + @Test + public void testReadFromBigQueryWithExceptionHandling() throws Exception { + + TypedRead> typedRead = configureTypedRead(new FailingParseKeyValue()); + + ErrorHandler> errorHandler = + p.registerBadRecordErrorHandler(new ErrorSinkTransform()); + typedRead = typedRead.withErrorHandler(errorHandler); + PCollection> output = p.apply(typedRead); + errorHandler.close(); + + PAssert.that(output) + .containsInAnyOrder(ImmutableList.of(KV.of("A", 1L), KV.of("C", 3L), KV.of("D", 4L))); + + PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(1L); + + p.run(); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java index e95ad4678ea81..4e20d36348008 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java @@ -20,6 +20,7 @@ import static org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions.BIGQUERY_EARLY_ROLLOUT_REGION; import static org.junit.Assert.assertEquals; +import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.bigquery.storage.v1.DataFormat; import java.util.Map; import org.apache.beam.sdk.Pipeline; @@ -43,6 +44,9 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -121,6 +125,45 @@ private void runBigQueryIOStorageReadPipeline() { p.run().waitUntilFinish(); } + static class FailingTableRowParser implements SerializableFunction { + + public static final FailingTableRowParser INSTANCE = new FailingTableRowParser(); + + private int parseCount = 0; + + @Override + public TableRow apply(SchemaAndRecord schemaAndRecord) { + parseCount++; + if (parseCount % 50 == 0) { + throw new RuntimeException("ExpectedException"); + } + return TableRowParser.INSTANCE.apply(schemaAndRecord); + } + } + + private void runBigQueryIOStorageReadPipelineErrorHandling() throws Exception { + Pipeline p = Pipeline.create(options); + ErrorHandler> errorHandler = + p.registerBadRecordErrorHandler(new ErrorSinkTransform()); + PCollection count = + p.apply( + "Read", + BigQueryIO.read(FailingTableRowParser.INSTANCE) + .from(options.getInputTable()) + .withMethod(Method.DIRECT_READ) + .withFormat(options.getDataFormat()) + .withErrorHandler(errorHandler)) + .apply("Count", Count.globally()); + + errorHandler.close(); + + // When 1/50 elements fail sequentially, this is the expected success count + PAssert.thatSingleton(count).isEqualTo(10381L); + // this is the total elements, less the successful elements + PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(10592L - 10381L); + p.run().waitUntilFinish(); + } + @Test public void testBigQueryStorageRead1GAvro() throws Exception { setUpTestEnvironment("1G", DataFormat.AVRO); @@ -133,6 +176,18 @@ public void testBigQueryStorageRead1GArrow() throws Exception { runBigQueryIOStorageReadPipeline(); } + @Test + public void testBigQueryStorageRead1MErrorHandlingAvro() throws Exception { + setUpTestEnvironment("1M", DataFormat.AVRO); + runBigQueryIOStorageReadPipelineErrorHandling(); + } + + @Test + public void testBigQueryStorageRead1MErrorHandlingArrow() throws Exception { + setUpTestEnvironment("1M", DataFormat.ARROW); + runBigQueryIOStorageReadPipelineErrorHandling(); + } + @Test public void testBigQueryStorageReadWithAvro() throws Exception { storageReadWithSchema(DataFormat.AVRO); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java index 668f4eef4d839..7f2ff8945482e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java @@ -71,6 +71,8 @@ public class BigQueryIOTranslationTest { READ_TRANSFORM_SCHEMA_MAPPING.put("getUseAvroLogicalTypes", "use_avro_logical_types"); READ_TRANSFORM_SCHEMA_MAPPING.put( "getProjectionPushdownApplied", "projection_pushdown_applied"); + READ_TRANSFORM_SCHEMA_MAPPING.put("getBadRecordRouter", "bad_record_router"); + READ_TRANSFORM_SCHEMA_MAPPING.put("getBadRecordErrorHandler", "bad_record_error_handler"); } static final Map WRITE_TRANSFORM_SCHEMA_MAPPING = new HashMap<>(); @@ -128,6 +130,8 @@ public class BigQueryIOTranslationTest { WRITE_TRANSFORM_SCHEMA_MAPPING.put("getWriteTempDataset", "write_temp_dataset"); WRITE_TRANSFORM_SCHEMA_MAPPING.put( "getRowMutationInformationFn", "row_mutation_information_fn"); + WRITE_TRANSFORM_SCHEMA_MAPPING.put("getBadRecordRouter", "bad_record_router"); + WRITE_TRANSFORM_SCHEMA_MAPPING.put("getBadRecordErrorHandler", "bad_record_error_handler"); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 21d3e53a07018..f42734af76714 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -71,6 +71,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -83,6 +84,7 @@ import java.util.stream.Collectors; import java.util.stream.LongStream; import java.util.stream.StreamSupport; +import org.apache.avro.Schema.Field; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; @@ -92,6 +94,7 @@ import org.apache.avro.io.Encoder; import org.apache.beam.runners.direct.DirectOptions; import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -142,6 +145,10 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.EchoErrorTransform; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -175,6 +182,7 @@ import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -904,6 +912,124 @@ public void testBatchFileLoadsWithTempTablesCreateNever() throws Exception { containsInAnyOrder(Iterables.toArray(elements, TableRow.class))); } + private static final SerializableFunction failingIntegerToTableRow = + new SerializableFunction() { + @Override + public TableRow apply(Integer input) { + if (input == 15) { + throw new RuntimeException("Expected Exception"); + } + return new TableRow().set("number", input); + } + }; + + @Test + public void testBatchLoadsWithTableRowErrorHandling() throws Exception { + assumeTrue(!useStreaming); + assumeTrue(!useStorageApi); + List elements = Lists.newArrayList(); + for (int i = 0; i < 30; ++i) { + elements.add(i); + } + + ErrorHandler> errorHandler = + p.registerBadRecordErrorHandler(new ErrorSinkTransform()); + + WriteResult result = + p.apply(Create.of(elements).withCoder(BigEndianIntegerCoder.of())) + .apply( + BigQueryIO.write() + .to("dataset-id.table-id") + .withFormatFunction(failingIntegerToTableRow) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withSchema( + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER")))) + .withTestServices(fakeBqServices) + .withErrorHandler(errorHandler) + .withoutValidation()); + + errorHandler.close(); + + PAssert.that(result.getSuccessfulTableLoads()) + .containsInAnyOrder(new TableDestination("project-id:dataset-id.table-id", null)); + PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(1L); + p.run(); + + elements.remove(15); + assertThat( + fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id").stream() + .map(tr -> ((Integer) tr.get("number"))) + .collect(Collectors.toList()), + containsInAnyOrder(Iterables.toArray(elements, Integer.class))); + } + + private static final org.apache.avro.Schema avroSchema = + org.apache.avro.Schema.createRecord( + ImmutableList.of( + new Field( + "number", + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), + "nodoc", + 0))); + private static final SerializableFunction, GenericRecord> + failingLongToAvro = + new SerializableFunction, GenericRecord>() { + @Override + public GenericRecord apply(AvroWriteRequest input) { + if (input.getElement() == 15) { + throw new RuntimeException("Expected Exception"); + } + return new GenericRecordBuilder(avroSchema).set("number", input.getElement()).build(); + } + }; + + @Test + public void testBatchLoadsWithAvroErrorHandling() throws Exception { + assumeTrue(!useStreaming); + assumeTrue(!useStorageApi); + List elements = Lists.newArrayList(); + for (long i = 0L; i < 30L; ++i) { + elements.add(i); + } + + ErrorHandler> errorHandler = + p.registerBadRecordErrorHandler(new ErrorSinkTransform()); + + WriteResult result = + p.apply(Create.of(elements).withCoder(VarLongCoder.of())) + .apply( + BigQueryIO.write() + .to("dataset-id.table-id") + .withAvroFormatFunction(failingLongToAvro) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withSchema( + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("number").setType("INTEGER")))) + .withTestServices(fakeBqServices) + .withErrorHandler(errorHandler) + .withoutValidation()); + + errorHandler.close(); + + PAssert.that(result.getSuccessfulTableLoads()) + .containsInAnyOrder(new TableDestination("project-id:dataset-id.table-id", null)); + PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(1L); + p.run(); + + elements.remove(15); + assertThat( + fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id").stream() + .map(tr -> Long.valueOf((String) tr.get("number"))) + .collect(Collectors.toList()), + containsInAnyOrder(Iterables.toArray(elements, Long.class))); + } + @Test public void testStreamingInsertsFailuresNoRetryPolicy() throws Exception { assumeTrue(!useStorageApi); @@ -1337,6 +1463,120 @@ public void testStreamingStorageApiWriteWithAutoSharding() throws Exception { storageWrite(true); } + // There are two failure scenarios in storage write. + // first is in conversion, which is triggered by using a bad format function + // second is in actually sending to BQ, which is triggered by telling te dataset service + // to fail a row + private void storageWriteWithErrorHandling(boolean autoSharding) throws Exception { + assumeTrue(useStorageApi); + if (autoSharding) { + assumeTrue(!useStorageApiApproximate); + assumeTrue(useStreaming); + } + List elements = Lists.newArrayList(); + for (int i = 0; i < 30; ++i) { + elements.add(i); + } + + Function shouldFailRow = + (Function & Serializable) + tr -> + tr.containsKey("number") + && (tr.get("number").equals("27") || tr.get("number").equals("3")); + fakeDatasetService.setShouldFailRow(shouldFailRow); + + TestStream testStream = + TestStream.create(BigEndianIntegerCoder.of()) + .addElements(elements.get(0), Iterables.toArray(elements.subList(1, 10), Integer.class)) + .advanceProcessingTime(Duration.standardMinutes(1)) + .addElements( + elements.get(10), Iterables.toArray(elements.subList(11, 20), Integer.class)) + .advanceProcessingTime(Duration.standardMinutes(1)) + .addElements( + elements.get(20), Iterables.toArray(elements.subList(21, 30), Integer.class)) + .advanceWatermarkToInfinity(); + + ErrorHandler> errorHandler = + p.registerBadRecordErrorHandler(new EchoErrorTransform()); + + BigQueryIO.Write write = + BigQueryIO.write() + .to("project-id:dataset-id.table-id") + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withFormatFunction(failingIntegerToTableRow) + .withSchema( + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("number").setType("INTEGER")))) + .withTestServices(fakeBqServices) + .withErrorHandler(errorHandler) + .withoutValidation(); + + if (useStreaming) { + if (!useStorageApiApproximate) { + write = + write + .withTriggeringFrequency(Duration.standardSeconds(30)) + .withNumStorageWriteApiStreams(2); + } + if (autoSharding) { + write = write.withAutoSharding(); + } + } + + PTransform> source = + useStreaming ? testStream : Create.of(elements).withCoder(BigEndianIntegerCoder.of()); + + p.apply(source).apply("WriteToBQ", write); + + errorHandler.close(); + + PAssert.that(errorHandler.getOutput()) + .satisfies( + badRecords -> { + int count = 0; + Iterator iterator = badRecords.iterator(); + while (iterator.hasNext()) { + count++; + iterator.next(); + } + Assert.assertEquals("Wrong number of bad records", 3, count); + return null; + }); + + p.run().waitUntilFinish(); + + // remove the "bad" elements from the expected elements written + elements.remove(27); + elements.remove(15); + elements.remove(3); + assertThat( + fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id").stream() + .map(tr -> Integer.valueOf((String) tr.get("number"))) + .collect(Collectors.toList()), + containsInAnyOrder(Iterables.toArray(elements, Integer.class))); + } + + @Test + public void testBatchStorageApiWriteWithErrorHandling() throws Exception { + assumeTrue(!useStreaming); + storageWriteWithErrorHandling(false); + } + + @Test + public void testStreamingStorageApiWriteWithErrorHandling() throws Exception { + assumeTrue(useStreaming); + storageWriteWithErrorHandling(false); + } + + @Test + public void testStreamingStorageApiWriteWithAutoShardingWithErrorHandling() throws Exception { + assumeTrue(useStreaming); + assumeTrue(!useStorageApiApproximate); + storageWriteWithErrorHandling(true); + } + @DefaultSchema(JavaFieldSchema.class) static class SchemaPojo { final String name;