Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent race condition in BQ sink jobId generation #877

Merged
merged 2 commits into from
Jul 16, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@

import com.google.api.services.bigquery.model.TableRow;
import com.google.auto.value.AutoValue;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.*;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.state.*;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.*;
Expand All @@ -26,7 +24,7 @@
@AutoValue
public abstract class BatchLoadsWithResult<DestinationT>
extends PTransform<
PCollection<KV<DestinationT, TableRow>>, PCollection<KV<TableDestination, String>>> {
PCollection<KV<DestinationT, TableRow>>, PCollection<KV<TableDestination, String>>> {
static final Logger LOG = LoggerFactory.getLogger(BatchLoadsWithResult.class);

@VisibleForTesting
Expand Down Expand Up @@ -134,13 +132,14 @@ public PCollection<KV<TableDestination, String>> expand(
input.getWindowingStrategy().getWindowFn() instanceof FixedWindows,
"Input to BQ writer must be windowed in advance");

final PCollection<String> loadJobIdPrefixView = createLoadJobIdPrefixView(input);
final PCollectionView<String> loadJobIdPrefixView = createLoadJobIdPrefixView(input);
final PCollectionView<String> tempFilePrefixView =
createTempFilePrefixView(loadJobIdPrefixView);
createTempFilePrefixView(input.getPipeline());

PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
input
.apply("WindowWithTrigger",
.apply(
"WindowWithTrigger",
Window.<KV<DestinationT, TableRow>>configure()
.triggering(
Repeatedly.forever(
Expand All @@ -161,8 +160,8 @@ public void process(ProcessContext c) {
.apply(
"WriteGroupedRecords",
ParDo.of(
new WriteGroupedRecordsToFiles<>(
tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory()))
new WriteGroupedRecordsToFiles<>(
tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory()))
.withSideInputs(tempFilePrefixView))
.setCoder(WriteBundlesToFiles.ResultCoder.of(getDestinationCoder()));

Expand All @@ -176,8 +175,7 @@ tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory()))
results
.apply(
Window.<WriteBundlesToFiles.Result<DestinationT>>configure()
.triggering(DefaultTrigger.of())
)
.triggering(DefaultTrigger.of()))
.apply("AttachSingletonKey", WithKeys.of((Void) null))
.setCoder(
KvCoder.of(
Expand All @@ -187,15 +185,15 @@ tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory()))
.apply(
"WritePartitionTriggered",
ParDo.of(
new WritePartition<>(
false,
getDynamicDestinations(),
tempFilePrefixView,
DEFAULT_MAX_FILES_PER_PARTITION,
DEFAULT_MAX_BYTES_PER_PARTITION,
multiPartitionsTag,
singlePartitionTag,
getRowWriterFactory()))
new WritePartition<>(
false,
getDynamicDestinations(),
tempFilePrefixView,
DEFAULT_MAX_FILES_PER_PARTITION,
DEFAULT_MAX_BYTES_PER_PARTITION,
multiPartitionsTag,
singlePartitionTag,
getRowWriterFactory()))
.withSideInputs(tempFilePrefixView)
.withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));

Expand All @@ -206,82 +204,72 @@ tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory()))
ShardedKeyCoder.of(NullableCoder.of(getDestinationCoder())),
ListCoder.of(StringUtf8Coder.of())));

return writeSinglePartitionWithResult(
partitions.get(singlePartitionTag), loadJobIdPrefixView.apply(View.asSingleton()));
return writeSinglePartitionWithResult(partitions.get(singlePartitionTag), loadJobIdPrefixView);
}

private PCollection<String> createLoadJobIdPrefixView(
/**
* Generates exactly one job id per window, and only for windows that received at least one
* feature row. Exactly one id per window is required; otherwise the SingletonView will fail.
*
* @param input feature rows keyed by destination
* @return view holding the job id generated once per window of the input
*/
private PCollectionView<String> createLoadJobIdPrefixView(
PCollection<KV<DestinationT, TableRow>> input) {
// A new job id is generated for each (input) window to keep the BQ job names unique.
// The windowing of this generator is expected to be synchronized with the input's windowing,
// so that the generated ids can be applied as a side input.

String baseName = input.getPipeline().getOptions().getJobName().replaceAll("-", "");

return input
.apply(
"EraseKey",
"EraseKeyAndValue",
ParDo.of(
new DoFn<KV<DestinationT, TableRow>, KV<Void, TableRow>>() {
new DoFn<KV<DestinationT, TableRow>, String>() {
@ProcessElement
public void process(ProcessContext c) {
c.output(KV.of(null, c.element().getValue()));
// The data itself is not needed, only the fact that some data exists.
c.output("");
}
}))
.apply(
"CreateJobId",
ParDo.of(
new DoFn<KV<Void, TableRow>, String>() {
@StateId("oncePerWindow")
private final StateSpec<SetState<Boolean>> oncePerWindow = StateSpecs.set(BooleanCoder.of());

@ProcessElement
public void process(
ProcessContext c,
BoundedWindow w,
@StateId("oncePerWindow") SetState<Boolean> oncePerWindow) {

// if set already contains something
// it means we already generated Id for this window
Boolean empty = oncePerWindow.isEmpty().read();
if (empty != null && !empty) {
return;
}

// trying to add to Set and check if it was added
// if true - we won and Id will be generated in current Process
Boolean insertResult = oncePerWindow.addIfAbsent(true).read();
if (insertResult != null && !insertResult) {
return;
}

c.output(
String.format(
"beam_load_%s_%s",
c.getPipelineOptions().getJobName().replaceAll("-", ""),
BigQueryHelpers.randomUUIDString()));

LOG.info("Pane {}, start: {}, last: {}", c.pane().getIndex(), c.pane().isFirst(), c.pane().isLast());
LOG.info("[BQ] New window {}, {}", c.timestamp(), w.maxTimestamp());
}
}));
Combine.globally(
(SerializableFunction<Iterable<String>, String>)
g ->
String.format(
"beam_load_%s_%s", baseName, BigQueryHelpers.randomUUIDString()))
.withoutDefaults())
.apply("JobIdView", View.asSingleton());
}

private PCollectionView<String> createTempFilePrefixView(final PCollection<String> jobId) {
return jobId
/**
* Generates a single global prefix path (shared by all windows) under which files are stored
* before they are loaded into BQ.
*
* @param p pipeline in which the prefix is created
* @return view of the prefix in the global window
*/
private PCollectionView<String> createTempFilePrefixView(final Pipeline p) {
return p.apply("CreateGlobalTempPrefix", Create.of(""))
.apply(
"GetTempFilePrefix",
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void getTempFilePrefix(ProcessContext c, BoundedWindow w) {
public void getTempFilePrefix(ProcessContext c) {
String tempLocationRoot;
if (getCustomGcsTempLocation() != null) {
tempLocationRoot = getCustomGcsTempLocation().get();
} else {
tempLocationRoot = c.getPipelineOptions().getTempLocation();
}
String tempLocation =
resolveTempLocation(tempLocationRoot, "BigQueryWriteTemp", c.element());
LOG.info("[BQ] temp location generated {}, {}", tempLocation, w.maxTimestamp());
resolveTempLocation(
tempLocationRoot,
"BigQueryWriteTemp",
c.getPipelineOptions().getJobName());

c.output(tempLocation);
}
}))
Expand Down Expand Up @@ -321,5 +309,4 @@ PCollection<KV<TableDestination, String>> writeSinglePartitionWithResult(
true,
getSchemaUpdateOptions()));
}

}