Skip to content

Commit

Permalink
[NEMO-460] Setting coders in CombinePerKey transformation (#303)
Browse files Browse the repository at this point in the history
JIRA: [NEMO-460: Setting coders in CombinePerKey transformation](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-460)

**Major changes:**
- Added the additional parameter "inputCoder" for GBKTransform constructor.
- Fixed the input coder and the output coder for the partial combine transform and the final combine transform.

**Minor changes to note:**
- Fixed the main output TupleTags for the partial combine transform and the final combine transform.

**Tests for the changes:**
- Current tests suffice.

**Other comments:**
- This needs to be merged after merging #302

Closes #303
  • Loading branch information
jaehwan0214 authored Oct 18, 2020
1 parent e325087 commit 3d46caf
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,11 @@ private static Pipeline.PipelineVisitor.CompositeBehavior combinePerKeyTranslato
KvCoder.of(inputCoder.getKeyCoder(),
accumulatorCoder),
null, mainInput.getWindowingStrategy()));
final TupleTag<?> partialMainOutputTag = new TupleTag<>();
final GBKTransform partialCombineStreamTransform =
new GBKTransform(
getOutputCoders(pTransform),
new TupleTag<>(),
new GBKTransform(inputCoder,
Collections.singletonMap(partialMainOutputTag, KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder)),
partialMainOutputTag,
mainInput.getWindowingStrategy(),
ctx.getPipelineOptions(),
partialSystemReduceFn,
Expand All @@ -418,9 +419,9 @@ private static Pipeline.PipelineVisitor.CompositeBehavior combinePerKeyTranslato
true);

final GBKTransform finalCombineStreamTransform =
new GBKTransform(
new GBKTransform(KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
getOutputCoders(pTransform),
new TupleTag<>(),
Iterables.getOnlyElement(beamNode.getOutputs().keySet()),
mainInput.getWindowingStrategy(),
ctx.getPipelineOptions(),
finalSystemReduceFn,
Expand Down Expand Up @@ -556,14 +557,15 @@ private static Transform createGBKTransform(
final AppliedPTransform<?, ?, ?> pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
final PCollection<?> mainInput = (PCollection<?>)
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
final TupleTag mainOutputTag = new TupleTag<>();
final TupleTag mainOutputTag = Iterables.getOnlyElement(beamNode.getOutputs().keySet());

if (isGlobalWindow(beamNode, ctx.getPipeline())) {
// GroupByKey Transform when using a global windowing strategy.
return new GroupByKeyTransform();
} else {
// GroupByKey Transform when using a non-global windowing strategy.
return new GBKTransform<>(
(KvCoder) mainInput.getCoder(),
getOutputCoders(pTransform),
mainOutputTag,
mainInput.getWindowingStrategy(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public final class GBKTransform<K, InputT, OutputT>
private transient OutputCollector originOc;
private final boolean isPartialCombining;

public GBKTransform(final Map<TupleTag<?>, Coder<?>> outputCoders,
public GBKTransform(final Coder<KV<K, InputT>> inputCoder,
final Map<TupleTag<?>, Coder<?>> outputCoders,
final TupleTag<KV<K, OutputT>> mainOutputTag,
final WindowingStrategy<?, ?> windowingStrategy,
final PipelineOptions options,
Expand All @@ -67,7 +68,7 @@ public GBKTransform(final Map<TupleTag<?>, Coder<?>> outputCoders,
final DisplayData displayData,
final boolean isPartialCombining) {
super(null,
null,
inputCoder,
outputCoders,
mainOutputTag,
Collections.emptyList(), /* no additional outputs */
Expand Down Expand Up @@ -278,7 +279,7 @@ public GBKOutputCollector(final OutputCollector oc) {

/** Emit output. If {@param output} is emitted on-time, save its timestamp in the output watermark map. */
@Override
public void emit(final WindowedValue<KV<K, OutputT>> output) {
public final void emit(final WindowedValue<KV<K, OutputT>> output) {
// The watermark advances only in ON_TIME
if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) {
KV<K, OutputT> value = output.getValue();
Expand All @@ -296,13 +297,13 @@ public void emit(final WindowedValue<KV<K, OutputT>> output) {

/** Emit watermark. */
@Override
public void emitWatermark(final Watermark watermark) {
public final void emitWatermark(final Watermark watermark) {
oc.emitWatermark(watermark);
}

/** Emit output value to {@param dstVertexId}. */
@Override
public <T> void emit(final String dstVertexId, final T output) {
public final <T> void emit(final String dstVertexId, final T output) {
oc.emit(dstVertexId, output);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.nemo.compiler.frontend.beam.transform;

import com.google.common.collect.Iterables;
import junit.framework.TestCase;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.sdk.coders.*;
Expand All @@ -41,15 +42,12 @@

import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.*;
import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES;
import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

public class GBKTransformTest extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(GBKTransformTest.class.getName());
private final static Coder STRING_CODER = StringUtf8Coder.of();
private final static Coder INTEGER_CODER = BigEndianIntegerCoder.of();
private final static Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null;

private void checkOutput(final KV<String, Integer> expected, final KV<String, Integer> result) {
// check key
Expand Down Expand Up @@ -155,7 +153,8 @@ public void test_combine() {

final GBKTransform<String, Integer, Integer> combine_transform =
new GBKTransform(
NULL_OUTPUT_CODERS,
KvCoder.of(STRING_CODER, INTEGER_CODER),
Collections.singletonMap(outputTag, KvCoder.of(STRING_CODER, INTEGER_CODER)),
outputTag,
WindowingStrategy.of(slidingWindows).withMode(ACCUMULATING_FIRED_PANES),
PipelineOptionsFactory.as(NemoPipelineOptions.class),
Expand Down Expand Up @@ -283,7 +282,8 @@ public void test_combine_lateData() {

final GBKTransform<String, Integer, Integer> combine_transform =
new GBKTransform(
NULL_OUTPUT_CODERS,
KvCoder.of(STRING_CODER, INTEGER_CODER),
Collections.singletonMap(outputTag, KvCoder.of(STRING_CODER, INTEGER_CODER)),
outputTag,
WindowingStrategy.of(slidingWindows).withMode(ACCUMULATING_FIRED_PANES).withAllowedLateness(lateness),
PipelineOptionsFactory.as(NemoPipelineOptions.class),
Expand Down Expand Up @@ -377,7 +377,8 @@ public void test_gbk() {

final GBKTransform<String, String, Iterable<String>> doFnTransform =
new GBKTransform(
NULL_OUTPUT_CODERS,
KvCoder.of(STRING_CODER, STRING_CODER),
Collections.singletonMap(outputTag, KvCoder.of(STRING_CODER, IterableCoder.of(STRING_CODER))),
outputTag,
WindowingStrategy.of(slidingWindows),
PipelineOptionsFactory.as(NemoPipelineOptions.class),
Expand Down Expand Up @@ -562,7 +563,8 @@ public void test_gbk_eventTimeTrigger() {

final GBKTransform<String, String, Iterable<String>> doFnTransform =
new GBKTransform(
NULL_OUTPUT_CODERS,
KvCoder.of(STRING_CODER, STRING_CODER),
Collections.singletonMap(outputTag, KvCoder.of(STRING_CODER, IterableCoder.of(STRING_CODER))),
outputTag,
WindowingStrategy.of(window).withTrigger(trigger)
.withMode(ACCUMULATING_FIRED_PANES)
Expand Down

0 comments on commit 3d46caf

Please sign in to comment.