Skip to content

Commit

Permalink
[NEMO-456] Remove unnecessary shuffling processes in Combine transformation (#302)
Browse files Browse the repository at this point in the history

JIRA: [NEMO-456: Remove unnecessary shuffling processes in Combine transformation](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-456)

**Major changes:**
- Removed the unnecessary shuffling process before the partial combine transform

**Tests for the changes:**
- Modified GBKTransformTest.

Closes #302
  • Loading branch information
jaehwan0214 authored Oct 17, 2020
1 parent 13bf8d4 commit e325087
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,9 @@ private CommunicationPatternProperty.Value getCommPattern(final IRVertex src, fi
if (srcTransform instanceof FlattenTransform) {
return CommunicationPatternProperty.Value.ONE_TO_ONE;
}
if (dstTransform instanceof GBKTransform
// If GBKTransform represents a partial CombinePerKey transformation, we do NOT need to shuffle its input,
// since its output will be shuffled before going through a final CombinePerKey transformation.
if ((dstTransform instanceof GBKTransform && !((GBKTransform) dstTransform).getIsPartialCombining())
|| dstTransform instanceof GroupByKeyTransform) {
return CommunicationPatternProperty.Value.SHUFFLE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,8 @@ private static Pipeline.PipelineVisitor.CompositeBehavior combinePerKeyTranslato
// Batch processing, using CombinePartialTransform and CombineFinalTransform
partialCombine = new OperatorVertex(new CombineFnPartialTransform<>(combineFn));
finalCombine = new OperatorVertex(new CombineFnFinalTransform<>(combineFn));
// Stream data processing, using GBKTransform
} else {
// Stream data processing, using GBKTransform
final AppliedPTransform pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
final CombineFnBase.GlobalCombineFn partialCombineFn = new PartialCombineFn(
(Combine.CombineFn) combineFn, accumulatorCoder);
Expand Down Expand Up @@ -414,7 +414,8 @@ private static Pipeline.PipelineVisitor.CompositeBehavior combinePerKeyTranslato
ctx.getPipelineOptions(),
partialSystemReduceFn,
DoFnSchemaInformation.create(),
DisplayData.from(beamNode.getTransform()));
DisplayData.from(beamNode.getTransform()),
true);

final GBKTransform finalCombineStreamTransform =
new GBKTransform(
Expand All @@ -424,7 +425,8 @@ private static Pipeline.PipelineVisitor.CompositeBehavior combinePerKeyTranslato
ctx.getPipelineOptions(),
finalSystemReduceFn,
DoFnSchemaInformation.create(),
DisplayData.from(beamNode.getTransform()));
DisplayData.from(beamNode.getTransform()),
false);

partialCombine = new OperatorVertex(partialCombineStreamTransform);
finalCombine = new OperatorVertex(finalCombineStreamTransform);
Expand Down Expand Up @@ -568,7 +570,8 @@ private static Transform createGBKTransform(
ctx.getPipelineOptions(),
SystemReduceFn.buffering(mainInput.getCoder()),
DoFnSchemaInformation.create(),
DisplayData.from(beamNode.getTransform()));
DisplayData.from(beamNode.getTransform()),
false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import java.util.*;

/**
* This transform performs GroupByKey or CombinePerKey operation when input data is unbounded or is not in
* global window.
* This transform executes the GroupByKey and CombinePerKey transformations when the input data is unbounded
* or is not in a global window.
* @param <K> key type
* @param <InputT> input type
* @param <OutputT> output type
Expand All @@ -56,14 +56,16 @@ public final class GBKTransform<K, InputT, OutputT>
private Watermark inputWatermark = new Watermark(Long.MIN_VALUE);
private boolean dataReceived = false;
private transient OutputCollector originOc;
private final boolean isPartialCombining;

public GBKTransform(final Map<TupleTag<?>, Coder<?>> outputCoders,
final TupleTag<KV<K, OutputT>> mainOutputTag,
final WindowingStrategy<?, ?> windowingStrategy,
final PipelineOptions options,
final SystemReduceFn reduceFn,
final DoFnSchemaInformation doFnSchemaInformation,
final DisplayData displayData) {
final DisplayData displayData,
final boolean isPartialCombining) {
super(null,
null,
outputCoders,
Expand All @@ -76,6 +78,7 @@ public GBKTransform(final Map<TupleTag<?>, Coder<?>> outputCoders,
doFnSchemaInformation,
Collections.emptyMap()); /* does not have side inputs */
this.reduceFn = reduceFn;
this.isPartialCombining = isPartialCombining;
}

