Skip to content

Commit

Permalink
[VL][1.2] Port #6563 #6679 for build options and collectQueryExecutio…
Browse files Browse the repository at this point in the history
…nFallbackSummary fix (#7919)
  • Loading branch information
weiting-chen authored Nov 18, 2024
1 parent a7aacaf commit 73b0fb6
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 118 deletions.
4 changes: 2 additions & 2 deletions cpp/velox/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ function(add_velox_test TEST_EXEC)
target_include_directories(
${TEST_EXEC} PRIVATE ${CMAKE_SOURCE_DIR}/velox ${CMAKE_SOURCE_DIR}/src
${VELOX_BUILD_PATH}/_deps/duckdb-src/src/include)
target_link_libraries(${TEST_EXEC} velox_benchmark_common GTest::gtest
GTest::gtest_main)
target_link_libraries(${TEST_EXEC} velox GTest::gtest GTest::gtest_main
google::glog)
gtest_discover_tests(${TEST_EXEC} DISCOVERY_MODE PRE_TEST)
endfunction()

Expand Down
3 changes: 1 addition & 2 deletions cpp/velox/tests/MemoryManagerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* limitations under the License.
*/

#include "benchmarks/common/BenchmarkUtils.h"
#include "compute/VeloxBackend.h"
#include "config/VeloxConfig.h"
#include "memory/VeloxMemoryManager.h"
Expand Down Expand Up @@ -50,7 +49,7 @@ class MemoryManagerTest : public ::testing::Test {
std::unordered_map<std::string, std::string> conf = {
{kMemoryReservationBlockSize, std::to_string(kMemoryReservationBlockSizeDefault)},
{kVeloxMemInitCapacity, std::to_string(kVeloxMemInitCapacityDefault)}};
initVeloxBackend(conf);
gluten::VeloxBackend::create(conf);
}

void SetUp() override {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.gluten.execution.WholeStageTransformer
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.utils.PlanUtil

import org.apache.spark.sql.{AnalysisException, Dataset}
import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
Expand Down Expand Up @@ -87,131 +87,134 @@ object GlutenImplicits {
}
}

implicit class DatasetTransformer[T](dateset: Dataset[T]) {
private def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
val args = p.argString(Int.MaxValue)
val index = args.indexOf("isFinalPlan=")
assert(index >= 0)
args.substring(index + "isFinalPlan=".length).trim.toBoolean
}
private def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
val args = p.argString(Int.MaxValue)
val index = args.indexOf("isFinalPlan=")
assert(index >= 0)
args.substring(index + "isFinalPlan=".length).trim.toBoolean
}

