From e3250872d29b292c8cd6bdc41a2dfdd063c7949e Mon Sep 17 00:00:00 2001 From: jaehwan0214 <60352009+jaehwan0214@users.noreply.github.com> Date: Sun, 18 Oct 2020 08:33:44 +0900 Subject: [PATCH] [NEMO-456] Remove unnecessary shuffling processes in Combine transformation (#302) JIRA: [NEMO-456: Remove unnecessary shuffling processes in Combine transformation](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-456) **Major changes:** - Removed the unnecessary shuffling process before the partial combine transform **Tests for the changes:** - Modified GBKTransformTest. Closes #302 --- .../beam/PipelineTranslationContext.java | 4 +- .../frontend/beam/PipelineTranslator.java | 11 ++-- .../frontend/beam/transform/GBKTransform.java | 51 +++++++++++-------- .../beam/transform/GBKTransformTest.java | 12 +++-- 4 files changed, 48 insertions(+), 30 deletions(-) diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java index 2d1b90baf7..28aafb2c90 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java @@ -275,7 +275,9 @@ private CommunicationPatternProperty.Value getCommPattern(final IRVertex src, fi if (srcTransform instanceof FlattenTransform) { return CommunicationPatternProperty.Value.ONE_TO_ONE; } - if (dstTransform instanceof GBKTransform + // If GBKTransform represents a partial CombinePerKey transformation, we do NOT need to shuffle its input, + // since its output will be shuffled before going through a final CombinePerKey transformation. + if ((dstTransform instanceof GBKTransform && !((GBKTransform) dstTransform).getIsPartialCombining()) || dstTransform instanceof GroupByKeyTransform) { return CommunicationPatternProperty.Value.SHUFFLE; } diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java index 1e429c31cc..21ded1c268 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java @@ -384,8 +384,8 @@ private static Pipeline.PipelineVisitor.CompositeBehavior combinePerKeyTranslato // Batch processing, using CombinePartialTransform and CombineFinalTransform partialCombine = new OperatorVertex(new CombineFnPartialTransform<>(combineFn)); finalCombine = new OperatorVertex(new CombineFnFinalTransform<>(combineFn)); - // Stream data processing, using GBKTransform } else { + // Stream data processing, using GBKTransform final AppliedPTransform pTransform = beamNode.toAppliedPTransform(ctx.getPipeline()); final CombineFnBase.GlobalCombineFn partialCombineFn = new PartialCombineFn( (Combine.CombineFn) combineFn, accumulatorCoder); @@ -414,7 +414,8 @@ private static Pipeline.PipelineVisitor.CompositeBehavior combinePerKeyTranslato ctx.getPipelineOptions(), partialSystemReduceFn, DoFnSchemaInformation.create(), - DisplayData.from(beamNode.getTransform())); + DisplayData.from(beamNode.getTransform()), + true); final GBKTransform finalCombineStreamTransform = new GBKTransform( @@ -424,7 +425,8 @@ private static Pipeline.PipelineVisitor.CompositeBehavior combinePerKeyTranslato ctx.getPipelineOptions(), finalSystemReduceFn, DoFnSchemaInformation.create(), - DisplayData.from(beamNode.getTransform())); + DisplayData.from(beamNode.getTransform()), + false); partialCombine = new OperatorVertex(partialCombineStreamTransform); finalCombine = new OperatorVertex(finalCombineStreamTransform); @@ -568,7 +570,8 @@ private static Transform createGBKTransform( ctx.getPipelineOptions(), SystemReduceFn.buffering(mainInput.getCoder()), DoFnSchemaInformation.create(), - DisplayData.from(beamNode.getTransform())); + DisplayData.from(beamNode.getTransform()), + false); } } diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java index 37952eaf0e..9dd2e5a6e9 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java @@ -39,8 +39,8 @@ import java.util.*; /** - * This transform performs GroupByKey or CombinePerKey operation when input data is unbounded or is not in - * global window. + * This transform executes GroupByKey transformation and CombinePerKey transformation when input data is unbounded + * or is not in a global window. * @param key type * @param input type * @param output type @@ -56,6 +56,7 @@ public final class GBKTransform private Watermark inputWatermark = new Watermark(Long.MIN_VALUE); private boolean dataReceived = false; private transient OutputCollector originOc; + private final boolean isPartialCombining; public GBKTransform(final Map, Coder> outputCoders, final TupleTag> mainOutputTag, @@ -63,7 +64,8 @@ public GBKTransform(final Map, Coder> outputCoders, final PipelineOptions options, final SystemReduceFn reduceFn, final DoFnSchemaInformation doFnSchemaInformation, - final DisplayData displayData) { + final DisplayData displayData, + final boolean isPartialCombining) { super(null, null, outputCoders, @@ -76,6 +78,7 @@ public GBKTransform(final Map, Coder> outputCoders, doFnSchemaInformation, Collections.emptyMap()); /* does not have side inputs */ this.reduceFn = reduceFn; + this.isPartialCombining = isPartialCombining; } /** @@ -122,19 +125,19 @@ OutputCollector wrapOutputCollector(final OutputCollector oc) { */ @Override public void onData(final WindowedValue> element) { - dataReceived = true; - try { - checkAndInvokeBundle(); - final KV kv = element.getValue(); - final KeyedWorkItem keyedWorkItem = - KeyedWorkItems.elementsWorkItem(kv.getKey(), - Collections.singletonList(element.withValue(kv.getValue()))); - getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem)); - checkAndFinishBundle(); - } catch (final Exception e) { - e.printStackTrace(); - throw new RuntimeException("Exception triggered element " + element.toString()); - } + dataReceived = true; + try { + checkAndInvokeBundle(); + final KV kv = element.getValue(); + final KeyedWorkItem keyedWorkItem = + KeyedWorkItems.elementsWorkItem(kv.getKey(), + Collections.singletonList(element.withValue(kv.getValue()))); + getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem)); + checkAndFinishBundle(); + } catch (final Exception e) { + e.printStackTrace(); + throw new RuntimeException("Exception triggered element " + element.toString()); + } } /** @@ -183,8 +186,8 @@ protected void beforeClose() { * @param watermark watermark */ private void triggerTimers(final Instant processingTime, - final Instant synchronizedTime, - final Watermark watermark) { + final Instant synchronizedTime, + final Watermark watermark) { final Iterator> iter = inMemoryTimerInternalsFactory.getTimerInternalsMap().entrySet().iterator(); while (iter.hasNext()) { @@ -259,6 +262,12 @@ private void emitOutputWatermark() { } } + /** Accessor for isPartialCombining. */ + public boolean getIsPartialCombining() { + return isPartialCombining; + } + + /** Wrapper class for {@link OutputCollector}. */ public class GBKOutputCollector implements OutputCollector>> { private final OutputCollector>> oc; @@ -278,9 +287,9 @@ public void emit(final WindowedValue> output) { (InMemoryTimerInternals) inMemoryTimerInternalsFactory.timerInternalsForKey(key); // Add the output timestamp to the watermark hold of each key. // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999. - keyOutputWatermarkMap.put(key, - new Watermark(output.getTimestamp().getMillis() + 1)); - timerInternals.advanceOutputWatermark(new Instant(output.getTimestamp().getMillis() + 1)); + keyOutputWatermarkMap.put(key, + new Watermark(output.getTimestamp().getMillis() + 1)); + timerInternals.advanceOutputWatermark(new Instant(output.getTimestamp().getMillis() + 1)); } oc.emit(output); } diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java index 9a9893213a..45933b0d37 100644 --- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java +++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java @@ -161,7 +161,8 @@ public void test_combine() { PipelineOptionsFactory.as(NemoPipelineOptions.class), SystemReduceFn.combining(STRING_CODER, applied_combine_fn), DoFnSchemaInformation.create(), - DisplayData.none()); + DisplayData.none(), + false); // window1 : [-5000, 5000) in millisecond // window2 : [0, 10000) @@ -288,7 +289,8 @@ public void test_combine_lateData() { PipelineOptionsFactory.as(NemoPipelineOptions.class), SystemReduceFn.combining(STRING_CODER, applied_combine_fn), DoFnSchemaInformation.create(), - DisplayData.none()); + DisplayData.none(), + false); // window1 : [-5000, 5000) in millisecond // window2 : [0, 10000) @@ -381,7 +383,8 @@ public void test_gbk() { PipelineOptionsFactory.as(NemoPipelineOptions.class), SystemReduceFn.buffering(STRING_CODER), DoFnSchemaInformation.create(), - DisplayData.none()); + DisplayData.none(), + false); final Instant ts1 = new Instant(1); final Instant ts2 = new Instant(100); @@ -567,7 +570,8 @@ public void test_gbk_eventTimeTrigger() { PipelineOptionsFactory.as(NemoPipelineOptions.class), SystemReduceFn.buffering(STRING_CODER), DoFnSchemaInformation.create(), - DisplayData.none()); + DisplayData.none(), + false); final Transform.Context context = mock(Transform.Context.class);