/**
Expand Down Expand Up @@ -122,19 +125,19 @@ OutputCollector wrapOutputCollector(final OutputCollector oc) {
*/
@Override
public void onData(final WindowedValue<KV<K, InputT>> element) {
  // Record that at least one element has been received by this transform.
  dataReceived = true;
  try {
    // NOTE(review): presumably starts a bundle if none is active -- confirm against the parent class.
    checkAndInvokeBundle();
    final KV<K, InputT> kv = element.getValue();
    // Wrap the single value into a work item keyed by the element's key,
    // keeping the element's original windowing metadata on the value itself.
    final KeyedWorkItem<K, InputT> keyedWorkItem =
      KeyedWorkItems.elementsWorkItem(kv.getKey(),
        Collections.singletonList(element.withValue(kv.getValue())));
    // The work item itself is handed to the runner in the global window.
    getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
    // NOTE(review): presumably finishes the bundle when its limits are reached -- confirm against the parent class.
    checkAndFinishBundle();
  } catch (final Exception e) {
    // Chain the original exception as the cause so its stack trace is preserved
    // instead of being printed to stderr and dropped.
    throw new RuntimeException("Exception triggered element " + element.toString(), e);
  }
}

/**
Expand Down Expand Up @@ -183,8 +186,8 @@ protected void beforeClose() {
* @param watermark watermark
*/
private void triggerTimers(final Instant processingTime,
final Instant synchronizedTime,
final Watermark watermark) {
final Instant synchronizedTime,
final Watermark watermark) {
final Iterator<Map.Entry<K, InMemoryTimerInternals>> iter =
inMemoryTimerInternalsFactory.getTimerInternalsMap().entrySet().iterator();
while (iter.hasNext()) {
Expand Down Expand Up @@ -259,6 +262,12 @@ private void emitOutputWatermark() {
}
}

/**
 * Reports whether this transform was constructed as a partial-combining transform.
 *
 * @return the value of the {@code isPartialCombining} flag held by this instance.
 */
public boolean getIsPartialCombining() {
  return this.isPartialCombining;
}


/** Wrapper class for {@link OutputCollector}. */
public class GBKOutputCollector implements OutputCollector<WindowedValue<KV<K, OutputT>>> {
private final OutputCollector<WindowedValue<KV<K, OutputT>>> oc;
Expand All @@ -278,9 +287,9 @@ public void emit(final WindowedValue<KV<K, OutputT>> output) {
(InMemoryTimerInternals) inMemoryTimerInternalsFactory.timerInternalsForKey(key);
// Add the output timestamp to the watermark hold of each key.
// +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999.
keyOutputWatermarkMap.put(key,
new Watermark(output.getTimestamp().getMillis() + 1));
timerInternals.advanceOutputWatermark(new Instant(output.getTimestamp().getMillis() + 1));
keyOutputWatermarkMap.put(key,
new Watermark(output.getTimestamp().getMillis() + 1));
timerInternals.advanceOutputWatermark(new Instant(output.getTimestamp().getMillis() + 1));
}
oc.emit(output);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ public void test_combine() {
PipelineOptionsFactory.as(NemoPipelineOptions.class),
SystemReduceFn.combining(STRING_CODER, applied_combine_fn),
DoFnSchemaInformation.create(),
DisplayData.none());
DisplayData.none(),
false);

// window1 : [-5000, 5000) in millisecond
// window2 : [0, 10000)
Expand Down Expand Up @@ -288,7 +289,8 @@ public void test_combine_lateData() {
PipelineOptionsFactory.as(NemoPipelineOptions.class),
SystemReduceFn.combining(STRING_CODER, applied_combine_fn),
DoFnSchemaInformation.create(),
DisplayData.none());
DisplayData.none(),
false);

// window1 : [-5000, 5000) in millisecond
// window2 : [0, 10000)
Expand Down Expand Up @@ -381,7 +383,8 @@ public void test_gbk() {
PipelineOptionsFactory.as(NemoPipelineOptions.class),
SystemReduceFn.buffering(STRING_CODER),
DoFnSchemaInformation.create(),
DisplayData.none());
DisplayData.none(),
false);

final Instant ts1 = new Instant(1);
final Instant ts2 = new Instant(100);
Expand Down Expand Up @@ -567,7 +570,8 @@ public void test_gbk_eventTimeTrigger() {
PipelineOptionsFactory.as(NemoPipelineOptions.class),
SystemReduceFn.buffering(STRING_CODER),
DoFnSchemaInformation.create(),
DisplayData.none());
DisplayData.none(),
false);


final Transform.Context context = mock(Transform.Context.class);
Expand Down

0 comments on commit e325087

Please sign in to comment.