From aac672ea92a267cf91e9efc36115ee562c41e09a Mon Sep 17 00:00:00 2001
From: twosom
Date: Wed, 25 Sep 2024 00:57:54 +0900
Subject: [PATCH 1/5] Optimize to skip filter application when there is only
 a single output

---
 .../translation/TransformTranslator.java      | 28 +++++++++++++------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 5dc553faab5be..6ab4f79787ebd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -74,6 +74,7 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.AbstractIterator;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.spark.HashPartitioner;
@@ -448,7 +449,7 @@ public void evaluate(
         }
 
         Map<TupleTag<?>, PCollection<?>> outputs = context.getOutputs(transform);
-        if (outputs.size() > 1) {
+        if (hasMultipleOutputs(outputs)) {
           StorageLevel level = StorageLevel.fromString(context.storageLevel());
           if (canAvoidRddSerialization(level)) {
             // if it is memory only reduce the overhead of moving to bytes
@@ -463,17 +464,28 @@ public void evaluate(
                     .persist(level)
                     .mapToPair(TranslationUtils.getTupleTagDecodeFunction(coderMap));
           }
-        }
-        for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
-          JavaPairRDD<TupleTag<?>, WindowedValue<?>> filtered =
-              all.filter(new TranslationUtils.TupleTagFilter(output.getKey()));
-          // Object is the best we can do since different outputs can have different tags
+
+          for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
+            JavaPairRDD<TupleTag<?>, WindowedValue<?>> filtered =
+                all.filter(new TranslationUtils.TupleTagFilter(output.getKey()));
+            // Object is the best we can do since different outputs can have different tags
+            JavaRDD<WindowedValue<Object>> values =
+                (JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) filtered.values();
+            context.putDataset(output.getValue(), new BoundedDataset<>(values));
+          }
+        } else {
           JavaRDD<WindowedValue<Object>> values =
-              (JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) filtered.values();
-          context.putDataset(output.getValue(), new BoundedDataset<>(values));
+              (JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) all.values();
+          context.putDataset(
+              Iterables.getOnlyElement(outputs.entrySet()).getValue(),
+              new BoundedDataset<>(values));
         }
       }
 
+      private boolean hasMultipleOutputs(Map<TupleTag<?>, PCollection<?>> outputs) {
+        return outputs.size() > 1;
+      }
+
       @Override
       public String toNativeString() {
         return "mapPartitions(new <fn>())";
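Why the single-output path can skip the filter: with exactly one output, every element of the translated pair RDD carries the same TupleTag, so a TupleTagFilter would keep every element and contribute nothing except an extra stage in the RDD lineage. The standalone sketch below illustrates the two shapes with plain Spark APIs; the class name, tag string, and values are made up for illustration.

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public class SingleOutputFilterSketch {
      public static void main(String[] args) {
        try (JavaSparkContext jsc = new JavaSparkContext("local[2]", "filter-sketch")) {
          // With a single output, every element carries the same tag.
          JavaPairRDD<String, Integer> all =
              jsc.parallelizePairs(
                  Arrays.asList(new Tuple2<>("main", 1), new Tuple2<>("main", 2)));

          // Multi-output shape: one filter per tag. Here it removes nothing,
          // yet still shows up as a stage in the lineage.
          JavaRDD<Integer> filtered = all.filter(t -> "main".equals(t._1())).values();

          // Single-output shape after this patch: read the values directly.
          JavaRDD<Integer> direct = all.values();

          System.out.println(filtered.toDebugString()); // contains a filter stage
          System.out.println(direct.toDebugString()); // no filter stage
        }
      }
    }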
From 48c9b807a73015709bff90e6545177a37c48afce Mon Sep 17 00:00:00 2001
From: twosom
Date: Wed, 25 Sep 2024 00:59:41 +0900
Subject: [PATCH 2/5] Make SparkTransformOverrides class public for testing

---
 .../org/apache/beam/runners/spark/SparkTransformOverrides.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkTransformOverrides.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkTransformOverrides.java
index 52cf7ba6f2fbc..5bab8e58098ea 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkTransformOverrides.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkTransformOverrides.java
@@ -31,7 +31,7 @@
 @SuppressWarnings({
   "rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
 })
-class SparkTransformOverrides {
+public final class SparkTransformOverrides {
   public static List<PTransformOverride> getDefaultOverrides(boolean streaming) {
     ImmutableList.Builder<PTransformOverride> builder = ImmutableList.builder();
     // TODO: [https://github.com/apache/beam/issues/19107] Support @RequiresStableInput on Spark
From ed74b718f97505fd04a67bbc25f85356d1ecba1b Mon Sep 17 00:00:00 2001
From: twosom
Date: Wed, 25 Sep 2024 01:20:43 +0900
Subject: [PATCH 3/5] Add related tests

---
 .../spark/translation/PassThrough.java        | 91 ++++++++++++++++++
 .../runners/spark/translation/RDDNode.java    | 65 +++++++++++++
 .../spark/translation/RDDTreeParser.java      | 55 +++++++++++
 .../translation/TransformTranslatorTest.java  | 96 ++++++++++++++++++-
 4 files changed, 305 insertions(+), 2 deletions(-)
 create mode 100644 runners/spark/src/test/java/org/apache/beam/runners/spark/translation/PassThrough.java
 create mode 100644 runners/spark/src/test/java/org/apache/beam/runners/spark/translation/RDDNode.java
 create mode 100644 runners/spark/src/test/java/org/apache/beam/runners/spark/translation/RDDTreeParser.java

diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/PassThrough.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/PassThrough.java
new file mode 100644
index 0000000000000..b01c8cf3a0663
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/PassThrough.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+public class PassThrough {
+
+  public static <InputT> SingleOutput<InputT> ofSingleOutput(Coder<InputT> inputCoder) {
+    return new SingleOutput<>(inputCoder);
+  }
+
+  public static <InputT> MultipleOutput<InputT> ofMultipleOutput(
+      TupleTag<InputT> tag1, TupleTag<InputT> tag2) {
+    return new MultipleOutput<>(tag1, tag2);
+  }
+
+  public static class SingleOutput<InputT>
+      extends PTransform<PCollection<InputT>, PCollection<InputT>> {
+    private final Coder<InputT> inputCoder;
+
+    public SingleOutput(Coder<InputT> inputCoder) {
+      this.inputCoder = inputCoder;
+    }
+
+    @Override
+    public PCollection<InputT> expand(PCollection<InputT> input) {
+      return input
+          .apply(
+              ParDo.of(
+                  new DoFn<InputT, InputT>() {
+                    @ProcessElement
+                    public void process(@Element InputT input, OutputReceiver<InputT> output) {
+                      output.output(input);
+                    }
+                  }))
+          .setCoder(inputCoder);
+    }
+  }
+
+  public static class MultipleOutput<InputT>
+      extends PTransform<PCollection<InputT>, PCollectionTuple> {
+
+    private final TupleTag<InputT> tag1;
+    private final TupleTag<InputT> tag2;
+
+    public MultipleOutput(TupleTag<InputT> tag1, TupleTag<InputT> tag2) {
+      this.tag1 = tag1;
+      this.tag2 = tag2;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<InputT> input) {
+      return input.apply(
+          ParDo.of(
+                  new DoFn<InputT, InputT>() {
+                    @ProcessElement
+                    public void process(@Element InputT input, MultiOutputReceiver output) {
+                      if (input.hashCode() % 2 == 0) {
+                        output.get(tag1).output(input);
+                      } else {
+                        output.get(tag2).output(input);
+                      }
+                    }
+                  })
+              .withOutputTags(tag1, TupleTagList.of(tag2)));
+    }
+  }
+}
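For orientation, this is roughly how the two helpers above are driven by the tests later in this patch; the wiring below is an illustrative sketch, not part of the change itself, and the class and variable names are made up.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;

    public class PassThroughUsageSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        PCollection<String> input = pipeline.apply(Create.of("foo", "bar"));

        // Single output: translated without a per-tag filter after patch 1.
        PCollection<String> single =
            input.apply(PassThrough.ofSingleOutput(StringUtf8Coder.of()));

        // Multiple outputs: still translated with one filter per TupleTag.
        TupleTag<String> tag1 = new TupleTag<String>("tag1") {};
        TupleTag<String> tag2 = new TupleTag<String>("tag2") {};
        PCollectionTuple multi = input.apply(PassThrough.ofMultipleOutput(tag1, tag2));
      }
    }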
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/RDDNode.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/RDDNode.java
new file mode 100644
index 0000000000000..a85340410307c
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/RDDNode.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+class RDDNode {
+  private final int id;
+  private final String name;
+  private final String operator;
+  private final String location;
+
+  public RDDNode(int id, String name, String operator, String location) {
+    this.id = id;
+    this.name = name;
+    this.operator = operator;
+    this.location = location;
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getOperator() {
+    return operator;
+  }
+
+  public String getLocation() {
+    return location;
+  }
+
+  @Override
+  public String toString() {
+    return "RDDNode{"
+        + "id="
+        + id
+        + ", name='"
+        + name
+        + '\''
+        + ", operator='"
+        + operator
+        + '\''
+        + ", location='"
+        + location
+        + '\''
+        + '}';
+  }
+}
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/RDDTreeParser.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/RDDTreeParser.java
new file mode 100644
index 0000000000000..26419aff9f973
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/RDDTreeParser.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Utility class for parsing RDD Debug String. */
+@SuppressWarnings("StringSplitter")
+class RDDTreeParser {
+
+  public static List<RDDNode> parse(String debugString) {
+    List<RDDNode> list = new ArrayList<>();
+    String[] lines = debugString.split("\n");
+
+    for (String line : lines) {
+      line = line.trim();
+      if (line.isEmpty()) {
+        continue;
+      }
+
+      int id = extractId(line);
+      final String[] parsedString = line.replace("|", "").split(" at ");
+      String name = parsedString[0].replaceAll("[+\\-]", "").replaceAll("\\(\\d+\\)", "").trim();
+      String operation = parsedString[1].trim();
+      String location = parsedString[2].trim();
+
+      RDDNode node = new RDDNode(id, name, operation, location);
+
+      list.add(node);
+    }
+
+    return list;
+  }
+
+  private static int extractId(String line) {
+    String idPart = line.substring(line.indexOf('[') + 1, line.indexOf(']'));
+    return Integer.parseInt(idPart);
+  }
+}
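For a sense of the input this parser consumes, JavaRDD.toDebugString() renders the lineage as an indented tree, one RDD per line, along these lines (abbreviated and hypothetical; ids, indentation, and source locations vary by Spark version and pipeline):

    (2) MapPartitionsRDD[8] at values at TransformTranslator.java:486 []
     |  MapPartitionsRDD[7] at filter at TransformTranslator.java:484 []
     |  MapPartitionsRDD[5] at mapPartitionsToPair at TransformTranslator.java:439 []
     |  ParallelCollectionRDD[0] at parallelize at BoundedDataset.java:49 []

Each line splits on " at " into a name such as "MapPartitionsRDD[8]" (with the "(2)" partition prefix stripped), an operator such as "values" or "filter", and a source location; the tests below assert on the operator and name columns.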
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index c271eae426cab..2f84c2b23fac1 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -17,23 +17,43 @@
  */
 package org.apache.beam.runners.spark.translation;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import org.apache.beam.runners.spark.SparkContextRule;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.runners.spark.SparkTransformOverrides;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.util.ByteArray;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.spark.api.java.JavaRDD;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.ClassRule;
 import org.junit.Test;
 import scala.Tuple2;
 
@@ -41,7 +61,9 @@
 @SuppressWarnings({
   "rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
 })
-public class TransformTranslatorTest {
+public class TransformTranslatorTest implements Serializable {
+
+  @ClassRule public static SparkContextRule contextRule = new SparkContextRule();
 
   @Test
   public void testIteratorFlatten() {
@@ -60,7 +82,7 @@ public void testSplitBySameKey() {
     WindowedValue.WindowedValueCoder<Integer> wvCoder =
         WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
     Instant now = Instant.now();
-    List<GlobalWindow> window = Arrays.asList(GlobalWindow.INSTANCE);
+    List<GlobalWindow> window = Collections.singletonList(GlobalWindow.INSTANCE);
     PaneInfo paneInfo = PaneInfo.NO_FIRING;
     List<WindowedValue<Integer>> firstKey =
         Arrays.asList(
@@ -109,4 +131,74 @@ public void testSplitBySameKey() {
       }
     }
   }
+
+  @Test
+  public void testSingleOutputParDoHasNoFilter() {
+    Pipeline p = Pipeline.create();
+    SparkPipelineOptions options = contextRule.createPipelineOptions();
+    TransformTranslator.Translator translator = new TransformTranslator.Translator();
+
+    PTransform<PBegin, PCollection<String>> createTransform = Create.of("foo", "bar");
+
+    PassThrough.SingleOutput<String> passThroughTransform =
+        PassThrough.ofSingleOutput(StringUtf8Coder.of());
+
+    PCollection<String> pCollection =
+        p.apply("Create Values", createTransform)
+            .apply("Single Output PassThrough", passThroughTransform);
+
+    p.replaceAll(SparkTransformOverrides.getDefaultOverrides(false));
+
+    EvaluationContext ctxt = new EvaluationContext(contextRule.getSparkContext(), p, options);
+    SparkRunner.initAccumulators(options, ctxt.getSparkContext());
+
+    p.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
+
+    @SuppressWarnings("unchecked")
+    BoundedDataset<String> dataset = (BoundedDataset<String>) ctxt.borrowDataset(pCollection);
+    JavaRDD<WindowedValue<String>> rdd = dataset.getRDD();
+
+    List<RDDNode> parsed = RDDTreeParser.parse(rdd.toDebugString());
+    for (RDDNode node : parsed) {
+      assertNotEquals("filter", node.getOperator());
+    }
+  }
+
+  @Test
+  public void testMultipleOutputPardoHaveFilter() {
+    Pipeline p = Pipeline.create();
+    TupleTag<String> tag1 = new TupleTag<String>("tag1") {};
+    TupleTag<String> tag2 = new TupleTag<String>("tag2") {};
+
+    SparkPipelineOptions options = contextRule.createPipelineOptions();
+    TransformTranslator.Translator translator = new TransformTranslator.Translator();
+
+    PTransform<PBegin, PCollection<String>> createTransform = Create.of("foo", "bar");
+
+    PassThrough.MultipleOutput<String> passThroughTransform =
+        PassThrough.ofMultipleOutput(tag1, tag2);
+
+    PCollectionTuple pCollectionTuple =
+        p.apply("Create Values", createTransform)
+            .apply("Multiple Output PassThrough", passThroughTransform);
+
+    p.replaceAll(SparkTransformOverrides.getDefaultOverrides(false));
+
+    EvaluationContext ctxt = new EvaluationContext(contextRule.getSparkContext(), p, options);
+    SparkRunner.initAccumulators(options, ctxt.getSparkContext());
+
+    p.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
+
+    for (TupleTag<String> tag : Lists.newArrayList(tag1, tag2)) {
+      @SuppressWarnings("unchecked")
+      BoundedDataset<String> dataset =
+          (BoundedDataset<String>) ctxt.borrowDataset(pCollectionTuple.get(tag));
+
+      JavaRDD<WindowedValue<String>> rdd = dataset.getRDD();
+      List<RDDNode> parsed = RDDTreeParser.parse(rdd.toDebugString());
+
+      assertThat(parsed.stream().map(RDDNode::getOperator)).contains("filter");
+      assertTrue(parsed.stream().anyMatch(e -> e.getName().contains(tag.getId())));
+    }
+  }
 }
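A note on the test strategy: rather than asserting on pipeline output, both tests inspect the physical plan through Spark's lineage string. Condensed to its core, and assuming an already-translated rdd plus the RDDTreeParser added in this patch, the check amounts to:

    // Condensed form of the lineage check used in the tests above.
    List<RDDNode> stages = RDDTreeParser.parse(rdd.toDebugString());
    boolean hasFilter = stages.stream().anyMatch(n -> "filter".equals(n.getOperator()));
    // Single-output ParDo: hasFilter is expected to be false after patch 1.
    // Multi-output ParDo: hasFilter is expected to be true for each tagged output.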
From 2e8e3dcbd7f5b152096c9ccdb80f001ceb1e8b12 Mon Sep 17 00:00:00 2001
From: twosom
Date: Wed, 25 Sep 2024 23:40:56 +0900
Subject: [PATCH 4/5] Touch trigger files

---
 .../beam_PostCommit_Java_ValidatesRunner_Spark.json            | 3 ++-
 ...stCommit_Java_ValidatesRunner_SparkStructuredStreaming.json | 3 ++-
 .../beam_PostCommit_Java_ValidatesRunner_Spark_Java11.json     | 3 ++-
 3 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json
index d59e273949da9..9b023f630c362 100644
--- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json
+++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json
@@ -1,5 +1,6 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
   "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
-  "https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test"
+  "https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
+  "https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test"
 }
diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.json
index d59e273949da9..9b023f630c362 100644
--- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.json
+++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.json
@@ -1,5 +1,6 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
   "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
-  "https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test"
+  "https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
+  "https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test"
 }
diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark_Java11.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark_Java11.json
index d59e273949da9..9b023f630c362 100644
--- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark_Java11.json
+++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark_Java11.json
@@ -1,5 +1,6 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
   "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
-  "https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test"
+  "https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
+  "https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test"
 }

From 354a62931f1088e0c4bd4179d3700c4871d5108a Mon Sep 17 00:00:00 2001
From: twosom
Date: Thu, 26 Sep 2024 00:08:19 +0900
Subject: [PATCH 5/5] Add CHANGES.md entry

---
 CHANGES.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGES.md b/CHANGES.md
index d58ceffeb411a..7677a9cf31d3e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,7 @@
 * Dataflow worker can install packages from Google Artifact Registry Python repositories (Python) ([#32123](https://github.com/apache/beam/issues/32123)).
 * Added support for Zstd codec in SerializableAvroCodecFactory (Java) ([#32349](https://github.com/apache/beam/issues/32349))
+* Optimized Spark Runner parDo transform evaluator (Java) ([#32537](https://github.com/apache/beam/issues/32537))
 * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 
 ## Breaking Changes