Skip to content

Commit

Permalink
[BEAM-4429] TypeHint changed to TypeDescriptor
Browse files Browse the repository at this point in the history
  • Loading branch information
mareksimunek committed Jun 20, 2018
1 parent f6b1783 commit cef3e44
Show file tree
Hide file tree
Showing 23 changed files with 159 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@

import java.io.Serializable;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.audience.Audience;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeHint;
import org.apache.beam.sdk.values.TypeDescriptor;


/** @param <T> */
@Audience(Audience.Type.INTERNAL)
public interface TypeHintAware<T> extends Serializable {
public interface TypeDescriptorAware<T> extends Serializable {

/**
* Retrieve type hint associated with this object. Mostly this represents type returned by
* function.
*
* @return {@link TypeHint} associated with this object
* @return {@link TypeDescriptor} associated with this object
*/
TypeHint<T> getTypeHint();
TypeDescriptor<T> getTypeDescriptor();
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.OutputHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.windowing.WindowingDesc;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareUnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.extensions.euphoria.core.executor.graph.DAG;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;

/**
Expand Down Expand Up @@ -162,7 +162,7 @@ public <K> WindowingBuilder<InputT, K> keyBy(UnaryFunction<InputT, K> keyExtract

@Override
public <K> WindowingBuilder<InputT, K> keyBy(
UnaryFunction<InputT, K> keyExtractor, TypeHint<K> typeHint) {
UnaryFunction<InputT, K> keyExtractor, TypeDescriptor<K> typeHint) {
return keyBy(TypeAwareUnaryFunction.of(keyExtractor, typeHint));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.ElementWiseOperator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.OutputHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareUnaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeHint;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
* A transformation of a dataset from one type into another allowing user code to generate zero,
Expand Down Expand Up @@ -187,8 +187,8 @@ public <OutputT> EventTimeBuilder<InputT, OutputT> using(
}

public <OutputT> EventTimeBuilder<InputT, OutputT> using(
UnaryFunctor<InputT, OutputT> functor, TypeHint<OutputT> outputTypeHint) {
return using(TypeAwareUnaryFunctor.of(functor, outputTypeHint));
UnaryFunctor<InputT, OutputT> functor, TypeDescriptor<OutputT> outputTypeDescriptor) {
return using(TypeAwareUnaryFunctor.of(functor, outputTypeDescriptor));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join.Type;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareBinaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareUnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeHint;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
* Full outer join of two input datasets producing single new dataset.
Expand Down Expand Up @@ -121,9 +121,9 @@ public <K> UsingBuilder<LeftT, RightT, K> by(

public <K> UsingBuilder<LeftT, RightT, K> by(
UnaryFunction<LeftT, K> leftKeyExtractor, UnaryFunction<RightT, K> rightKeyExtractor,
TypeHint<K> keyTypeHint) {
return by(TypeAwareUnaryFunction.of(leftKeyExtractor, keyTypeHint),
TypeAwareUnaryFunction.of(rightKeyExtractor, keyTypeHint));
TypeDescriptor<K> keyTypeDescriptor) {
return by(TypeAwareUnaryFunction.of(leftKeyExtractor, keyTypeDescriptor),
TypeAwareUnaryFunction.of(rightKeyExtractor, keyTypeDescriptor));
}
}

Expand Down Expand Up @@ -155,7 +155,7 @@ public <OutputT> Join.WindowingBuilder<LeftT, RightT, K, OutputT> using(

public <OutputT> Join.WindowingBuilder<LeftT, RightT, K, OutputT> using(
BinaryFunctor<Optional<LeftT>, Optional<RightT>, OutputT> joinFunc,
TypeHint<OutputT> outputTypeHint) {
TypeDescriptor<OutputT> outputTypeDescriptor) {
Objects.requireNonNull(joinFunc);

@SuppressWarnings("unchecked")
Expand All @@ -166,7 +166,7 @@ public <OutputT> Join.WindowingBuilder<LeftT, RightT, K, OutputT> using(
TypeAwareBinaryFunctor.of(
(left, right, context) ->
joinFunc.apply(Optional.ofNullable(left), Optional.ofNullable(right), context),
outputTypeHint);
outputTypeDescriptor);

return new Join.WindowingBuilder<>(paramsCasted);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.windowing.WindowingDesc;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareBinaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareUnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Either;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.extensions.euphoria.core.executor.graph.DAG;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;

/**
Expand Down Expand Up @@ -313,9 +313,9 @@ public <K> UsingBuilder<LeftT, RightT, K> by(

public <K> UsingBuilder<LeftT, RightT, K> by(
UnaryFunction<LeftT, K> leftKeyExtractor, UnaryFunction<RightT, K> rightKeyExtractor,
TypeHint<K> keyTypeHint) {
return by(TypeAwareUnaryFunction.of(leftKeyExtractor, keyTypeHint),
TypeAwareUnaryFunction.of(rightKeyExtractor, keyTypeHint));
TypeDescriptor<K> keyTypeDescriptor) {
return by(TypeAwareUnaryFunction.of(leftKeyExtractor, keyTypeDescriptor),
TypeAwareUnaryFunction.of(rightKeyExtractor, keyTypeDescriptor));
}
}

Expand Down Expand Up @@ -344,8 +344,9 @@ public <OutputT> Join.WindowingBuilder<LeftT, RightT, K, OutputT> using(
}

public <OutputT> Join.WindowingBuilder<LeftT, RightT, K, OutputT> using(
BinaryFunctor<LeftT, RightT, OutputT> joinFunc, TypeHint<OutputT> outputTypeHint) {
return using(TypeAwareBinaryFunctor.of(joinFunc, outputTypeHint));
BinaryFunctor<LeftT, RightT, OutputT> joinFunc,
TypeDescriptor<OutputT> outputTypeDescriptor) {
return using(TypeAwareBinaryFunctor.of(joinFunc, outputTypeDescriptor));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join.Type;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareBinaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareUnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeHint;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
* Left outer join of two input datasets producing single new dataset.
Expand Down Expand Up @@ -131,9 +131,9 @@ public <K> UsingBuilder<LeftT, RightT, K> by(

public <K> UsingBuilder<LeftT, RightT, K> by(
UnaryFunction<LeftT, K> leftKeyExtractor, UnaryFunction<RightT, K> rightKeyExtractor,
TypeHint<K> keyTypeHint) {
return by(TypeAwareUnaryFunction.of(leftKeyExtractor, keyTypeHint),
TypeAwareUnaryFunction.of(rightKeyExtractor, keyTypeHint));
TypeDescriptor<K> keyTypeDescriptor) {
return by(TypeAwareUnaryFunction.of(leftKeyExtractor, keyTypeDescriptor),
TypeAwareUnaryFunction.of(rightKeyExtractor, keyTypeDescriptor));
}
}

Expand Down Expand Up @@ -165,7 +165,7 @@ public <OutputT> Join.WindowingBuilder<LeftT, RightT, K, OutputT> using(

public <OutputT> Join.WindowingBuilder<LeftT, RightT, K, OutputT> using(
BinaryFunctor<LeftT, Optional<RightT>, OutputT> joinFunc,
TypeHint<OutputT> outputTypeHint) {
TypeDescriptor<OutputT> outputTypeDescriptor) {
Objects.requireNonNull(joinFunc);

@SuppressWarnings("unchecked")
Expand All @@ -175,7 +175,7 @@ public <OutputT> Join.WindowingBuilder<LeftT, RightT, K, OutputT> using(
paramsCasted.joinFunc =
TypeAwareBinaryFunctor.of(
(left, right, context) -> joinFunc.apply(left, Optional.ofNullable(right), context),
outputTypeHint);
outputTypeDescriptor);

return new Join.WindowingBuilder<>(paramsCasted);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.windowing.WindowingDesc;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareReduceFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareUnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.extensions.euphoria.core.executor.graph.DAG;
import org.apache.beam.sdk.extensions.euphoria.core.executor.util.SingleValueContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;

/**
Expand Down Expand Up @@ -261,9 +261,10 @@ default <OutputT> WithSortedValuesBuilder<InputT, K, V, OutputT> reduceBy(
}

default <OutputT> WithSortedValuesBuilder<InputT, K, V, OutputT> reduceBy(
ReduceFunction<V, OutputT> reducer, TypeHint<OutputT> outputTypeHint) {
ReduceFunction<V, OutputT> reducer, TypeDescriptor<OutputT> outputTypeDescriptor) {
return reduceBy(
(Stream<V> in, Collector<OutputT> ctx) -> ctx.collect(reducer.apply(in)), outputTypeHint);
(Stream<V> in, Collector<OutputT> ctx) ->
ctx.collect(reducer.apply(in)), outputTypeDescriptor);
}

/**
Expand All @@ -279,8 +280,8 @@ <OutputT> WithSortedValuesBuilder<InputT, K, V, OutputT> reduceBy(
ReduceFunctor<V, OutputT> reducer);

default <OutputT> WithSortedValuesBuilder<InputT, K, V, OutputT> reduceBy(
ReduceFunctor<V, OutputT> reducer, TypeHint<OutputT> outputTypeHint) {
return reduceBy(TypeAwareReduceFunctor.of(reducer, outputTypeHint));
ReduceFunctor<V, OutputT> reducer, TypeDescriptor<OutputT> outputTypeDescriptor) {
return reduceBy(TypeAwareReduceFunctor.of(reducer, outputTypeDescriptor));
}

/**
Expand All @@ -296,7 +297,7 @@ default WindowByBuilder<InputT, K, V, V> combineBy(CombinableReduceFunction<V> r
}

default WindowByBuilder<InputT, K, V, V> combineBy(
CombinableReduceFunction<V> reducer, TypeHint<V> typeHint) {
CombinableReduceFunction<V> reducer, TypeDescriptor<V> typeHint) {
return reduceBy(TypeAwareReduceFunctor.of(toReduceFunctor(reducer), typeHint));
}
}
Expand Down Expand Up @@ -360,7 +361,7 @@ public <K> ValueByReduceByBuilder<InputT, K> keyBy(UnaryFunction<InputT, K> keyE

@Override
public <K> ValueByReduceByBuilder<InputT, K> keyBy(
UnaryFunction<InputT, K> keyExtractor, TypeHint<K> typeHint) {
UnaryFunction<InputT, K> keyExtractor, TypeDescriptor<K> typeHint) {
return keyBy(TypeAwareUnaryFunction.of(keyExtractor, typeHint));
}
}
Expand Down Expand Up @@ -397,7 +398,7 @@ public <V> ReduceByCombineByBuilder<InputT, K, V> valueBy(
}

public <V> ReduceByCombineByBuilder<InputT, K, V> valueBy(
UnaryFunction<InputT, V> valueExtractor, TypeHint<V> typeHint) {
UnaryFunction<InputT, V> valueExtractor, TypeDescriptor<V> typeHint) {
return valueBy(TypeAwareUnaryFunction.of(valueExtractor, typeHint));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.StateMerger;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.windowing.WindowingDesc;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareUnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;

/**
Expand Down Expand Up @@ -250,14 +250,14 @@ public <K> ValueByBuilder<InputT, K> keyBy(UnaryFunction<InputT, K> keyExtractor
}

public <K> ValueByBuilder<InputT, K> keyBy(
UnaryFunction<InputT, K> keyExtractor, TypeHint<K> typeHint) {
UnaryFunction<InputT, K> keyExtractor, TypeDescriptor<K> typeHint) {

return keyBy(TypeAwareUnaryFunction.of(keyExtractor, typeHint));
}

public <K> ValueByBuilder<InputT, K> keyBy(
UnaryFunction<InputT, K> keyExtractor, Class<K> typeHint) {
return keyBy(TypeAwareUnaryFunction.of(keyExtractor, TypeHint.of(typeHint)));
return keyBy(TypeAwareUnaryFunction.of(keyExtractor, TypeDescriptor.of(typeHint)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join.Type;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareBinaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareUnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeHint;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
* Right outer join of two input datasets producing single new dataset.
Expand Down Expand Up @@ -125,9 +125,9 @@ public <K> UsingBuilder<LeftT, RightT, K> by(

public <K> UsingBuilder<LeftT, RightT, K> by(
UnaryFunction<LeftT, K> leftKeyExtractor, UnaryFunction<RightT, K> rightKeyExtractor,
TypeHint<K> keyTypeHint) {
return by(TypeAwareUnaryFunction.of(leftKeyExtractor, keyTypeHint),
TypeAwareUnaryFunction.of(rightKeyExtractor, keyTypeHint));
TypeDescriptor<K> keyTypeDescriptor) {
return by(TypeAwareUnaryFunction.of(leftKeyExtractor, keyTypeDescriptor),
TypeAwareUnaryFunction.of(rightKeyExtractor, keyTypeDescriptor));
}
}

Expand Down Expand Up @@ -160,7 +160,7 @@ public <OutputT> Join.WindowingBuilder<LeftT, RightT, K, OutputT> using(

public <OutputT> Join.WindowingBuilder<LeftT, RightT, K, OutputT> using(
BinaryFunctor<Optional<LeftT>, RightT, OutputT> joinFunc,
TypeHint<OutputT> outputTypeHint) {
TypeDescriptor<OutputT> outputTypeDescriptor) {
Objects.requireNonNull(joinFunc);

@SuppressWarnings("unchecked")
Expand All @@ -170,7 +170,7 @@ public <OutputT> Join.WindowingBuilder<LeftT, RightT, K, OutputT> using(
paramsCasted.joinFunc =
TypeAwareBinaryFunctor.of(
(left, right, context) -> joinFunc.apply(Optional.ofNullable(left), right, context),
outputTypeHint);
outputTypeDescriptor);

return new Join.WindowingBuilder<>(paramsCasted);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.OutputHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.windowing.WindowingDesc;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareUnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Sums;
import org.apache.beam.sdk.extensions.euphoria.core.executor.graph.DAG;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;

/**
Expand Down Expand Up @@ -206,7 +207,7 @@ public <K> ValueByWindowByBuilder<InputT, K> keyBy(UnaryFunction<InputT, K> keyE

@Override
public <K> ValueByWindowByBuilder<InputT, K> keyBy(
UnaryFunction<InputT, K> keyExtractor, TypeHint<K> typeHint) {
UnaryFunction<InputT, K> keyExtractor, TypeDescriptor<K> typeHint) {
return keyBy(TypeAwareUnaryFunction.of(keyExtractor, typeHint));
}
}
Expand All @@ -226,7 +227,8 @@ public static class ValueByWindowByBuilder<InputT, K>
}

public WindowByBuilder<InputT, K> valueBy(UnaryFunction<InputT, Long> valueExtractor) {
params.valueExtractor = TypeAwareUnaryFunction.of(valueExtractor, TypeHint.ofLong());
params.valueExtractor =
TypeAwareUnaryFunction.of(valueExtractor, TypeDescriptors.longs());
return new WindowByBuilder<>(params);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.ValueStorageDescriptor;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.windowing.WindowingDesc;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareUnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple;
import org.apache.beam.sdk.extensions.euphoria.core.executor.graph.DAG;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;

/**
Expand Down Expand Up @@ -287,7 +287,7 @@ public <K> ValueByBuilder<InputT, K> keyBy(UnaryFunction<InputT, K> keyFn) {

@Override
public <K> ValueByBuilder<InputT, K> keyBy(
UnaryFunction<InputT, K> keyExtractor, TypeHint<K> typeHint) {
UnaryFunction<InputT, K> keyExtractor, TypeDescriptor<K> typeHint) {
return keyBy(TypeAwareUnaryFunction.of(keyExtractor, typeHint));
}
}
Expand All @@ -313,8 +313,8 @@ public <V> ScoreByBuilder<InputT, K, V> valueBy(UnaryFunction<InputT, V> valueFn
}

public <V> ScoreByBuilder<InputT, K, V> valueBy(
UnaryFunction<InputT, V> valueFn, TypeHint<V> valueTypeHint) {
return valueBy(TypeAwareUnaryFunction.of(valueFn, valueTypeHint));
UnaryFunction<InputT, V> valueFn, TypeDescriptor<V> valueTypeDescriptor) {
return valueBy(TypeAwareUnaryFunction.of(valueFn, valueTypeDescriptor));
}
}

Expand Down
Loading

0 comments on commit cef3e44

Please sign in to comment.