Prevent race condition in BQ sink jobId generation (feast-dev#877)
* fix race condition

* [bq] temp file prefix in global window
Oleksii Moskalenko authored and pyalex committed Jul 17, 2020
1 parent b9915e2 commit 8ce45c7
Showing 1 changed file with 55 additions and 67 deletions.
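
The heart of the first change ("fix race condition") is visible in the new createLoadJobIdPrefixView in the diff below: instead of a stateful DoFn whose parallel instances race to be the one that emits a job id for the window, every non-empty window is collapsed into exactly one generated id with Combine.globally(...).withoutDefaults() and exposed through View.asSingleton(), so the singleton view can never observe zero or multiple ids. The following is a minimal, self-contained sketch of that pattern, not the Feast code itself; the class name, transform names, and example inputs are invented for illustration.

import java.util.UUID;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class OneJobIdPerWindowSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // Stand-in for the windowed feature rows; only their presence matters here.
    PCollection<String> rows =
        p.apply("Rows", Create.of("row-1", "row-2", "row-3"))
            .apply(
                "FixedWindows",
                Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

    PCollectionView<String> jobIdView =
        rows
            // Drop the payload: an empty marker per element is enough to signal "window not empty".
            .apply("EraseValue", MapElements.into(TypeDescriptors.strings()).via((String x) -> ""))
            // Collapse each non-empty window into exactly one generated id.
            .apply(
                "OneIdPerWindow",
                Combine.globally(
                        (SerializableFunction<Iterable<String>, String>)
                            ignored ->
                                "beam_load_example_"
                                    + UUID.randomUUID().toString().replace("-", ""))
                    .withoutDefaults())
            // Exactly one element per window, so the singleton view cannot see duplicates.
            .apply("JobIdView", View.asSingleton());

    p.run().waitUntilFinish();
  }
}

withoutDefaults() is what keeps empty windows from producing an id at all, matching the Javadoc added in this commit, which notes that an id is generated only when feature rows actually arrived in the window.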
@@ -5,15 +5,13 @@

import com.google.api.services.bigquery.model.TableRow;
import com.google.auto.value.AutoValue;
-
import java.util.Collections;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
-
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.*;
import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.state.*;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.*;
@@ -26,7 +24,7 @@
@AutoValue
public abstract class BatchLoadsWithResult<DestinationT>
extends PTransform<
-    PCollection<KV<DestinationT, TableRow>>, PCollection<KV<TableDestination, String>>> {
+        PCollection<KV<DestinationT, TableRow>>, PCollection<KV<TableDestination, String>>> {
static final Logger LOG = LoggerFactory.getLogger(BatchLoadsWithResult.class);

@VisibleForTesting
@@ -134,13 +132,14 @@ public PCollection<KV<TableDestination, String>> expand(
input.getWindowingStrategy().getWindowFn() instanceof FixedWindows,
"Input to BQ writer must be windowed in advance");

-    final PCollection<String> loadJobIdPrefixView = createLoadJobIdPrefixView(input);
+    final PCollectionView<String> loadJobIdPrefixView = createLoadJobIdPrefixView(input);
final PCollectionView<String> tempFilePrefixView =
-        createTempFilePrefixView(loadJobIdPrefixView);
+        createTempFilePrefixView(input.getPipeline());

PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
input
-            .apply("WindowWithTrigger",
+            .apply(
+                "WindowWithTrigger",
Window.<KV<DestinationT, TableRow>>configure()
.triggering(
Repeatedly.forever(
@@ -161,8 +160,8 @@ public void process(ProcessContext c) {
.apply(
"WriteGroupedRecords",
ParDo.of(
-                new WriteGroupedRecordsToFiles<>(
-                    tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory()))
+                    new WriteGroupedRecordsToFiles<>(
+                        tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory()))
.withSideInputs(tempFilePrefixView))
.setCoder(WriteBundlesToFiles.ResultCoder.of(getDestinationCoder()));

@@ -176,8 +175,7 @@ tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory()))
results
.apply(
Window.<WriteBundlesToFiles.Result<DestinationT>>configure()
-                    .triggering(DefaultTrigger.of())
-            )
+                    .triggering(DefaultTrigger.of()))
.apply("AttachSingletonKey", WithKeys.of((Void) null))
.setCoder(
KvCoder.of(
@@ -187,15 +185,15 @@ tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory()))
.apply(
"WritePartitionTriggered",
ParDo.of(
-                new WritePartition<>(
-                    false,
-                    getDynamicDestinations(),
-                    tempFilePrefixView,
-                    DEFAULT_MAX_FILES_PER_PARTITION,
-                    DEFAULT_MAX_BYTES_PER_PARTITION,
-                    multiPartitionsTag,
-                    singlePartitionTag,
-                    getRowWriterFactory()))
+                    new WritePartition<>(
+                        false,
+                        getDynamicDestinations(),
+                        tempFilePrefixView,
+                        DEFAULT_MAX_FILES_PER_PARTITION,
+                        DEFAULT_MAX_BYTES_PER_PARTITION,
+                        multiPartitionsTag,
+                        singlePartitionTag,
+                        getRowWriterFactory()))
.withSideInputs(tempFilePrefixView)
.withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));

@@ -206,82 +204,72 @@ tempFilePrefixView, DEFAULT_MAX_FILE_SIZE, getRowWriterFactory()))
ShardedKeyCoder.of(NullableCoder.of(getDestinationCoder())),
ListCoder.of(StringUtf8Coder.of())));

-    return writeSinglePartitionWithResult(
-        partitions.get(singlePartitionTag), loadJobIdPrefixView.apply(View.asSingleton()));
+    return writeSinglePartitionWithResult(partitions.get(singlePartitionTag), loadJobIdPrefixView);
}

-  private PCollection<String> createLoadJobIdPrefixView(
+  /**
+   * Generates one jobId per window only if any feature row was submitted in this window. We need to
+   * generate exactly one id per window, otherwise SingletonView will fail.
+   *
+   * @param input feature Rows
+   * @return job id generated once per input's window
+   */
+  private PCollectionView<String> createLoadJobIdPrefixView(
PCollection<KV<DestinationT, TableRow>> input) {
-    // We generate new JobId per each (input) window
-    // To keep BQ job's name unique
-    // Windowing of this generator is expected to be synchronized with input window
-    // So generated ids can be applied as side input
-
+    String baseName = input.getPipeline().getOptions().getJobName().replaceAll("-", "");

return input
.apply(
"EraseKey",
"EraseKeyAndValue",
ParDo.of(
-                new DoFn<KV<DestinationT, TableRow>, KV<Void, TableRow>>() {
+                new DoFn<KV<DestinationT, TableRow>, String>() {
@ProcessElement
public void process(ProcessContext c) {
-                    c.output(KV.of(null, c.element().getValue()));
+                    // we don't need data, only fact of data existing
+                    c.output("");
}
}))
.apply(
"CreateJobId",
ParDo.of(
new DoFn<KV<Void, TableRow>, String>() {
@StateId("oncePerWindow")
private final StateSpec<SetState<Boolean>> oncePerWindow = StateSpecs.set(BooleanCoder.of());

@ProcessElement
public void process(
ProcessContext c,
BoundedWindow w,
@StateId("oncePerWindow") SetState<Boolean> oncePerWindow) {

// if set already contains something
// it means we already generated Id for this window
Boolean empty = oncePerWindow.isEmpty().read();
if (empty != null && !empty) {
return;
}

// trying to add to Set and check if it was added
// if true - we won and Id will be generated in current Process
Boolean insertResult = oncePerWindow.addIfAbsent(true).read();
if (insertResult != null && !insertResult) {
return;
}

c.output(
String.format(
"beam_load_%s_%s",
c.getPipelineOptions().getJobName().replaceAll("-", ""),
BigQueryHelpers.randomUUIDString()));

LOG.info("Pane {}, start: {}, last: {}", c.pane().getIndex(), c.pane().isFirst(), c.pane().isLast());
LOG.info("[BQ] New window {}, {}", c.timestamp(), w.maxTimestamp());
}
}));
Combine.globally(
(SerializableFunction<Iterable<String>, String>)
g ->
String.format(
"beam_load_%s_%s", baseName, BigQueryHelpers.randomUUIDString()))
.withoutDefaults())
.apply("JobIdView", View.asSingleton());
}

-  private PCollectionView<String> createTempFilePrefixView(final PCollection<String> jobId) {
-    return jobId
+  /**
+   * Generates one global (per all windows) prefix path to store files before load to BQ
+   *
+   * @param p Pipeline
+   * @return view in global window
+   */
+  private PCollectionView<String> createTempFilePrefixView(final Pipeline p) {
+    return p.apply("CreateGlobalTempPrefix", Create.of(""))
.apply(
"GetTempFilePrefix",
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
-                  public void getTempFilePrefix(ProcessContext c, BoundedWindow w) {
+                  public void getTempFilePrefix(ProcessContext c) {
String tempLocationRoot;
if (getCustomGcsTempLocation() != null) {
tempLocationRoot = getCustomGcsTempLocation().get();
} else {
tempLocationRoot = c.getPipelineOptions().getTempLocation();
}
String tempLocation =
-                        resolveTempLocation(tempLocationRoot, "BigQueryWriteTemp", c.element());
-                    LOG.info("[BQ] temp location generated {}, {}", tempLocation, w.maxTimestamp());
+                        resolveTempLocation(
+                            tempLocationRoot,
+                            "BigQueryWriteTemp",
+                            c.getPipelineOptions().getJobName());

c.output(tempLocation);
}
}))
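
The second change ("temp file prefix in global window") decouples the temporary-file prefix from the per-window job id: it is now computed once, from a single element created in the global window, and consumed everywhere as a side input. Below is a minimal sketch of that shape, not the commit's code; the class name and the concatenated path are illustrative stand-ins for the BigQueryWriteTemp layout the real code builds via resolveTempLocation.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollectionView;

public class GlobalTempPrefixSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    PCollectionView<String> tempFilePrefixView =
        // A single dummy element in the global window: the prefix is computed exactly once.
        p.apply("CreateGlobalTempPrefix", Create.of(""))
            .apply(
                "ResolveTempPrefix",
                ParDo.of(
                    new DoFn<String, String>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        // Derive the prefix from pipeline options rather than from a per-window
                        // element, so it is stable for the whole job (assumes --tempLocation is set).
                        String root = c.getPipelineOptions().getTempLocation();
                        c.output(
                            root + "/BigQueryWriteTemp/" + c.getPipelineOptions().getJobName());
                      }
                    }))
            // The source collection lives in the global window, so this singleton view can be
            // consumed as a side input by transforms running under any downstream windowing.
            .apply("TempPrefixView", View.asSingleton());

    p.run().waitUntilFinish();
  }
}

Because the prefix view sits in the global window, the fixed-window file-writing steps can all reuse the same side input, which is exactly why the prefix no longer needs to be derived from the per-window job id.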