private def collectFallbackNodes(plan: QueryPlan[_]): FallbackInfo = {
var numGlutenNodes = 0
val fallbackNodeToReason = new mutable.HashMap[String, String]

def collect(tmp: QueryPlan[_]): Unit = {
tmp.foreachUp {
case _: ExecutedCommandExec =>
case _: CommandResultExec =>
case _: V2CommandExec =>
case _: DataWritingCommandExec =>
case _: WholeStageCodegenExec =>
case _: WholeStageTransformer =>
case _: InputAdapter =>
case _: ColumnarInputAdapter =>
case _: InputIteratorTransformer =>
case _: ColumnarToRowTransition =>
case _: RowToColumnarTransition =>
case p: ReusedExchangeExec =>
case p: AdaptiveSparkPlanExec if isFinalAdaptivePlan(p) =>
collect(p.executedPlan)
case p: AdaptiveSparkPlanExec =>
// if we are here that means we are inside table cache.
val (innerNumGlutenNodes, innerFallbackNodeToReason) =
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// re-plan manually to skip cached data
val newSparkPlan = QueryExecution.createSparkPlan(
dateset.sparkSession,
dateset.sparkSession.sessionState.planner,
p.inputPlan.logicalLink.get)
val newExecutedPlan = QueryExecution.prepareExecutedPlan(
dateset.sparkSession,
newSparkPlan
)
processPlan(
newExecutedPlan,
new PlanStringConcat().append,
Some(plan => collectFallbackNodes(plan)))
}
numGlutenNodes += innerNumGlutenNodes
fallbackNodeToReason.++=(innerFallbackNodeToReason)
case p: QueryStageExec => collect(p.plan)
case p: GlutenPlan =>
numGlutenNodes += 1
p.innerChildren.foreach(collect)
case i: InMemoryTableScanExec =>
if (PlanUtil.isGlutenTableCache(i)) {
numGlutenNodes += 1
} else {
addFallbackNodeWithReason(i, "Columnar table cache is disabled", fallbackNodeToReason)
private def collectFallbackNodes(spark: SparkSession, plan: QueryPlan[_]): FallbackInfo = {
var numGlutenNodes = 0
val fallbackNodeToReason = new mutable.HashMap[String, String]

def collect(tmp: QueryPlan[_]): Unit = {
tmp.foreachUp {
case _: ExecutedCommandExec =>
case _: CommandResultExec =>
case _: V2CommandExec =>
case _: DataWritingCommandExec =>
case _: WholeStageCodegenExec =>
case _: WholeStageTransformer =>
case _: InputAdapter =>
case _: ColumnarInputAdapter =>
case _: InputIteratorTransformer =>
case _: ColumnarToRowTransition =>
case _: RowToColumnarTransition =>
case p: ReusedExchangeExec =>
case p: AdaptiveSparkPlanExec if isFinalAdaptivePlan(p) =>
collect(p.executedPlan)
case p: AdaptiveSparkPlanExec =>
// if we are here that means we are inside table cache.
val (innerNumGlutenNodes, innerFallbackNodeToReason) =
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// re-plan manually to skip cached data
val newSparkPlan = QueryExecution.createSparkPlan(
spark,
spark.sessionState.planner,
p.inputPlan.logicalLink.get)
val newExecutedPlan = QueryExecution.prepareExecutedPlan(
spark,
newSparkPlan
)
processPlan(
newExecutedPlan,
new PlanStringConcat().append,
Some(plan => collectFallbackNodes(spark, plan)))
}
collect(i.relation.cachedPlan)
case _: AQEShuffleReadExec => // Ignore
case p: SparkPlan =>
handleVanillaSparkPlan(p, fallbackNodeToReason)
p.innerChildren.foreach(collect)
case _ =>
}
numGlutenNodes += innerNumGlutenNodes
fallbackNodeToReason.++=(innerFallbackNodeToReason)
case p: QueryStageExec => collect(p.plan)
case p: GlutenPlan =>
numGlutenNodes += 1
p.innerChildren.foreach(collect)
case i: InMemoryTableScanExec =>
if (PlanUtil.isGlutenTableCache(i)) {
numGlutenNodes += 1
} else {
addFallbackNodeWithReason(i, "Columnar table cache is disabled", fallbackNodeToReason)
}
collect(i.relation.cachedPlan)
case _: AQEShuffleReadExec => // Ignore
case p: SparkPlan =>
handleVanillaSparkPlan(p, fallbackNodeToReason)
p.innerChildren.foreach(collect)
case _ =>
}

collect(plan)
(numGlutenNodes, fallbackNodeToReason.toMap)
}

private def collectQueryExecutionFallbackSummary(qe: QueryExecution): FallbackSummary = {
var totalNumGlutenNodes = 0
var totalNumFallbackNodes = 0
val totalPhysicalPlanDescription = new ArrayBuffer[String]()
val totalFallbackNodeToReason = new ArrayBuffer[Map[String, String]]()

def handlePlanWithAQEAndTableCache(
plan: SparkPlan,
logicalPlan: LogicalPlan,
isMaterialized: Boolean): Unit = {
val concat = new PlanStringConcat()
val collectFallbackFunc = Some(plan => collectFallbackNodes(plan))
val (numGlutenNodes, fallbackNodeToReason) = if (!isMaterialized) {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// AQE is not materialized, so the columnar rules are not applied.
// For this case, We apply columnar rules manually with disable AQE.
val qe = dateset.sparkSession.sessionState.executePlan(logicalPlan)
processPlan(qe.executedPlan, concat.append, collectFallbackFunc)
}
} else {
processPlan(plan, concat.append, collectFallbackFunc)
}
totalNumGlutenNodes += numGlutenNodes
totalNumFallbackNodes += fallbackNodeToReason.size
totalPhysicalPlanDescription.append(concat.toString())
totalFallbackNodeToReason.append(fallbackNodeToReason)
}
collect(plan)
(numGlutenNodes, fallbackNodeToReason.toMap)
}

// For command-like query, e.g., `INSERT INTO TABLE ...`
qe.commandExecuted.foreach {
case r: CommandResult =>
handlePlanWithAQEAndTableCache(r.commandPhysicalPlan, r.commandLogicalPlan, true)
case _ => // ignore
// collect fallback sumaary from query execution, make this method public as a util method
def collectQueryExecutionFallbackSummary(
spark: SparkSession,
qe: QueryExecution): FallbackSummary = {
var totalNumGlutenNodes = 0
var totalNumFallbackNodes = 0
val totalPhysicalPlanDescription = new ArrayBuffer[String]()
val totalFallbackNodeToReason = new ArrayBuffer[Map[String, String]]()

def handlePlanWithAQEAndTableCache(
plan: SparkPlan,
logicalPlan: LogicalPlan,
isMaterialized: Boolean): Unit = {
val concat = new PlanStringConcat()
val collectFallbackFunc = Some(plan => collectFallbackNodes(spark, plan))
val (numGlutenNodes, fallbackNodeToReason) = if (!isMaterialized) {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
// AQE is not materialized, so the columnar rules are not applied.
// For this case, We apply columnar rules manually with disable AQE.
val qe = spark.sessionState.executePlan(logicalPlan)
processPlan(qe.executedPlan, concat.append, collectFallbackFunc)
}
} else {
processPlan(plan, concat.append, collectFallbackFunc)
}
totalNumGlutenNodes += numGlutenNodes
totalNumFallbackNodes += fallbackNodeToReason.size
totalPhysicalPlanDescription.append(concat.toString())
totalFallbackNodeToReason.append(fallbackNodeToReason)
}

// For query, e.g., `SELECT * FROM ...`
if (qe.executedPlan.find(_.isInstanceOf[CommandResultExec]).isEmpty) {
val isMaterialized = qe.executedPlan.find {
case a: AdaptiveSparkPlanExec if isFinalAdaptivePlan(a) => true
case _ => false
}.isDefined
handlePlanWithAQEAndTableCache(qe.executedPlan, qe.analyzed, isMaterialized)
}
// For command-like query, e.g., `INSERT INTO TABLE ...`
qe.commandExecuted.foreach {
case r: CommandResult =>
handlePlanWithAQEAndTableCache(r.commandPhysicalPlan, r.commandLogicalPlan, true)
case _ => // ignore
}

FallbackSummary(
totalNumGlutenNodes,
totalNumFallbackNodes,
totalPhysicalPlanDescription.toSeq,
totalFallbackNodeToReason.toSeq
)
// For query, e.g., `SELECT * FROM ...`
if (qe.executedPlan.find(_.isInstanceOf[CommandResultExec]).isEmpty) {
val isMaterialized = qe.executedPlan.find {
case a: AdaptiveSparkPlanExec if isFinalAdaptivePlan(a) => true
case _ => false
}.isDefined
handlePlanWithAQEAndTableCache(qe.executedPlan, qe.analyzed, isMaterialized)
}

FallbackSummary(
totalNumGlutenNodes,
totalNumFallbackNodes,
totalPhysicalPlanDescription.toSeq,
totalFallbackNodeToReason.toSeq
)
}

implicit class DatasetTransformer[T](dateset: Dataset[T]) {
def fallbackSummary(): FallbackSummary = {
collectQueryExecutionFallbackSummary(dateset.queryExecution)
collectQueryExecutionFallbackSummary(dateset.sparkSession, dateset.queryExecution)
}
}
}

0 comments on commit 73b0fb6

Please sign in to comment.