Commit fd46522
Fix fake memory leaks in some test cases
Signed-off-by: Chong Gao <res_life@163.com>
Chong Gao committed Jul 6, 2022
1 parent 30467f2 commit fd46522
Showing 1 changed file with 32 additions and 1 deletion.
sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala (32 additions, 1 deletion)
@@ -25,7 +25,7 @@ import scala.collection.mutable.{Map => MutableMap}
 import scala.util.Try
 import scala.util.matching.Regex
 
-import ai.rapids.cudf.{CudaException, CudaFatalException, CudfException}
+import ai.rapids.cudf.{CudaException, CudaFatalException, CudfException, MemoryCleaner}
 import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
 
 import org.apache.spark.{ExceptionFailure, SparkConf, SparkContext, TaskFailedReason}
@@ -209,6 +209,9 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
       pluginContext: PluginContext,
       extraConf: java.util.Map[String, String]): Unit = {
     try {
+      // If configured, re-register the leak-checking hook.
+      reRegisterCheckLeakHook()
+
       val conf = new RapidsConf(extraConf.asScala.toMap)
 
       // Compare if the cudf version mentioned in the classpath is equal to the version which
@@ -259,6 +262,34 @@
     }
   }
 
+  /**
+   * Re-register the leak-checking hook if configured.
+   */
+  private def reRegisterCheckLeakHook(): Unit = {
+    // DEFAULT_SHUTDOWN_THREAD in MemoryCleaner is responsible for checking leaks at shutdown
+    // time. It expects all other hooks to have finished before the check runs, because those
+    // hooks close some of the resources it inspects.
+
+    val refCountDebug = System.getProperty(MemoryCleaner.REF_COUNT_DEBUG_KEY, "false")
+    if (refCountDebug.equalsIgnoreCase("true")) {
+      MemoryCleaner.removeDefaultShutdownHook()
+      // Shutdown hooks are executed concurrently in the JVM with no guaranteed execution
+      // order; see the doc of `Runtime.addShutdownHook`.
+      // Some resources are closed in Spark hooks, so we must wait for the Spark hooks to
+      // finish, or a false leak will be detected.
+      // See issue: https://github.com/NVIDIA/spark-rapids/issues/5854
+      //
+      // `Spark ShutdownHookManager` leverages `Hadoop ShutdownHookManager` to manage hooks
+      // with priority; the priority parameter guarantees the execution order.
+      //
+      // Here we also use `Hadoop ShutdownHookManager` to add a lower-priority hook.
+      // A priority of 20 is low enough that it runs after the Spark hooks.
+      // Note: `ShutdownHookManager.get()` is a singleton.
+      org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
+        MemoryCleaner.DEFAULT_SHUTDOWN_RUNNABLE, 20)
+    }
+  }
+
   private def checkCudfVersion(conf: RapidsConf): Unit = {
     try {
       val pluginProps = RapidsPluginUtils.loadProps(RapidsPluginUtils.PLUGIN_PROPS_FILENAME)
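For context, the fix relies on `org.apache.hadoop.util.ShutdownHookManager`, which executes registered hooks in descending priority order: higher-priority hooks run first. Below is a minimal standalone sketch of that ordering, not part of the patch; the object name and the priority 25 used for the simulated Spark hook are illustrative assumptions (Spark's own hooks register at priority 25 and above).

import org.apache.hadoop.util.ShutdownHookManager

// Hypothetical standalone demo of Hadoop's priority-ordered shutdown hooks.
object ShutdownOrderDemo {
  def main(args: Array[String]): Unit = {
    val manager = ShutdownHookManager.get() // singleton, as noted in the patch

    // Simulates a Spark hook that closes resources. Priority 25 is an
    // assumption mirroring Spark's default shutdown priority.
    manager.addShutdownHook(new Runnable {
      override def run(): Unit = println("closing resources (priority 25)")
    }, 25)

    // Simulates the leak check, registered at priority 20 so it runs only
    // after the higher-priority hook above has closed its resources.
    manager.addShutdownHook(new Runnable {
      override def run(): Unit = println("checking for leaks (priority 20)")
    }, 20)

    // On JVM exit the output order is:
    //   closing resources (priority 25)
    //   checking for leaks (priority 20)
  }
}

Note that this code path only runs when cuDF's ref-count debugging is enabled, i.e. when the JVM system property named by `MemoryCleaner.REF_COUNT_DEBUG_KEY` is set to `true` (for example through `spark.executor.extraJavaOptions`); otherwise the default leak-check hook is left in place.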
