Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-6748][CORE] Search stack trace to infer adaptive execution context #7121

Merged
merged 2 commits on Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,7 @@ class EnumeratedApplier(session: SparkSession, ruleBuilders: Seq[ColumnarRuleBui
extends ColumnarRuleApplier
with Logging
with LogLevelUtil {
// An empirical value.
private val aqeStackTraceIndex =
if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) {
16
} else {
14
}
private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)
private val adaptiveContext = AdaptiveContext(session)

override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,7 @@ class HeuristicApplier(
extends ColumnarRuleApplier
with Logging
with LogLevelUtil {
// This is an empirical value, may need to be changed for supporting other versions of spark.
private val aqeStackTraceIndex =
if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) {
19
} else {
17
}
private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)
private val adaptiveContext = AdaptiveContext(session)

override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ sealed trait AdaptiveContext {
}

object AdaptiveContext {
def apply(session: SparkSession, aqeStackTraceIndex: Int): AdaptiveContext =
new AdaptiveContextImpl(session, aqeStackTraceIndex)
def apply(session: SparkSession): AdaptiveContext =
new AdaptiveContextImpl(session)

private val GLUTEN_IS_ADAPTIVE_CONTEXT = "gluten.isAdaptiveContext"

Expand All @@ -45,8 +45,7 @@ object AdaptiveContext {
private val localIsAdaptiveContextFlags: ThreadLocal[ListBuffer[Boolean]] =
ThreadLocal.withInitial(() => ListBuffer.empty[Boolean])

private class AdaptiveContextImpl(session: SparkSession, aqeStackTraceIndex: Int)
extends AdaptiveContext {
private class AdaptiveContextImpl(session: SparkSession) extends AdaptiveContext {
// Just for test use.
override def enableAdaptiveContext(): Unit = {
session.sparkContext.setLocalProperty(GLUTEN_IS_ADAPTIVE_CONTEXT, "true")
Expand All @@ -60,19 +59,13 @@ object AdaptiveContext {

override def setAdaptiveContext(): Unit = {
val traceElements = Thread.currentThread.getStackTrace
assert(
traceElements.length > aqeStackTraceIndex,
s"The number of stack trace elements is expected to be more than $aqeStackTraceIndex")
// ApplyColumnarRulesAndInsertTransitions is called by either QueryExecution or
// AdaptiveSparkPlanExec. So by checking the stack trace, we can know whether
// columnar rule will be applied in adaptive execution context. This part of code
// needs to be carefully checked when supporting higher versions of spark to make
// sure the calling stack has not been changed.
// columnar rule will be applied in adaptive execution context.
localIsAdaptiveContextFlags
.get()
.prepend(
traceElements(aqeStackTraceIndex).getClassName
.equals(AdaptiveSparkPlanExec.getClass.getName))
traceElements.exists(_.getClassName.equals(AdaptiveSparkPlanExec.getClass.getName)))
}

override def resetAdaptiveContext(): Unit =
Expand Down
Loading