Skip to content

Commit

Permalink
Add Error Handlers to File IO and related IOs (TextIO, AvroIO) (#29670)
Browse files Browse the repository at this point in the history
* first pass of wiring error handling into write files and adding tests

* fix error handling to solve constant filenaming policy returning a null destination

* fix tests, add a safety check to the error handler

* spotless

* add documentation

* add textio error handler pass-through

* add avroio error handler pass-through

* add documentation to avroio

* add documentation to WriteFiles

* remove function to check if the exception is bad, because that isn't portable

* spotless

* spotless

* clean up documentation

* clean up documentation, remove unnecessary unwritten records tag

* spotless

* spotless
  • Loading branch information
johnjcasey authored Dec 28, 2023
1 parent 783c72a commit f303d6a
Show file tree
Hide file tree
Showing 9 changed files with 511 additions and 74 deletions.
43 changes: 43 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
Expand Down Expand Up @@ -236,6 +238,27 @@
* destination-dependent: every window/pane for every destination will use the same number of shards
* specified via {@link Write#withNumShards} or {@link Write#withSharding}.
*
* <h3>Handling Errors</h3>
*
* <p>When using dynamic destinations, or when using a formatting function to format a record for
* writing, it's possible for an individual record to be malformed, causing an exception. By
* default, these exceptions are propagated to the runner causing the bundle to fail. These are
* usually retried, though this depends on the runner. Alternately, these errors can be routed to
* another {@link PTransform} by using {@link Write#withBadRecordErrorHandler(ErrorHandler)}. The
* ErrorHandler is registered with the pipeline (see below). See {@link ErrorHandler} for more
* documentation. Of note, this error handling only handles errors related to specific records. It
* does not handle errors related to connectivity, authorization, etc. as those should be retried by
* the runner.
*
* <pre>{@code
* PCollection<> records = ...;
* PTransform<PCollection<BadRecord>,?> alternateSink = ...;
* try (BadRecordErrorHandler<?> handler = pipeline.registerBadRecordErrorHandler(alternateSink) {
* records.apply("Write", FileIO.writeDynamic().otherConfigs()
* .withBadRecordErrorHandler(handler));
* }
* }</pre>
*
* <h3>Writing custom types to sinks</h3>
*
* <p>Normally, when writing a collection of a custom type using a {@link Sink} that takes a
Expand Down Expand Up @@ -1016,6 +1039,8 @@ public static FileNaming relativeFileNaming(

abstract boolean getNoSpilling();

abstract @Nullable ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

abstract Builder<DestinationT, UserT> toBuilder();

@AutoValue.Builder
Expand Down Expand Up @@ -1062,6 +1087,9 @@ abstract Builder<DestinationT, UserT> setSharding(

abstract Builder<DestinationT, UserT> setNoSpilling(boolean noSpilling);

abstract Builder<DestinationT, UserT> setBadRecordErrorHandler(
@Nullable ErrorHandler<BadRecord, ?> badRecordErrorHandler);

abstract Write<DestinationT, UserT> build();
}

Expand Down Expand Up @@ -1288,6 +1316,18 @@ public Write<DestinationT, UserT> withNoSpilling() {
return toBuilder().setNoSpilling(true).build();
}

/**
* Configures a new {@link Write} with an ErrorHandler. For configuring an ErrorHandler, see
* {@link ErrorHandler}. Whenever a record is formatted, or a lookup for a dynamic destination
* is performed, and that operation fails, the exception is passed to the error handler. This is
* intended to handle any errors related to the data of a record, but not any connectivity or IO
* errors related to the literal writing of a record.
*/
public Write<DestinationT, UserT> withBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> errorHandler) {
return toBuilder().setBadRecordErrorHandler(errorHandler).build();
}

@VisibleForTesting
Contextful<Fn<DestinationT, FileNaming>> resolveFileNamingFn() {
if (getDynamic()) {
Expand Down Expand Up @@ -1391,6 +1431,9 @@ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
if (getNoSpilling()) {
writeFiles = writeFiles.withNoSpilling();
}
if (getBadRecordErrorHandler() != null) {
writeFiles = writeFiles.withBadRecordErrorHandler(getBadRecordErrorHandler());
}
return input.apply(writeFiles);
}

Expand Down
20 changes: 20 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
Expand Down Expand Up @@ -176,6 +178,10 @@
*
* <p>For backwards compatibility, {@link TextIO} also supports the legacy {@link
* DynamicDestinations} interface for advanced features via {@link Write#to(DynamicDestinations)}.
*
* <p>Error handling for records that are malformed can be handled by using {@link
* TypedWrite#withBadRecordErrorHandler(ErrorHandler)}. See documentation in {@link FileIO} for
* details on usage
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Expand Down Expand Up @@ -708,6 +714,8 @@ public abstract static class TypedWrite<UserT, DestinationT>
*/
abstract WritableByteChannelFactory getWritableByteChannelFactory();

abstract @Nullable ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

abstract Builder<UserT, DestinationT> toBuilder();

@AutoValue.Builder
Expand Down Expand Up @@ -754,6 +762,9 @@ abstract Builder<UserT, DestinationT> setNumShards(
abstract Builder<UserT, DestinationT> setWritableByteChannelFactory(
WritableByteChannelFactory writableByteChannelFactory);

abstract Builder<UserT, DestinationT> setBadRecordErrorHandler(
@Nullable ErrorHandler<BadRecord, ?> badRecordErrorHandler);

abstract TypedWrite<UserT, DestinationT> build();
}

Expand Down Expand Up @@ -993,6 +1004,12 @@ public TypedWrite<UserT, DestinationT> withNoSpilling() {
return toBuilder().setNoSpilling(true).build();
}

/** See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for details on usage. */
public TypedWrite<UserT, DestinationT> withBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> errorHandler) {
return toBuilder().setBadRecordErrorHandler(errorHandler).build();
}

/** Don't write any output files if the PCollection is empty. */
public TypedWrite<UserT, DestinationT> skipIfEmpty() {
return toBuilder().setSkipIfEmpty(true).build();
Expand Down Expand Up @@ -1083,6 +1100,9 @@ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
if (getNoSpilling()) {
write = write.withNoSpilling();
}
if (getBadRecordErrorHandler() != null) {
write = write.withBadRecordErrorHandler(getBadRecordErrorHandler());
}
if (getSkipIfEmpty()) {
write = write.withSkipIfEmpty();
}
Expand Down
Loading

0 comments on commit f303d6a

Please sign in to comment.