
Commit ff75fce

Address comments
Chong Gao committed Jul 7, 2022
1 parent fd46522 commit ff75fce
Showing 2 changed files with 10 additions and 12 deletions.
15 changes: 4 additions & 11 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStag
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.rapids.GpuShuffleEnv
+import org.apache.spark.sql.rapids.execution.TrampolineUtil
 import org.apache.spark.sql.util.QueryExecutionListener
 
 class PluginException(msg: String) extends RuntimeException(msg)
@@ -270,23 +271,15 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
     // it expects all other hooks to be done before the check,
     // as other hooks will close some resources.
 
-    val REF_COUNT_DEBUG_STR = System.getProperty(MemoryCleaner.REF_COUNT_DEBUG_KEY, "false")
-    if (REF_COUNT_DEBUG_STR.equalsIgnoreCase("true")) {
-      MemoryCleaner.removeDefaultShutdownHook()
+    if (MemoryCleaner.configuredDefaultShutdownHook) {
       // Shutdown hooks are executed concurrently in the JVM; there is no execution-order guarantee.
       // See the doc of `Runtime.addShutdownHook`.
       // Some resources are closed in Spark hooks.
       // Here we should wait for the Spark hooks to finish, or a false leak will be detected.
       // See issue: https://github.com/NVIDIA/spark-rapids/issues/5854
       //
-      // `Spark ShutdownHookManager` leverages `Hadoop ShutdownHookManager` to manage hooks with
-      // priority. The priority parameter guarantees the execution order.
-      //
-      // Here we also use `Hadoop ShutdownHookManager` to add a lower-priority hook.
+      // Here we use Spark's `ShutdownHookManager` to manage hooks with priority.
       // Priority 20 is low enough that this hook runs after the Spark hooks.
-      // Note: `ShutdownHookManager.get()` is a singleton
-      org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
-        MemoryCleaner.DEFAULT_SHUTDOWN_RUNNABLE, 20)
+      TrampolineUtil.addShutdownHook(20, MemoryCleaner.removeDefaultShutdownHook())
     }
   }

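A note on the priority value: Spark's `ShutdownHookManager` runs hooks in descending priority order, and Spark's own hooks use priority 25 by default (SparkContext shutdown runs at 50), so a hook registered at 20 fires only after Spark has finished its cleanup. The sketch below is a hypothetical standalone illustration of that ordering, not part of this commit; the `org.apache.spark.demo` package and `HookOrderDemo` object are made-up names, and the code must live under `org.apache.spark` because `ShutdownHookManager` is `private[spark]` — the same restriction that motivates routing the call through `TrampolineUtil`.

// Hypothetical illustration, not part of this commit. Placed under an
// org.apache.spark subpackage because ShutdownHookManager is private[spark].
package org.apache.spark.demo

import org.apache.spark.util.ShutdownHookManager

object HookOrderDemo {
  def main(args: Array[String]): Unit = {
    // Hooks run in descending priority order at JVM shutdown, so the
    // priority-25 hook (Spark's default level) prints before the priority-20 one.
    ShutdownHookManager.addShutdownHook(25) { () =>
      println("spark-level cleanup (priority 25) runs first")
    }
    ShutdownHookManager.addShutdownHook(20) { () =>
      println("leak check (priority 20) runs after Spark's hooks")
    }
  }
}
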
7 changes: 6 additions & 1 deletion sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
 import org.apache.spark.sql.rapids.shims.SparkUpgradeExceptionShims
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 object TrampolineUtil {
   def doExecuteBroadcast[T](child: SparkPlan): Broadcast[T] = child.doExecuteBroadcast()
@@ -152,4 +152,9 @@ object TrampolineUtil {
 
   /** Remove the task context for the current thread */
   def unsetTaskContext(): Unit = TaskContext.unset()
+
+  /** Add a shutdown hook with the given priority (higher priorities run first) */
+  def addShutdownHook(priority: Int, runnable: Runnable): AnyRef = {
+    ShutdownHookManager.addShutdownHook(priority)(() => runnable.run())
+  }
 }
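
Plugin code under `com.nvidia.spark.rapids` cannot call the `private[spark]` `ShutdownHookManager` directly, which is why the helper lives in `TrampolineUtil` inside the `org.apache.spark` namespace. Below is a minimal usage sketch, assuming a hypothetical cleanup task; `ShutdownHookUsage` and `leakCheck` are made-up names, not part of this commit.

// Hypothetical usage sketch, not part of this commit.
import org.apache.spark.sql.rapids.execution.TrampolineUtil

object ShutdownHookUsage {
  def register(): AnyRef = {
    // A made-up cleanup task; registering it at priority 20 defers it until
    // after Spark's own hooks (which default to priority 25) have finished.
    val leakCheck: Runnable = () => System.err.println("checking for leaked resources")
    TrampolineUtil.addShutdownHook(20, leakCheck)
  }
}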
