Commit 7f0da3e
apache#13 [euphoria-flink] Batch executor uses hash code as a key in shuffles (IntComparator performs much better)

vanekjar committed Apr 27, 2017
1 parent dc80db9 commit 7f0da3e
Showing 2 changed files with 113 additions and 35 deletions.
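In outline: both translators previously shuffled on a composite Tuple2[Window, Key] typed as Tuple2<Comparable, Comparable>, which Flink must compare generically; after this commit they shuffle on that tuple's int hash code, so grouping runs through Flink's primitive IntComparator. A minimal before/after sketch of the two key-selector shapes — the Tuple2<String, Long> element type is illustrative only, not from the commit:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

class KeyShapes {
  // Before: the shuffle key is a composite tuple of Comparables,
  // compared with Flink's generic comparators.
  static class CompositeKey
      implements KeySelector<Tuple2<String, Long>, Tuple2<Comparable, Comparable>> {
    @Override
    public Tuple2<Comparable, Comparable> getKey(Tuple2<String, Long> element) {
      return new Tuple2<Comparable, Comparable>(element.f0, element.f1);
    }
  }

  // After: the shuffle key is the tuple's int hash code; distinct keys may
  // collide, which downstream reducers must handle.
  static class HashCodeKey implements KeySelector<Tuple2<String, Long>, Integer> {
    @Override
    public Integer getKey(Tuple2<String, Long> element) {
      return new Tuple2<Comparable, Comparable>(element.f0, element.f1).hashCode();
    }
  }
}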
ReduceByKeyTranslator.java
@@ -28,15 +28,21 @@
import cz.seznam.euphoria.flink.functions.PartitionerWrapper;
import cz.seznam.euphoria.shaded.guava.com.google.common.base.Preconditions;
import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Iterables;
-import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;

import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;

public class ReduceByKeyTranslator implements BatchOperatorTranslator<ReduceByKey> {

@@ -80,8 +86,6 @@ public DataSet translate(FlinkOperator<ReduceByKey> operator,
// ~ extract key/value from input elements and assign windows
DataSet<BatchElement<Window, Pair>> tuples;
{
-      // FIXME require keyExtractor to deliver `Comparable`s
-
ExtractEventTime timeAssigner = origOperator.getEventTimeAssigner();
FlatMapOperator<Object, BatchElement<Window, Pair>> wAssigned =
input.flatMap((i, c) -> {
@@ -109,7 +113,9 @@ public DataSet translate(FlinkOperator<ReduceByKey> operator,
// ~ reduce the data now
Operator<BatchElement<Window, Pair>, ?> reduced =
tuples.groupBy(new RBKKeySelector())
-          .reduce(new RBKReducer(reducer))
+          .combineGroup(new RBKReducer(reducer))
+          .groupBy(new RBKKeySelector())
+          .reduceGroup(new RBKReducer(reducer))
.setParallelism(operator.getParallelism())
.name(operator.getName() + "::reduce");
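Since RBKReducer now implements both GroupCombineFunction and GroupReduceFunction, one object serves two phases: combineGroup performs a partial reduction before the shuffle, and reduceGroup completes it after re-grouping. A self-contained sketch of that two-phase pattern on the DataSet API — a toy word count with assumed names, not the commit's code:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class TwoPhaseWordCount {

  // Shuffle key: the word's hash code, as in RBKKeySelector.
  static class HashKey implements KeySelector<Tuple2<String, Integer>, Integer> {
    @Override
    public Integer getKey(Tuple2<String, Integer> value) {
      return value.f0.hashCode();
    }
  }

  // One function acts as both pre-shuffle combiner and final reducer,
  // the same double role RBKReducer plays for combineGroup and reduceGroup.
  static class SumPerKey implements
      GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>,
      GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    @Override
    public void combine(Iterable<Tuple2<String, Integer>> values,
                        Collector<Tuple2<String, Integer>> out) {
      reduce(values, out);
    }

    @Override
    public void reduce(Iterable<Tuple2<String, Integer>> values,
                       Collector<Tuple2<String, Integer>> out) {
      // A hash-keyed group may mix several distinct words (collisions),
      // so sums are kept per exact key, as RBKReducer.doReduce does.
      Map<String, Integer> sums = new HashMap<>();
      for (Tuple2<String, Integer> v : values) {
        sums.merge(v.f0, v.f1, Integer::sum);
      }
      for (Map.Entry<String, Integer> e : sums.entrySet()) {
        out.collect(new Tuple2<>(e.getKey(), e.getValue()));
      }
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 1), Tuple2.of("a", 1))
        .groupBy(new HashKey())          // group by hash of the key
        .combineGroup(new SumPerKey())   // local partial sums, pre-shuffle
        .groupBy(new HashKey())          // shuffle the partial results
        .reduceGroup(new SumPerKey())    // final sums
        .print();
  }
}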

@@ -135,22 +141,32 @@ public DataSet translate(FlinkOperator<ReduceByKey> operator,
// ------------------------------------------------------------------------------

/**
- * Produces Tuple2[Window, Element Key]
+ * Grouping by the key's hash code effectively groups elements with the same
+ * key into one bucket. Collisions may occur, and later processing must
+ * account for them.
+ * <p>
+ * Produces Tuple2[Window, Element Key].hashCode
 */
@SuppressWarnings("unchecked")
static class RBKKeySelector
-    implements KeySelector<BatchElement<Window, Pair>, Tuple2<Comparable, Comparable>> {
+    implements KeySelector<BatchElement<Window, Pair>, Integer> {

+    private final Tuple2 tuple = new Tuple2();
+
@Override
-    public Tuple2<Comparable, Comparable> getKey(
+    public Integer getKey(
BatchElement<Window, Pair> value) {

-      return new Tuple2(value.getWindow(), value.getElement().getFirst());
+      tuple.f0 = value.getWindow();
+      tuple.f1 = value.getElement().getFirst();
+      return tuple.hashCode();
}
}

static class RBKReducer
-      implements ReduceFunction<BatchElement<Window, Pair>> {
+      implements GroupReduceFunction<BatchElement<Window, Pair>, BatchElement<Window, Pair>>,
+          GroupCombineFunction<BatchElement<Window, Pair>, BatchElement<Window, Pair>>,
+          ResultTypeQueryable<BatchElement<Window, Pair>> {

final UnaryFunction<Iterable, Object> reducer;

@@ -159,16 +175,52 @@ static class RBKReducer
}

@Override
-    public BatchElement<Window, Pair>
-    reduce(BatchElement<Window, Pair> p1, BatchElement<Window, Pair> p2) {
-
-      Window wid = p1.getWindow();
-      return new BatchElement<>(
-          wid,
-          Math.max(p1.getTimestamp(), p2.getTimestamp()),
-          Pair.of(
-              p1.getElement().getFirst(),
-              reducer.apply(Arrays.asList(p1.getElement().getSecond(), p2.getElement().getSecond()))));
+    public void combine(Iterable<BatchElement<Window, Pair>> values, Collector<BatchElement<Window, Pair>> out) {
+      doReduce(values, out);
+    }
+
+    @Override
+    public void reduce(Iterable<BatchElement<Window, Pair>> values, Collector<BatchElement<Window, Pair>> out) {
+      doReduce(values, out);
+    }
+
+    private void doReduce(Iterable<BatchElement<Window, Pair>> values,
+                          org.apache.flink.util.Collector<BatchElement<Window, Pair>> out) {
+
+      // Tuple2[Window, Key] => Reduced Value
+      Map<Tuple2, BatchElement<Window, Pair>> reducedValues = new HashMap<>();
+
+      Tuple2 kw = new Tuple2();
+      for (BatchElement<Window, Pair> batchElement : values) {
+        Object key = batchElement.getElement().getFirst();
+        Window window = batchElement.getWindow();
+
+        kw.f0 = window;
+        kw.f1 = key;
+        BatchElement<Window, Pair> val = reducedValues.get(kw);
+        if (val == null) {
+          // store a fresh tuple as the map key: `kw` is reused and mutated
+          // on the next iteration, which would corrupt the stored key
+          reducedValues.put(new Tuple2(window, key), batchElement);
+        } else {
+          Object reduced =
+              reducer.apply(Arrays.asList(val.getElement().getSecond(),
+                  batchElement.getElement().getSecond()));
+
+          reducedValues.put(kw, new BatchElement<>(
+              window,
+              Math.max(val.getTimestamp(), batchElement.getTimestamp()),
+              Pair.of(key, reduced)));
+        }
+      }
+
+      for (Map.Entry<Tuple2, BatchElement<Window, Pair>> e : reducedValues.entrySet()) {
+        out.collect(e.getValue());
+      }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public TypeInformation<BatchElement<Window, Pair>> getProducedType() {
+      return TypeInformation.of((Class) BatchElement.class);
+    }
}
}
}
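Why doReduce keeps a map instead of assuming one key per group: once the shuffle key is a hash code, two distinct keys can land in the same group. In Java, the strings "Aa" and "BB" collide, which makes for a compact plain-Java illustration (not part of the commit):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashCollisionDemo {
  public static void main(String[] args) {
    // "Aa" and "BB" share the hash code 2112, so a shuffle keyed on
    // hashCode alone routes both keys into the same group.
    System.out.println("Aa".hashCode() == "BB".hashCode()); // true

    // One "group" as a hash-keyed shuffle could deliver it: mixed keys.
    List<String> group = Arrays.asList("Aa", "BB", "Aa");

    // Re-separate elements by their exact key inside the group, the way
    // doReduce does with its (window, key) -> reduced-value map.
    Map<String, Integer> counts = new HashMap<>();
    for (String key : group) {
      counts.merge(key, 1, Integer::sum);
    }
    System.out.println(counts); // {Aa=2, BB=1}
  }
}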
ReduceStateByKeyTranslator.java
@@ -40,6 +40,9 @@
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

+import java.util.HashMap;
+import java.util.Map;
+
public class ReduceStateByKeyTranslator implements BatchOperatorTranslator<ReduceStateByKey> {

final StorageProvider stateStorageProvider;
@@ -70,7 +73,6 @@ public DataSet translate(FlinkOperator<ReduceStateByKey> operator,
// ~ extract key/value + timestamp from input elements and assign windows
ExtractEventTime timeAssigner = origOperator.getEventTimeAssigner();

-    // FIXME require keyExtractor to deliver `Comparable`s
DataSet<BatchElement> wAssigned =
input.flatMap((i, c) -> {
BatchElement wel = (BatchElement) i;
@@ -98,8 +100,12 @@ public DataSet translate(FlinkOperator<ReduceStateByKey> operator,
Utils.wrapQueryable(
// ~ FIXME if the underlying windowing is "non merging" we can group by
// "key _and_ window", thus, better utilizing the available resources
-            (BatchElement<?, Pair> we) -> (Comparable) we.getElement().getFirst(),
-            Comparable.class))
+
+            // ~ grouping by the key's hash code effectively groups elements with
+            // the same key into one bucket; collisions may occur, and later
+            // processing must account for them
+            (BatchElement<?, Pair> we) -> we.getElement().getFirst().hashCode(),
+            Integer.class))
.sortGroup(Utils.wrapQueryable(
(KeySelector<BatchElement<?, ?>, Long>)
BatchElement::getTimestamp, Long.class),
@@ -125,14 +131,16 @@ public DataSet translate(FlinkOperator<ReduceStateByKey> operator,

static class RSBKReducer
implements GroupReduceFunction<BatchElement<?, Pair>, BatchElement<?, Pair>>,
-          ResultTypeQueryable<BatchElement<?, Pair>>
-  {
+          ResultTypeQueryable<BatchElement<?, Pair>> {
private final StateFactory<?, ?, State<?, ?>> stateFactory;
private final StateMerger<?, ?, State<?, ?>> stateCombiner;
private final StorageProvider stateStorageProvider;
private final Windowing windowing;
private final Trigger trigger;

+    // mapping of [Key -> GroupReducer]
+    private transient Map<Object, GroupReducer> activeReducers;
+
@SuppressWarnings("unchecked")
RSBKReducer(
ReduceStateByKey operator,
@@ -151,18 +159,36 @@ static class RSBKReducer
public void reduce(Iterable<BatchElement<?, Pair>> values,
org.apache.flink.util.Collector<BatchElement<?, Pair>> out)
{
-      GroupReducer reducer = new GroupReducer(
-          stateFactory,
-          stateCombiner,
-          stateStorageProvider,
-          BatchElement::new,
-          windowing,
-          trigger,
-          elem -> out.collect((BatchElement) elem));
-      for (BatchElement value : values) {
-        reducer.process(value);
+      activeReducers = new HashMap<>();
+      for (BatchElement<?, Pair> batchElement : values) {
+        Object key = batchElement.getElement().getKey();
+
+        GroupReducer reducer = activeReducers.get(key);
+        if (reducer == null) {
+          reducer = new GroupReducer(
+              stateFactory,
+              stateCombiner,
+              stateStorageProvider,
+              BatchElement::new,
+              windowing,
+              trigger,
+              elem -> out.collect((BatchElement) elem));
+
+          activeReducers.put(key, reducer);
+        }
+
+        reducer.process(batchElement);
+      }
+
+      flushStates();
+    }
+
+    private void flushStates() {
+      for (Map.Entry<Object, GroupReducer> e : activeReducers.entrySet()) {
+        GroupReducer reducer = e.getValue();
+        reducer.close();
}
-      reducer.close();
+      activeReducers.clear();
}

@Override

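The same collision concern drives the RSBKReducer change: a single reduce() call may now interleave several true keys, so a GroupReducer is kept per key and all of them are flushed at the end. A stripped-down sketch of that map-of-reducers pattern — the Handler type and all names are hypothetical, not euphoria's API:

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerKeyReducers {

  // Stand-in for euphoria's GroupReducer: accumulate, then emit on close().
  static class Handler {
    private final List<Integer> buffer = new ArrayList<>();
    void process(int value) { buffer.add(value); }
    void close(String key) { System.out.println(key + " -> " + buffer); }
  }

  // A hash-keyed group may interleave elements of several true keys, so a
  // Handler is created lazily per key and every one is flushed at the end,
  // mirroring RSBKReducer.reduce() followed by flushStates().
  static void reduceGroup(Iterable<Map.Entry<String, Integer>> group) {
    Map<String, Handler> active = new HashMap<>();
    for (Map.Entry<String, Integer> e : group) {
      active.computeIfAbsent(e.getKey(), k -> new Handler()).process(e.getValue());
    }
    for (Map.Entry<String, Handler> e : active.entrySet()) {
      e.getValue().close(e.getKey());
    }
    active.clear();
  }

  public static void main(String[] args) {
    // "Aa" and "BB" collide on hashCode, so they can share one group.
    reduceGroup(Arrays.<Map.Entry<String, Integer>>asList(
        new AbstractMap.SimpleEntry<>("Aa", 1),
        new AbstractMap.SimpleEntry<>("BB", 2),
        new AbstractMap.SimpleEntry<>("Aa", 3)));
  }
}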