From 8ce45c784fb0f1f8dd648affdf0f6332f3df5f3a Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Thu, 16 Jul 2020 10:39:06 +0300 Subject: [PATCH] Prevent race condition in BQ sink jobId generation (#877) * fix race condition * [bq] temp file prefix in global window --- .../io/gcp/bigquery/BatchLoadsWithResult.java | 122 ++++++++---------- 1 file changed, 55 insertions(+), 67 deletions(-) diff --git a/storage/connectors/bigquery/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadsWithResult.java b/storage/connectors/bigquery/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadsWithResult.java index 764bfc1f54..f6be75fe13 100644 --- a/storage/connectors/bigquery/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadsWithResult.java +++ b/storage/connectors/bigquery/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadsWithResult.java @@ -5,15 +5,13 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.auto.value.AutoValue; - import java.util.Collections; import java.util.List; import java.util.Set; import javax.annotation.Nullable; - +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.*; import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.state.*; import org.apache.beam.sdk.transforms.*; import org.apache.beam.sdk.transforms.windowing.*; import org.apache.beam.sdk.values.*; @@ -26,7 +24,7 @@ @AutoValue public abstract class BatchLoadsWithResult extends PTransform< - PCollection>, PCollection>> { + PCollection>, PCollection>> { static final Logger LOG = LoggerFactory.getLogger(BatchLoadsWithResult.class); @VisibleForTesting @@ -134,13 +132,14 @@ public PCollection> expand( input.getWindowingStrategy().getWindowFn() instanceof FixedWindows, "Input to BQ writer must be windowed in advance"); - final PCollection loadJobIdPrefixView = createLoadJobIdPrefixView(input); + final PCollectionView loadJobIdPrefixView = createLoadJobIdPrefixView(input); final PCollectionView tempFilePrefixView = - createTempFilePrefixView(loadJobIdPrefixView); + createTempFilePrefixView(input.getPipeline()); PCollection> results = input - .apply("WindowWithTrigger", + .apply( + "WindowWithTrigger", Window.>configure() .triggering( Repeatedly.forever( @@ -161,8 +160,8 @@ public void process(ProcessContext c) { .apply( "WriteGroupedRecords", ParDo.of( - new WriteGroupedRecordsToFiles<>( - tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory())) + new WriteGroupedRecordsToFiles<>( + tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory())) .withSideInputs(tempFilePrefixView)) .setCoder(WriteBundlesToFiles.ResultCoder.of(getDestinationCoder())); @@ -176,8 +175,7 @@ tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory())) results .apply( Window.>configure() - .triggering(DefaultTrigger.of()) - ) + .triggering(DefaultTrigger.of())) .apply("AttachSingletonKey", WithKeys.of((Void) null)) .setCoder( KvCoder.of( @@ -187,15 +185,15 @@ tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory())) .apply( "WritePartitionTriggered", ParDo.of( - new WritePartition<>( - false, - getDynamicDestinations(), - tempFilePrefixView, - DEFAULT_MAX_FILES_PER_PARTITION, - DEFAULT_MAX_BYTES_PER_PARTITION, - multiPartitionsTag, - singlePartitionTag, - getRowWriterFactory())) + new WritePartition<>( + false, + getDynamicDestinations(), + tempFilePrefixView, + DEFAULT_MAX_FILES_PER_PARTITION, + DEFAULT_MAX_BYTES_PER_PARTITION, + multiPartitionsTag, + singlePartitionTag, + getRowWriterFactory())) .withSideInputs(tempFilePrefixView) .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); @@ -206,73 +204,60 @@ tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory())) ShardedKeyCoder.of(NullableCoder.of(getDestinationCoder())), ListCoder.of(StringUtf8Coder.of()))); - return writeSinglePartitionWithResult( - partitions.get(singlePartitionTag), loadJobIdPrefixView.apply(View.asSingleton())); + return writeSinglePartitionWithResult(partitions.get(singlePartitionTag), loadJobIdPrefixView); } - private PCollection createLoadJobIdPrefixView( + /** + * Generates one jobId per window only if any feature row was submitted in this window. We need to + * generate exactly one id per window, otherwise SingletonView will fail. + * + * @param input feature Rows + * @return job id generated once per input's window + */ + private PCollectionView createLoadJobIdPrefixView( PCollection> input) { // We generate new JobId per each (input) window // To keep BQ job's name unique // Windowing of this generator is expected to be synchronized with input window // So generated ids can be applied as side input + + String baseName = input.getPipeline().getOptions().getJobName().replaceAll("-", ""); + return input .apply( - "EraseKey", + "EraseKeyAndValue", ParDo.of( - new DoFn, KV>() { + new DoFn, String>() { @ProcessElement public void process(ProcessContext c) { - c.output(KV.of(null, c.element().getValue())); + // we don't need data, only fact of data existing + c.output(""); } })) .apply( - "CreateJobId", - ParDo.of( - new DoFn, String>() { - @StateId("oncePerWindow") - private final StateSpec> oncePerWindow = StateSpecs.set(BooleanCoder.of()); - - @ProcessElement - public void process( - ProcessContext c, - BoundedWindow w, - @StateId("oncePerWindow") SetState oncePerWindow) { - - // if set already contains something - // it means we already generated Id for this window - Boolean empty = oncePerWindow.isEmpty().read(); - if (empty != null && !empty) { - return; - } - - // trying to add to Set and check if it was added - // if true - we won and Id will be generated in current Process - Boolean insertResult = oncePerWindow.addIfAbsent(true).read(); - if (insertResult != null && !insertResult) { - return; - } - - c.output( - String.format( - "beam_load_%s_%s", - c.getPipelineOptions().getJobName().replaceAll("-", ""), - BigQueryHelpers.randomUUIDString())); - - LOG.info("Pane {}, start: {}, last: {}", c.pane().getIndex(), c.pane().isFirst(), c.pane().isLast()); - LOG.info("[BQ] New window {}, {}", c.timestamp(), w.maxTimestamp()); - } - })); + Combine.globally( + (SerializableFunction, String>) + g -> + String.format( + "beam_load_%s_%s", baseName, BigQueryHelpers.randomUUIDString())) + .withoutDefaults()) + .apply("JobIdView", View.asSingleton()); } - private PCollectionView createTempFilePrefixView(final PCollection jobId) { - return jobId + /** + * Generates one global (per all windows) prefix path to store files before load to BQ + * + * @param p Pipeline + * @return view in global window + */ + private PCollectionView createTempFilePrefixView(final Pipeline p) { + return p.apply("CreateGlobalTempPrefix", Create.of("")) .apply( "GetTempFilePrefix", ParDo.of( new DoFn() { @ProcessElement - public void getTempFilePrefix(ProcessContext c, BoundedWindow w) { + public void getTempFilePrefix(ProcessContext c) { String tempLocationRoot; if (getCustomGcsTempLocation() != null) { tempLocationRoot = getCustomGcsTempLocation().get(); @@ -280,8 +265,11 @@ public void getTempFilePrefix(ProcessContext c, BoundedWindow w) { tempLocationRoot = c.getPipelineOptions().getTempLocation(); } String tempLocation = - resolveTempLocation(tempLocationRoot, "BigQueryWriteTemp", c.element()); - LOG.info("[BQ] temp location generated {}, {}", tempLocation, w.maxTimestamp()); + resolveTempLocation( + tempLocationRoot, + "BigQueryWriteTemp", + c.getPipelineOptions().getJobName()); + c.output(tempLocation); } }))