
Added ParquetCachedBatchSerializer support for Databricks #2880

Merged: 10 commits, Jul 12, 2021
2 changes: 1 addition & 1 deletion jenkins/databricks/run-tests.py
@@ -35,7 +35,7 @@ def main():
print("rsync command: %s" % rsync_command)
subprocess.check_call(rsync_command, shell = True)

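# base_spark_pom_version is appended to the ssh command so the remote test script receives it as its third argument ($3 -> BASE_SPARK_VER in test.sh)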
ssh_command = "bash -c 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null ubuntu@%s -p 2200 -i %s %s %s %s 2>&1 | tee testout; if [ `echo ${PIPESTATUS[0]}` -ne 0 ]; then false; else true; fi'" % (master_addr, params.private_key_file, params.script_dest, params.jar_path, params.spark_conf)
ssh_command = "bash -c 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null ubuntu@%s -p 2200 -i %s %s %s %s %s 2>&1 | tee testout; if [ `echo ${PIPESTATUS[0]}` -ne 0 ]; then false; else true; fi'" % (master_addr, params.private_key_file, params.script_dest, params.jar_path, params.spark_conf, params.base_spark_pom_version)
print("ssh command: %s" % ssh_command)
subprocess.check_call(ssh_command, shell = True)

20 changes: 19 additions & 1 deletion jenkins/databricks/test.sh
@@ -17,8 +17,9 @@

set -ex

LOCAL_JAR_PATH=$1
LOCAL_JAR_PATH=${LOCAL_JAR_PATH:-$1}
SPARK_CONF=$2
BASE_SPARK_VER=${BASE_SPARK_VER:-$3}

# tests
export PATH=/databricks/conda/envs/databricks-ml-gpu/bin:/databricks/conda/condabin:$PATH
@@ -54,19 +55,36 @@ if [ -n "$SPARK_CONF" ]; then
SPARK_CONF="--conf ${SPARK_CONF/','/' --conf '}"
fi

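# The PCBS cache tests require Spark 3.1.1 or later (spark.sql.cache.serializer was added in Spark 3.1).
# sort -V does a version-aware compare: if "3.1.1" sorts first, BASE_SPARK_VER is at least 3.1.1.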
IS_SPARK_311_OR_LATER=0
[[ "$(printf '%s\n' "3.1.1" "$BASE_SPARK_VER" | sort -V | head -n1)" = "3.1.1" ]] && IS_SPARK_311_OR_LATER=1

TEST_TYPE="nightly"
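# Serializer class from the Databricks (spark311db) shim; exported below as PYSP_TEST_spark_sql_cache_serializer, which the test harness applies as the spark.sql.cache.serializer conf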
PCBS_CONF="com.nvidia.spark.rapids.shims.spark311db.ParquetCachedBatchSerializer"
if [ -d "$LOCAL_JAR_PATH" ]; then
## Run tests with jars in the LOCAL_JAR_PATH dir downloaded from the dependency repo
LOCAL_JAR_PATH=$LOCAL_JAR_PATH bash $LOCAL_JAR_PATH/integration_tests/run_pyspark_from_build.sh --runtime_env="databricks" --test_type=$TEST_TYPE

## Run cache tests
if [[ "$IS_SPARK_311_OR_LATER" -eq "1" ]]; then
PYSP_TEST_spark_sql_cache_serializer=${PCBS_CONF} \
LOCAL_JAR_PATH=$LOCAL_JAR_PATH bash $LOCAL_JAR_PATH/integration_tests/run_pyspark_from_build.sh --runtime_env="databricks" --test_type=$TEST_TYPE -k cache_test
fi

## Run cudf-udf tests
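# Expose the dist jar (excluding the tests jar) on the executors' PYTHONPATH; the cudf-udf tests rely on Python code shipped inside the plugin jar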
CUDF_UDF_TEST_ARGS="$CUDF_UDF_TEST_ARGS --conf spark.executorEnv.PYTHONPATH=`ls $LOCAL_JAR_PATH/rapids-4-spark_*.jar | grep -v 'tests.jar'`"
LOCAL_JAR_PATH=$LOCAL_JAR_PATH SPARK_SUBMIT_FLAGS="$SPARK_CONF $CUDF_UDF_TEST_ARGS" TEST_PARALLEL=1 \
bash $LOCAL_JAR_PATH/integration_tests/run_pyspark_from_build.sh --runtime_env="databricks" -m "cudf_udf" --cudf_udf --test_type=$TEST_TYPE

else
## Run tests with jars building from the spark-rapids source code
bash /home/ubuntu/spark-rapids/integration_tests/run_pyspark_from_build.sh --runtime_env="databricks" --test_type=$TEST_TYPE

## Run cache tests
if [[ "$IS_SPARK_311_OR_LATER" -eq "1" ]]; then
PYSP_TEST_spark_sql_cache_serializer=${PCBS_CONF} \
bash /home/ubuntu/spark-rapids/integration_tests/run_pyspark_from_build.sh --runtime_env="databricks" --test_type=$TEST_TYPE -k cache_test
fi

## Run cudf-udf tests
CUDF_UDF_TEST_ARGS="$CUDF_UDF_TEST_ARGS --conf spark.executorEnv.PYTHONPATH=`ls /home/ubuntu/spark-rapids/dist/target/rapids-4-spark_*.jar | grep -v 'tests.jar'`"
SPARK_SUBMIT_FLAGS="$SPARK_CONF $CUDF_UDF_TEST_ARGS" TEST_PARALLEL=1 \
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.spark311db

import com.nvidia.spark.rapids.shims

class ParquetCachedBatchSerializer extends shims.spark311.ParquetCachedBatchSerializer {
Collaborator:
This file shouldn't be needed now, right? The 311 shim version of this works. Or is the intention to keep the db one so that the user specifies this one? https://nvidia.github.io/spark-rapids/docs/additional-functionality/cache-serializer.html. If that is the case we need to update the docs.

Originally the intention was that, since the user has to specify it, it should match the version of Spark they are using, so that it's hopefully the least confusing. I don't know how much it matters whether they specify spark311 vs spark311db.
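For reference, the way a user enables the serializer today (per the cache-serializer doc linked above) looks roughly like the sketch below. This is a minimal illustration, not code from this PR; it assumes the Spark 3.1.1 shim class name from that doc and Spark's `spark.sql.cache.serializer` static conf.

```scala
import org.apache.spark.sql.SparkSession

// spark.sql.cache.serializer is a static conf, so it has to be set before the
// session (and the RAPIDS plugin) starts up.
val spark = SparkSession.builder()
  .appName("pcbs-example")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.sql.cache.serializer",
    "com.nvidia.spark.rapids.shims.spark311.ParquetCachedBatchSerializer")
  .getOrCreate()

// With the serializer configured, cached data is stored as compressed Parquet
// and reads of it become candidates for the GPU InMemoryTableScanExec override.
val df = spark.range(0, 1000).toDF("id")
df.cache().count()
```

On Databricks the same two confs would typically go into the cluster's Spark config rather than application code, since the session is created for the notebook.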

Collaborator:
If it's possible it would be nice to have one class that just loads the proper shim version, but that is a separate issue.

Collaborator Author:
> This file shouldn't be needed now, right? The 311 shim version of this works. Or is the intention to keep the db one so that the user specifies this one? https://nvidia.github.io/spark-rapids/docs/additional-functionality/cache-serializer.html. If that is the case we need to update the docs.
>
> Originally the intention was that, since the user has to specify it, it should match the version of Spark they are using, so that it's hopefully the least confusing. I don't know how much it matters whether they specify spark311 vs spark311db.

If we add spark311db to the documentation we will then also have to add spark311cdh. I almost feel like they are all Spark 3.1.1, and since we aren't doing anything specific in the extended versions of the PCBS we should just get rid of them.

> If it's possible it would be nice to have one class that just loads the proper shim version, but that is a separate issue.

This is a good idea, I can look into it as a follow-on.

Collaborator Author:
@jlowe thoughts?

Member:
> If it's possible it would be nice to have one class that just loads the proper shim version, but that is a separate issue.

☝️ This. We should not have Spark-specific versions of user-visible classes unless they are truly required (e.g., as in the shuffle case, unfortunately). If we know one class will work going forward, as is the case with the main executor plugin, then we should strive to use a common class name without a Spark version in it. If this is indeed possible, we should deprecate the old 311 version and eventually remove it.

So it really all comes down to that question. If we can have a common version, my vote is to use the one class. We can change the package name and deprecate the existing spark311 package version in a new PR if it's too tricky to do in this one.

Collaborator Author:
OK, I can do this as a follow-on. In the interim, do we just update the doc, or remove the spark311db and spark311cdh versions of the serializer? I feel removing the db and cdh versions of the serializer is the way to go, since we will do more work as part of the follow-on.

Member:
If we're planning on removing these classes in the near future then we should not document them only to rip them out immediately afterward. Let's keep the number of classes to deprecate to a minimum.

}
@@ -21,6 +21,7 @@ import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.shims.spark311.Spark311Shims
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.rapids.shims.spark311.GpuInMemoryTableScanExec
import org.apache.spark.sql.rapids.shims.spark311db._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
@@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec}
import org.apache.spark.sql.execution.datasources.{FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile}
@@ -166,6 +168,24 @@ class Spark311dbShims extends Spark311Shims {
wrapped.tableIdentifier)(conf)
}
}),
GpuOverrides.exec[InMemoryTableScanExec](
"Implementation of InMemoryTableScanExec to use GPU accelerated Caching",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT).nested(),
TypeSig.all),
(scan, conf, p, r) => new SparkPlanMeta[InMemoryTableScanExec](scan, conf, p, r) {
override def tagPlanForGpu(): Unit = {
if (!scan.relation.cacheBuilder.serializer.isInstanceOf[ParquetCachedBatchSerializer]) {
willNotWorkOnGpu("ParquetCachedBatchSerializer is not being used")
}
}

/**
* Convert InMemoryTableScanExec to a GPU enabled version.
*/
override def convertToGpu(): GpuExec = {
GpuInMemoryTableScanExec(scan.attributes, scan.predicates, scan.relation)
}
}),
GpuOverrides.exec[SortMergeJoinExec](
"Sort merge join, replacing with shuffled hash join",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +