From 3b2986c775f3d10f917347c878d0bea9adc75653 Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Tue, 16 Aug 2022 15:32:57 -0700
Subject: [PATCH] Use Spark's `Utils.getContextOrSparkClassLoader` to load Shims (#5646)

Fixes #3851

Spark loads external [datasources][1] using
`Utils.getContextOrSparkClassLoader`.

Trampoline to `Utils.getContextOrSparkClassLoader` so that our code works
with external datasources as well, and to unblock JDK9+.

[1]: https://github.com/apache/spark/blob/b63674ea5f746306a96ab8c39c23a230a6cb9566/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L652

Signed-off-by: Gera Shegalov
---
 CONTRIBUTING.md                               |  4 --
 dist/unshimmed-common-from-spark311.txt       |  1 +
 jenkins/databricks/test.sh                    |  3 +-
 jenkins/spark-premerge-build.sh               |  8 ---
 jenkins/spark-tests.sh                        |  1 -
 .../com/nvidia/spark/rapids/RapidsConf.scala  |  8 ---
 .../nvidia/spark/rapids/GpuOverrides.scala    |  8 +--
 .../com/nvidia/spark/rapids/RapidsConf.scala  |  8 ---
 .../com/nvidia/spark/rapids/ShimLoader.scala  | 66 ++-----------------
 .../execution/UnshimmedTrampolineUtil.scala   | 23 +++++++
 10 files changed, 34 insertions(+), 96 deletions(-)
 create mode 100644 sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/UnshimmedTrampolineUtil.scala
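Reviewer note (placed after the `---` separator, so it stays out of the
commit message): the behavior this patch relies on is that
`Utils.getContextOrSparkClassLoader` prefers the thread context classloader
and only falls back to the classloader that loaded Spark itself. A minimal
sketch of that lookup pattern follows; the object and method names in it are
hypothetical, only `Utils.getContextOrSparkClassLoader` and
`DataSource.lookupDataSource` are real Spark identifiers:

    // Sketch of the resolution order Utils.getContextOrSparkClassLoader
    // provides: prefer the thread context classloader, else fall back to
    // the classloader that loaded Spark.
    object ContextOrSparkLoaderSketch {
      def contextOrSparkClassLoader: ClassLoader =
        Option(Thread.currentThread().getContextClassLoader)
          .getOrElse(getClass.getClassLoader)

      // Datasource-style lookup, analogous to how Spark's
      // DataSource.lookupDataSource resolves an external format.
      def loadExternal(fqcn: String): Class[_] =
        Class.forName(fqcn, false, contextOrSparkClassLoader)
    }

Because the plugin now resolves its shims through the same loader, external
datasources pulled in via --packages can see the shim classes, and the
setAccessible reflection on classloaders not created by Spark is no longer
needed, which unblocks JDK9+.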
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index ae81930b9d4..dd7c23fae09 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -105,10 +105,6 @@ You can build against different versions of the CUDA Toolkit by using one of the
 We support JDK8 as our main JDK version. However, it's possible to build and run with more
 modern JDK versions as well. To this end set `JAVA_HOME` in the environment to your JDK root directory.
 
-With JDK9+, you need to disable the default classloader manipulation option and set
-spark.rapids.force.caller.classloader=false in your Spark application configuration. There are, however,
-known issues with it, e.g. see #5513.
-
 At the time of this writing, the most robust way to run the RAPIDS Accelerator is from a jar dedicated
 to a single Spark version. To this end please use a single shim and specify
 `-DallowConventionalDistJar=true`
diff --git a/dist/unshimmed-common-from-spark311.txt b/dist/unshimmed-common-from-spark311.txt
index 879283a2058..f9d391bd858 100644
--- a/dist/unshimmed-common-from-spark311.txt
+++ b/dist/unshimmed-common-from-spark311.txt
@@ -28,6 +28,7 @@ com/nvidia/spark/rapids/SparkShimVersion*
 com/nvidia/spark/rapids/SparkShims*
 com/nvidia/spark/udf/Plugin*
 org/apache/spark/sql/rapids/ProxyRapidsShuffleInternalManagerBase*
+org/apache/spark/sql/rapids/execution/Unshimmed*
 org/apache/spark/sql/rapids/RapidsShuffleManagerLike*
 rapids/*.py
 rapids4spark-version-info.properties
diff --git a/jenkins/databricks/test.sh b/jenkins/databricks/test.sh
index d5eb0f1ee36..8cbbb4b526a 100755
--- a/jenkins/databricks/test.sh
+++ b/jenkins/databricks/test.sh
@@ -82,8 +82,7 @@ ICEBERG_SPARK_VER=$(echo $BASE_SPARK_VER | cut -d. -f1,2)
 # Classloader config is here to work around classloader issues with
 # --packages in distributed setups, should be fixed by
 # https://github.com/NVIDIA/spark-rapids/pull/5646
-ICEBERG_CONFS="--conf spark.rapids.force.caller.classloader=false \
-  --packages org.apache.iceberg:iceberg-spark-runtime-${ICEBERG_SPARK_VER}_2.12:${ICEBERG_VERSION} \
+ICEBERG_CONFS="--packages org.apache.iceberg:iceberg-spark-runtime-${ICEBERG_SPARK_VER}_2.12:${ICEBERG_VERSION} \
   --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
   --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
   --conf spark.sql.catalog.spark_catalog.type=hadoop \
diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh
index a2306c77814..256f5fdf7e4 100755
--- a/jenkins/spark-premerge-build.sh
+++ b/jenkins/spark-premerge-build.sh
@@ -85,14 +85,6 @@ mvn_verify() {
 
   # Triggering here until we change the jenkins file
   rapids_shuffle_smoke_test
-
-  # non-caller classloader smoke test in pseudo-distributed
-  # standalone cluster
-  echo "Running test_cartesian_join_special_case_count with spark.rapids.force.caller.classloader=false"
-  PYSP_TEST_spark_rapids_force_caller_classloader=false \
-    NUM_LOCAL_EXECS=1 \
-    TEST_PARALLEL=0 \
-    ./integration_tests/run_pyspark_from_build.sh -k 'test_cartesian_join_special_case_count[100]'
 }
 
 rapids_shuffle_smoke_test() {
diff --git a/jenkins/spark-tests.sh b/jenkins/spark-tests.sh
index 002d19de599..f3c39af3fcf 100755
--- a/jenkins/spark-tests.sh
+++ b/jenkins/spark-tests.sh
@@ -183,7 +183,6 @@ run_iceberg_tests() {
   # --packages in distributed setups, should be fixed by
   # https://github.com/NVIDIA/spark-rapids/pull/5646
   SPARK_SUBMIT_FLAGS="$BASE_SPARK_SUBMIT_ARGS $SEQ_CONF \
-    --conf spark.rapids.force.caller.classloader=false \
     --packages org.apache.iceberg:iceberg-spark-runtime-${ICEBERG_SPARK_VER}_2.12:${ICEBERG_VERSION} \
     --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
     --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
diff --git a/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index 868743c5cdd..feaf02aebac 100644
--- a/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/spark2-sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1479,14 +1479,6 @@ object RapidsConf {
     .booleanConf
     .createWithDefault(true)
 
-  val FORCE_SHIMCALLER_CLASSLOADER = conf("spark.rapids.force.caller.classloader")
-    .doc("Option to statically add shim's parallel world classloader URLs to " +
-      "the classloader of the ShimLoader class, typically Bootstrap classloader. This option" +
-      " uses reflection with setAccessible true on a classloader that is not created by Spark.")
-    .internal()
-    .booleanConf
-    .createWithDefault(value = true)
-
   val SPARK_GPU_RESOURCE_NAME = conf("spark.rapids.gpu.resourceName")
     .doc("The name of the Spark resource that represents a GPU that you want the plugin to use " +
       "if using custom resources with Spark.")
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 177604c693d..edea1e0e7e0 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -3953,7 +3953,7 @@ object GpuOverrides extends Logging {
             .withPsNote(TypeEnum.STRUCT, "Round-robin partitioning is not supported for nested " +
               s"structs if ${SQLConf.SORT_BEFORE_REPARTITION.key} is true")
             .withPsNote(
-              Seq(TypeEnum.ARRAY, TypeEnum.MAP), 
+              Seq(TypeEnum.ARRAY, TypeEnum.MAP),
               "Round-robin partitioning is not supported if " +
               s"${SQLConf.SORT_BEFORE_REPARTITION.key} is true"),
           TypeSig.all),
@@ -4017,7 +4017,7 @@ object GpuOverrides extends Logging {
         (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128 +
           TypeSig.MAP + TypeSig.ARRAY + TypeSig.STRUCT)
             .nested()
-            .withPsNote(Seq(TypeEnum.ARRAY, TypeEnum.MAP), 
+            .withPsNote(Seq(TypeEnum.ARRAY, TypeEnum.MAP),
              "not allowed for grouping expressions")
             .withPsNote(TypeEnum.STRUCT,
               "not allowed for grouping expressions if containing Array or Map as child"),
@@ -4033,7 +4033,7 @@ object GpuOverrides extends Logging {
             .nested()
             .withPsNote(TypeEnum.BINARY, "only allowed when aggregate buffers can be " +
               "converted between CPU and GPU")
-            .withPsNote(Seq(TypeEnum.ARRAY, TypeEnum.MAP), 
+            .withPsNote(Seq(TypeEnum.ARRAY, TypeEnum.MAP),
              "not allowed for grouping expressions")
             .withPsNote(TypeEnum.STRUCT,
               "not allowed for grouping expressions if containing Array or Map as child"),
@@ -4051,7 +4051,7 @@ object GpuOverrides extends Logging {
             .nested()
             .withPsNote(TypeEnum.BINARY, "only allowed when aggregate buffers can be " +
               "converted between CPU and GPU")
-            .withPsNote(Seq(TypeEnum.ARRAY, TypeEnum.MAP), 
+            .withPsNote(Seq(TypeEnum.ARRAY, TypeEnum.MAP),
              "not allowed for grouping expressions")
             .withPsNote(TypeEnum.STRUCT,
               "not allowed for grouping expressions if containing Array or Map as child"),
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index e2766970aef..9c56b96664a 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1497,14 +1497,6 @@ object RapidsConf {
     .booleanConf
     .createWithDefault(true)
 
-  val FORCE_SHIMCALLER_CLASSLOADER = conf("spark.rapids.force.caller.classloader")
-    .doc("Option to statically add shim's parallel world classloader URLs to " +
-      "the classloader of the ShimLoader class, typically Bootstrap classloader. This option" +
-      " uses reflection with setAccessible true on a classloader that is not created by Spark.")
-    .internal()
-    .booleanConf
-    .createWithDefault(value = true)
-
   val SPARK_GPU_RESOURCE_NAME = conf("spark.rapids.gpu.resourceName")
     .doc("The name of the Spark resource that represents a GPU that you want the plugin to use " +
       "if using custom resources with Spark.")
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShimLoader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShimLoader.scala
index a1b61c13b15..2032cb29a23 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShimLoader.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShimLoader.scala
@@ -23,7 +23,7 @@ import com.nvidia.spark.rapids.iceberg.IcebergProvider
 import org.apache.commons.lang3.reflect.MethodUtils
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
-import scala.util.{Failure, Success, Try}
+import scala.util.Try
 
 import org.apache.spark.{SPARK_BRANCH, SPARK_BUILD_DATE, SPARK_BUILD_USER, SPARK_REPO_URL, SPARK_REVISION, SPARK_VERSION, SparkConf, SparkEnv}
 import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin}
@@ -32,6 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
+import org.apache.spark.sql.rapids.execution.UnshimmedTrampolineUtil
 import org.apache.spark.util.MutableURLClassLoader
 
 /*
@@ -129,52 +130,6 @@ object ShimLoader extends Logging {
     s"org.apache.spark.sql.rapids.shims.$shimId.RapidsShuffleInternalManager"
   }
 
-  private def serializerClassloader(): Option[ClassLoader] = {
-    // Hypothesis: serializer is the most universal way to intercept classloaders
-
-    // https://github.com/apache/spark/blob/master/core/src/main/scala/
-    // org/apache/spark/serializer/JavaSerializer.scala#L147
-
-    // https://github.com/apache/spark/blob/master/core/src/main/scala/
-    // org/apache/spark/serializer/KryoSerializer.scala#L134
-
-    Option(SparkEnv.get)
-      .flatMap {
-        case env if !env.conf.get("spark.rapids.force.caller.classloader",
-          true.toString).toBoolean => Option(env.serializer)
-        case _ if (conventionalSingleShimJarDetected) => None
-        case _ =>
-          logInfo("Forcing shim caller classloader update (default behavior). " +
" + - "If it causes issues with userClassPathFirst, set " + - "spark.rapids.force.caller.classloader to false!") - None - } - .flatMap { serializer => - logInfo("Looking for a mutable classloader (defaultClassLoader) in SparkEnv.serializer " + - serializer) - // scalac generates accessor methods - val serdeClassLoader = MethodUtils - .invokeMethod(serializer, true, "defaultClassLoader") - .asInstanceOf[Option[ClassLoader]] - .getOrElse { - val threadContextClassLoader = Thread.currentThread().getContextClassLoader - logInfo(s"No defaultClassLoader found in $serializer, falling back " + - s"on Thread context classloader: " + threadContextClassLoader) - threadContextClassLoader - } - - logInfo("Extracted Spark classloader from SparkEnv.serializer " + serdeClassLoader) - findURLClassLoader(serdeClassLoader) - }.orElse { - val shimLoaderCallerCl = getClass.getClassLoader - if (!conventionalSingleShimJarDetected) { - logInfo("Falling back on ShimLoader caller's classloader " + shimLoaderCallerCl) - } - Option(shimLoaderCallerCl) - } - } - - @tailrec private def findURLClassLoader(classLoader: ClassLoader): Option[ClassLoader] = { // walk up the classloader hierarchy until we hit a classloader we can mutate @@ -214,23 +169,12 @@ object ShimLoader extends Logging { } private def updateSparkClassLoader(): Unit = { - // TODO propose a proper addClassPathURL API to Spark similar to addJar but - // accepting non-file-based URI - serializerClassloader().foreach { urlAddable => + findURLClassLoader(UnshimmedTrampolineUtil.sparkClassLoader).foreach { urlAddable => urlsForSparkClassLoader.foreach { url => if (!conventionalSingleShimJarDetected) { logInfo(s"Updating spark classloader $urlAddable with the URLs: " + urlsForSparkClassLoader.mkString(", ")) - Try(MethodUtils.invokeMethod(urlAddable, true, "addURL", url)) - .recoverWith { - case nsm: NoSuchMethodException => - logWarning("JDK8+ detected, consider setting " + - "spark.rapids.force.caller.classloader to false as a workaround") - logDebug(s"JDK8+ detected by catching ${nsm}", nsm) - Success(Unit) - case t => Failure(t) - }.get - + MethodUtils.invokeMethod(urlAddable, true, "addURL", url) logInfo(s"Spark classLoader $urlAddable updated successfully") urlAddable match { case urlCl: java.net.URLClassLoader => @@ -240,7 +184,7 @@ object ShimLoader extends Logging { s"classloader $urlCl although addURL succeeded, maybe pushed up to the " + s"parent classloader ${urlCl.getParent}") } - case _ => () + case _ => Unit } } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/UnshimmedTrampolineUtil.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/UnshimmedTrampolineUtil.scala new file mode 100644 index 00000000000..482cbe1ba53 --- /dev/null +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/UnshimmedTrampolineUtil.scala @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.rapids.execution
+
+object UnshimmedTrampolineUtil {
+  def sparkClassLoader: ClassLoader = {
+    org.apache.spark.util.Utils.getContextOrSparkClassLoader
+  }
+}
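Reviewer note on placement, kept after the diff so it stays out of the
applied patch: `org.apache.spark.util.Utils` is declared `private[spark]`,
which is why the trampoline has to live under the `org.apache.spark`
namespace and re-expose the method publicly. A hypothetical caller on the
plugin side, for illustration only (the example package and object below are
not part of this patch):

    package com.nvidia.spark.rapids.example

    import org.apache.spark.sql.rapids.execution.UnshimmedTrampolineUtil

    object TrampolineCallerSketch {
      // Resolve a shim class through the same classloader Spark uses for
      // external datasources: context classloader first, else Spark's own.
      def loadShimClass(name: String): Class[_] =
        Class.forName(name, false, UnshimmedTrampolineUtil.sparkClassLoader)
    }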