From eac573d48daf37e95fcf59537aff70f03f3cd97e Mon Sep 17 00:00:00 2001 From: Vaclav Plajt Date: Thu, 21 Jun 2018 16:00:36 +0200 Subject: [PATCH] [BEAM-4609] Race condition was removed from translation of combinable `ReduceByKey` operator. --- .../euphoria/core/translate/ReduceByKeyTranslator.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/ReduceByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/ReduceByKeyTranslator.java index f7e1f3f07eee..82b0f9aef87b 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/ReduceByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/ReduceByKeyTranslator.java @@ -120,8 +120,9 @@ private static SerializableFunction, InputT> @SuppressWarnings("unchecked") final ReduceFunctor combiner = (ReduceFunctor) reducer; - final SingleValueCollector collector = new SingleValueCollector<>(); + return (Iterable input) -> { + SingleValueCollector collector = new SingleValueCollector<>(); combiner.apply(StreamSupport.stream(input.spliterator(), false), collector); return collector.get(); }; @@ -153,8 +154,8 @@ public void processElement(ProcessContext ctx) { } /** - * Translation of {@link Collector} collect to Beam's context output. OperatorName serve - * as namespace for Beam's metrics. + * Translation of {@link Collector} collect to Beam's context output. OperatorName serve as + * namespace for Beam's metrics. */ private static class Collector implements DoFnCollector.BeamCollector>, Pair, OutT> {