Optimized SparkRunner ParDo Operation (#32546)
* Optimize to skip filter application when there is only a single output (sketched below)

* Make SparkTransformOverrides class public for testing

* Add related test

* Touch trigger files

* Add CHANGES.md entry
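
A minimal sketch of the first bullet's idea (paraphrasing the transform-translator change shown in full below; not the committed code verbatim): previously every output was extracted by filtering the tagged pair RDD by its TupleTag, even when the ParDo had exactly one output. The single-output case can skip that filter pass entirely:

    // Before: one filter pass per output tag, even for a lone output.
    for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
      JavaPairRDD<TupleTag<?>, WindowedValue<?>> filtered =
          all.filter(new TranslationUtils.TupleTagFilter(output.getKey()));
      // ... register filtered.values() for this output
    }

    // After: a single output takes the values directly; no filter stage.
    JavaRDD<WindowedValue<Object>> values =
        (JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) all.values();
    // ... register values for the only output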
twosom authored Oct 3, 2024
1 parent 9095547 commit 3f76d11
Showing 10 changed files with 333 additions and 14 deletions.
@@ -1,5 +1,6 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
   "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
-  "https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test"
+  "https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
+  "https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test"
 }
@@ -1,5 +1,6 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
   "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
-  "https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test"
+  "https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
+  "https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test"
 }
@@ -1,5 +1,6 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
   "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
-  "https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test"
+  "https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
+  "https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test"
 }
1 change: 1 addition & 0 deletions CHANGES.md
@@ -71,6 +71,7 @@
 * Prism now supports Bundle Finalization. ([#32425](https://github.com/apache/beam/pull/32425))
 * Significantly improved performance of Kafka IO reads that enable [commitOffsetsInFinalize](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--) by removing the data reshuffle from SDF implementation. ([#31682](https://github.com/apache/beam/pull/31682)).
 * Added support for dynamic writing in MqttIO (Java) ([#19376](https://github.com/apache/beam/issues/19376))
+* Optimized Spark Runner parDo transform evaluator (Java) ([#32537](https://github.com/apache/beam/issues/32537))
 * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 
 ## Breaking Changes
@@ -31,7 +31,7 @@
 @SuppressWarnings({
   "rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
 })
-class SparkTransformOverrides {
+public final class SparkTransformOverrides {
   public static List<PTransformOverride> getDefaultOverrides(boolean streaming) {
     ImmutableList.Builder<PTransformOverride> builder = ImmutableList.builder();
     // TODO: [https://github.com/apache/beam/issues/19107] Support @RequiresStableInput on Spark
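
A hedged sketch of what the widened visibility enables (hypothetical test snippet, not part of this commit's diff): code outside the package, such as a translation test, can now reference the class directly.

    // From a test in another package: fetch the default batch-mode overrides.
    List<PTransformOverride> overrides =
        SparkTransformOverrides.getDefaultOverrides(false); // false = batch mode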
@@ -74,6 +74,7 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.AbstractIterator;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.spark.HashPartitioner;
@@ -448,7 +449,7 @@ public void evaluate(
     }
 
     Map<TupleTag<?>, PCollection<?>> outputs = context.getOutputs(transform);
-    if (outputs.size() > 1) {
+    if (hasMultipleOutputs(outputs)) {
       StorageLevel level = StorageLevel.fromString(context.storageLevel());
       if (canAvoidRddSerialization(level)) {
         // if it is memory only reduce the overhead of moving to bytes
@@ -463,17 +464,28 @@ public void evaluate(
             .persist(level)
             .mapToPair(TranslationUtils.getTupleTagDecodeFunction(coderMap));
       }
-    }
-    for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
-      JavaPairRDD<TupleTag<?>, WindowedValue<?>> filtered =
-          all.filter(new TranslationUtils.TupleTagFilter(output.getKey()));
-      // Object is the best we can do since different outputs can have different tags
+
+      for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
+        JavaPairRDD<TupleTag<?>, WindowedValue<?>> filtered =
+            all.filter(new TranslationUtils.TupleTagFilter(output.getKey()));
+        // Object is the best we can do since different outputs can have different tags
+        JavaRDD<WindowedValue<Object>> values =
+            (JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) filtered.values();
+        context.putDataset(output.getValue(), new BoundedDataset<>(values));
+      }
+    } else {
       JavaRDD<WindowedValue<Object>> values =
-          (JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) filtered.values();
-      context.putDataset(output.getValue(), new BoundedDataset<>(values));
+          (JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) all.values();
+      context.putDataset(
+          Iterables.getOnlyElement(outputs.entrySet()).getValue(),
+          new BoundedDataset<>(values));
     }
   }
 
+  private boolean hasMultipleOutputs(Map<TupleTag<?>, PCollection<?>> outputs) {
+    return outputs.size() > 1;
+  }
+
   @Override
   public String toNativeString() {
     return "mapPartitions(new <fn>())";
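
The new else branch is observable in Spark's RDD lineage: a single-output ParDo no longer produces a filter stage. A hedged sketch of a check a test might make (rdd is an assumed handle to the translated output RDD; this snippet is not from the commit):

    String lineage = rdd.toDebugString();
    // Expected after this change for single-output ParDos:
    boolean hasFilterStage = lineage.contains(" at filter at "); // -> false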
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.spark.translation;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class PassThrough {

public static <InputT> SingleOutput<InputT> ofSingleOutput(Coder<InputT> inputCoder) {
return new SingleOutput<>(inputCoder);
}

public static <InputT> MultipleOutput<InputT> ofMultipleOutput(
TupleTag<InputT> tag1, TupleTag<InputT> tag2) {
return new MultipleOutput<>(tag1, tag2);
}

public static class SingleOutput<InputT>
extends PTransform<PCollection<InputT>, PCollection<InputT>> {
private final Coder<InputT> inputCoder;

public SingleOutput(Coder<InputT> inputCoder) {
this.inputCoder = inputCoder;
}

@Override
public PCollection<InputT> expand(PCollection<InputT> input) {
return input
.apply(
ParDo.of(
new DoFn<InputT, InputT>() {
@ProcessElement
public void process(@Element InputT input, OutputReceiver<InputT> output) {
output.output(input);
}
}))
.setCoder(inputCoder);
}
}

public static class MultipleOutput<InputT>
extends PTransform<PCollection<InputT>, PCollectionTuple> {

private final TupleTag<InputT> tag1;
private final TupleTag<InputT> tag2;

public MultipleOutput(TupleTag<InputT> tag1, TupleTag<InputT> tag2) {
this.tag1 = tag1;
this.tag2 = tag2;
}

@Override
public PCollectionTuple expand(PCollection<InputT> input) {
return input.apply(
ParDo.of(
new DoFn<InputT, InputT>() {
@ProcessElement
public void process(@Element InputT input, MultiOutputReceiver output) {
if (input.hashCode() % 2 == 0) {
output.get(tag1).output(input);
} else {
output.get(tag2).output(input);
}
}
})
.withOutputTags(tag1, TupleTagList.of(tag2)));
}
}
}
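
A hedged usage sketch for these helpers (hypothetical test snippet; the actual test class is not shown on this page):

    // Single-output path: after this PR, translation should add no filter stage.
    pipeline
        .apply(Create.of(1, 2, 3))
        .apply(PassThrough.ofSingleOutput(VarIntCoder.of()));

    // Multi-output path: per-tag filters are still required.
    TupleTag<Integer> evenTag = new TupleTag<Integer>() {};
    TupleTag<Integer> oddTag = new TupleTag<Integer>() {};
    pipeline
        .apply(Create.of(4, 5, 6))
        .apply(PassThrough.ofMultipleOutput(evenTag, oddTag));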
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.spark.translation;

class RDDNode {
private final int id;
private final String name;
private final String operator;
private final String location;

public RDDNode(int id, String name, String operator, String location) {
this.id = id;
this.name = name;
this.operator = operator;
this.location = location;
}

public int getId() {
return id;
}

public String getName() {
return name;
}

public String getOperator() {
return operator;
}

public String getLocation() {
return location;
}

@Override
public String toString() {
return "RDDNode{"
+ "id="
+ id
+ ", name='"
+ name
+ '\''
+ ", operator='"
+ operator
+ '\''
+ ", location='"
+ location
+ '\''
+ '}';
}
}
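
For illustration, a node built from a typical lineage line renders as follows (example values assumed; output derived from the toString() above):

    RDDNode node =
        new RDDNode(2, "MapPartitionsRDD[2]", "filter", "TransformTranslator.java:455 []");
    // node.toString() ->
    // RDDNode{id=2, name='MapPartitionsRDD[2]', operator='filter', location='TransformTranslator.java:455 []'}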
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.spark.translation;

import java.util.ArrayList;
import java.util.List;

/** Utility class for parsing RDD Debug String. */
@SuppressWarnings("StringSplitter")
class RDDTreeParser {

public static List<RDDNode> parse(String debugString) {
List<RDDNode> list = new ArrayList<>();
String[] lines = debugString.split("\n");

for (String line : lines) {
line = line.trim();
if (line.isEmpty()) {
continue;
}

int id = extractId(line);
final String[] parsedString = line.replace("|", "").split(" at ");
String name = parsedString[0].replaceAll("[+\\-]", "").replaceAll("\\(\\d+\\)", "").trim();
String operation = parsedString[1].trim();
String location = parsedString[2].trim();

RDDNode node = new RDDNode(id, name, operation, location);

list.add(node);
}

return list;
}

private static int extractId(String line) {
String idPart = line.substring(line.indexOf('[') + 1, line.indexOf(']'));
return Integer.parseInt(idPart);
}
}
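
A hedged example of the input this parser expects, in the shape produced by RDD.toDebugString() (exact strings vary by Spark version; the values below are assumed):

    String debugString =
        "(2) MapPartitionsRDD[3] at mapPartitionsToPair at TransformTranslator.java:448 []\n"
            + " |  MapPartitionsRDD[2] at filter at TransformTranslator.java:455 []\n"
            + " |  ParallelCollectionRDD[0] at parallelize at BoundedDataset.java:50 []";
    List<RDDNode> nodes = RDDTreeParser.parse(debugString);
    // nodes.get(1).getId() -> 2, nodes.get(1).getOperator() -> "filter"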