From fd465222e0a0e24167191e0a5ff0266f04a064e9 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Wed, 6 Jul 2022 16:29:42 +0800
Subject: [PATCH] Fix false memory leak reports in some test cases

Signed-off-by: Chong Gao
---
 .../com/nvidia/spark/rapids/Plugin.scala      | 33 ++++++++++++++++++-
 1 file changed, 32 insertions(+), 1 deletion(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
index 15a0a06d55a..143cc2d2d82 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.{Map => MutableMap}
 import scala.util.Try
 import scala.util.matching.Regex
 
-import ai.rapids.cudf.{CudaException, CudaFatalException, CudfException}
+import ai.rapids.cudf.{CudaException, CudaFatalException, CudfException, MemoryCleaner}
 import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
 
 import org.apache.spark.{ExceptionFailure, SparkConf, SparkContext, TaskFailedReason}
@@ -209,6 +209,9 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
       pluginContext: PluginContext,
       extraConf: java.util.Map[String, String]): Unit = {
     try {
+      // If configured, re-register the leak-checking hook.
+      reRegisterCheckLeakHook()
+
       val conf = new RapidsConf(extraConf.asScala.toMap)
 
       // Compare if the cudf version mentioned in the classpath is equal to the version which
@@ -259,6 +262,34 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
     }
   }
 
+  /**
+   * Re-register the leak-checking hook if configured.
+   */
+  private def reRegisterCheckLeakHook(): Unit = {
+    // DEFAULT_SHUTDOWN_THREAD in MemoryCleaner is responsible for checking leaks at shutdown
+    // time. It expects all other hooks to have finished before the check runs, because those
+    // hooks close resources.
+
+    val refCountDebug = System.getProperty(MemoryCleaner.REF_COUNT_DEBUG_KEY, "false")
+    if (refCountDebug.equalsIgnoreCase("true")) {
+      MemoryCleaner.removeDefaultShutdownHook()
+      // JVM shutdown hooks run concurrently, with no guaranteed execution order.
+      // See the doc of `Runtime.addShutdownHook`.
+      // Some resources are closed in Spark hooks, so we must wait for the Spark hooks to
+      // finish, or a false leak will be detected.
+      // See issue: https://github.com/NVIDIA/spark-rapids/issues/5854
+      //
+      // Spark's ShutdownHookManager leverages Hadoop's ShutdownHookManager to manage hooks
+      // with priorities; the priority parameter guarantees the execution order.
+      //
+      // Here we also use Hadoop's ShutdownHookManager to add a lower-priority hook.
+      // Priority 20 is low enough that the hook runs after the Spark hooks.
+      // Note: `ShutdownHookManager.get()` is a singleton.
+      org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
+        MemoryCleaner.DEFAULT_SHUTDOWN_RUNNABLE, 20)
+    }
+  }
+
   private def checkCudfVersion(conf: RapidsConf): Unit = {
     try {
       val pluginProps = RapidsPluginUtils.loadProps(RapidsPluginUtils.PLUGIN_PROPS_FILENAME)
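
Note: the fix relies on Hadoop's ShutdownHookManager running higher-priority hooks first, so the leak check registered at priority 20 fires after Spark's own hooks (Spark registers its hooks at priorities of roughly 25 and above). The following is a minimal standalone sketch, not part of this patch, illustrating that ordering. It assumes hadoop-common is on the classpath; the object name and hook bodies are made up for the demo, with plain Runnables standing in for Spark's cleanup hooks and for MemoryCleaner.DEFAULT_SHUTDOWN_RUNNABLE.

import org.apache.hadoop.util.ShutdownHookManager

// Hypothetical demo of Hadoop shutdown-hook priority ordering:
// higher-priority hooks run first, so the priority-20 hook runs last.
object ShutdownHookOrderDemo {
  def main(args: Array[String]): Unit = {
    val manager = ShutdownHookManager.get() // singleton, shared with Spark

    // Stand-in for a Spark hook that closes resources.
    manager.addShutdownHook(new Runnable {
      override def run(): Unit = println("priority 50: close resources (runs first)")
    }, 50)

    // Stand-in for the leak check (MemoryCleaner.DEFAULT_SHUTDOWN_RUNNABLE) at priority 20.
    manager.addShutdownHook(new Runnable {
      override def run(): Unit = println("priority 20: check for leaks (runs last)")
    }, 20)

    println("main exiting; hooks fire during JVM shutdown")
  }
}

Because both registrations go through the same singleton manager, the priority-20 hook is guaranteed to observe the resources already closed by the higher-priority hooks, which is exactly what keeps the leak check from reporting false leaks.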