From 73b0fb6ebfb3c8fe20db15332a797786e7e03972 Mon Sep 17 00:00:00 2001
From: Wei-Ting Chen
Date: Mon, 18 Nov 2024 10:01:27 +0800
Subject: [PATCH] [VL][1.2] Port #6563 #6679 for build options and
 collectQueryExecutionFallbackSummary fix (#7919)

---
 cpp/velox/tests/CMakeLists.txt                |   4 +-
 cpp/velox/tests/MemoryManagerTest.cc          |   3 +-
 .../spark/sql/execution/GlutenImplicits.scala | 231 +++++++++---------
 3 files changed, 120 insertions(+), 118 deletions(-)

diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index f3d65f127f67..dac83cd87781 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -29,8 +29,8 @@ function(add_velox_test TEST_EXEC)
   target_include_directories(
     ${TEST_EXEC} PRIVATE ${CMAKE_SOURCE_DIR}/velox ${CMAKE_SOURCE_DIR}/src
                          ${VELOX_BUILD_PATH}/_deps/duckdb-src/src/include)
-  target_link_libraries(${TEST_EXEC} velox_benchmark_common GTest::gtest
-                        GTest::gtest_main)
+  target_link_libraries(${TEST_EXEC} velox GTest::gtest GTest::gtest_main
+                        google::glog)
   gtest_discover_tests(${TEST_EXEC} DISCOVERY_MODE PRE_TEST)
 endfunction()
 
diff --git a/cpp/velox/tests/MemoryManagerTest.cc b/cpp/velox/tests/MemoryManagerTest.cc
index 52f2fa8b661c..d86bd46e230d 100644
--- a/cpp/velox/tests/MemoryManagerTest.cc
+++ b/cpp/velox/tests/MemoryManagerTest.cc
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 
-#include "benchmarks/common/BenchmarkUtils.h"
 #include "compute/VeloxBackend.h"
 #include "config/VeloxConfig.h"
 #include "memory/VeloxMemoryManager.h"
@@ -50,7 +49,7 @@ class MemoryManagerTest : public ::testing::Test {
     std::unordered_map<std::string, std::string> conf = {
        {kMemoryReservationBlockSize, std::to_string(kMemoryReservationBlockSizeDefault)},
        {kVeloxMemInitCapacity, std::to_string(kVeloxMemInitCapacityDefault)}};
-    initVeloxBackend(conf);
+    gluten::VeloxBackend::create(conf);
   }
 
   void SetUp() override {
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
index eb42f0a88460..2e2af6517d9c 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.execution.WholeStageTransformer
 import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.utils.PlanUtil
 
-import org.apache.spark.sql.{AnalysisException, Dataset}
+import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
@@ -87,131 +87,134 @@ object GlutenImplicits {
     }
   }
 
-  implicit class DatasetTransformer[T](dateset: Dataset[T]) {
-    private def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
-      val args = p.argString(Int.MaxValue)
-      val index = args.indexOf("isFinalPlan=")
-      assert(index >= 0)
-      args.substring(index + "isFinalPlan=".length).trim.toBoolean
-    }
+  private def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
+    val args = p.argString(Int.MaxValue)
+    val index = args.indexOf("isFinalPlan=")
+    assert(index >= 0)
+    args.substring(index + "isFinalPlan=".length).trim.toBoolean
+  }
 
-    private def collectFallbackNodes(plan: QueryPlan[_]): FallbackInfo = {
-      var numGlutenNodes = 0
-      val fallbackNodeToReason = new mutable.HashMap[String, String]
-
-      def collect(tmp: QueryPlan[_]): Unit = {
-        tmp.foreachUp {
-          case _: ExecutedCommandExec =>
-          case _: CommandResultExec =>
-          case _: V2CommandExec =>
-          case _: DataWritingCommandExec =>
-          case _: WholeStageCodegenExec =>
-          case _: WholeStageTransformer =>
-          case _: InputAdapter =>
-          case _: ColumnarInputAdapter =>
-          case _: InputIteratorTransformer =>
-          case _: ColumnarToRowTransition =>
-          case _: RowToColumnarTransition =>
-          case p: ReusedExchangeExec =>
-          case p: AdaptiveSparkPlanExec if isFinalAdaptivePlan(p) =>
-            collect(p.executedPlan)
-          case p: AdaptiveSparkPlanExec =>
-            // if we are here that means we are inside table cache.
-            val (innerNumGlutenNodes, innerFallbackNodeToReason) =
-              withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-                // re-plan manually to skip cached data
-                val newSparkPlan = QueryExecution.createSparkPlan(
-                  dateset.sparkSession,
-                  dateset.sparkSession.sessionState.planner,
-                  p.inputPlan.logicalLink.get)
-                val newExecutedPlan = QueryExecution.prepareExecutedPlan(
-                  dateset.sparkSession,
-                  newSparkPlan
-                )
-                processPlan(
-                  newExecutedPlan,
-                  new PlanStringConcat().append,
-                  Some(plan => collectFallbackNodes(plan)))
-              }
-            numGlutenNodes += innerNumGlutenNodes
-            fallbackNodeToReason.++=(innerFallbackNodeToReason)
-          case p: QueryStageExec => collect(p.plan)
-          case p: GlutenPlan =>
-            numGlutenNodes += 1
-            p.innerChildren.foreach(collect)
-          case i: InMemoryTableScanExec =>
-            if (PlanUtil.isGlutenTableCache(i)) {
-              numGlutenNodes += 1
-            } else {
-              addFallbackNodeWithReason(i, "Columnar table cache is disabled", fallbackNodeToReason)
-            }
-            collect(i.relation.cachedPlan)
-          case _: AQEShuffleReadExec => // Ignore
-          case p: SparkPlan =>
-            handleVanillaSparkPlan(p, fallbackNodeToReason)
-            p.innerChildren.foreach(collect)
-          case _ =>
-        }
-      }
-
-      collect(plan)
-      (numGlutenNodes, fallbackNodeToReason.toMap)
-    }
+  private def collectFallbackNodes(spark: SparkSession, plan: QueryPlan[_]): FallbackInfo = {
+    var numGlutenNodes = 0
+    val fallbackNodeToReason = new mutable.HashMap[String, String]
+
+    def collect(tmp: QueryPlan[_]): Unit = {
+      tmp.foreachUp {
+        case _: ExecutedCommandExec =>
+        case _: CommandResultExec =>
+        case _: V2CommandExec =>
+        case _: DataWritingCommandExec =>
+        case _: WholeStageCodegenExec =>
+        case _: WholeStageTransformer =>
+        case _: InputAdapter =>
+        case _: ColumnarInputAdapter =>
+        case _: InputIteratorTransformer =>
+        case _: ColumnarToRowTransition =>
+        case _: RowToColumnarTransition =>
+        case p: ReusedExchangeExec =>
+        case p: AdaptiveSparkPlanExec if isFinalAdaptivePlan(p) =>
+          collect(p.executedPlan)
+        case p: AdaptiveSparkPlanExec =>
+          // If we reach here, it means we are inside a table cache.
+          val (innerNumGlutenNodes, innerFallbackNodeToReason) =
+            withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+              // re-plan manually to skip cached data
+              val newSparkPlan = QueryExecution.createSparkPlan(
+                spark,
+                spark.sessionState.planner,
+                p.inputPlan.logicalLink.get)
+              val newExecutedPlan = QueryExecution.prepareExecutedPlan(
+                spark,
+                newSparkPlan
+              )
+              processPlan(
+                newExecutedPlan,
+                new PlanStringConcat().append,
+                Some(plan => collectFallbackNodes(spark, plan)))
+            }
+          numGlutenNodes += innerNumGlutenNodes
+          fallbackNodeToReason.++=(innerFallbackNodeToReason)
+        case p: QueryStageExec => collect(p.plan)
+        case p: GlutenPlan =>
+          numGlutenNodes += 1
+          p.innerChildren.foreach(collect)
+        case i: InMemoryTableScanExec =>
+          if (PlanUtil.isGlutenTableCache(i)) {
+            numGlutenNodes += 1
+          } else {
+            addFallbackNodeWithReason(i, "Columnar table cache is disabled", fallbackNodeToReason)
+          }
+          collect(i.relation.cachedPlan)
+        case _: AQEShuffleReadExec => // Ignore
+        case p: SparkPlan =>
+          handleVanillaSparkPlan(p, fallbackNodeToReason)
+          p.innerChildren.foreach(collect)
+        case _ =>
+      }
+    }
+
+    collect(plan)
+    (numGlutenNodes, fallbackNodeToReason.toMap)
+  }
 
-    private def collectQueryExecutionFallbackSummary(qe: QueryExecution): FallbackSummary = {
-      var totalNumGlutenNodes = 0
-      var totalNumFallbackNodes = 0
-      val totalPhysicalPlanDescription = new ArrayBuffer[String]()
-      val totalFallbackNodeToReason = new ArrayBuffer[Map[String, String]]()
-
-      def handlePlanWithAQEAndTableCache(
-          plan: SparkPlan,
-          logicalPlan: LogicalPlan,
-          isMaterialized: Boolean): Unit = {
-        val concat = new PlanStringConcat()
-        val collectFallbackFunc = Some(plan => collectFallbackNodes(plan))
-        val (numGlutenNodes, fallbackNodeToReason) = if (!isMaterialized) {
-          withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-            // AQE is not materialized, so the columnar rules are not applied.
-            // For this case, We apply columnar rules manually with disable AQE.
-            val qe = dateset.sparkSession.sessionState.executePlan(logicalPlan)
-            processPlan(qe.executedPlan, concat.append, collectFallbackFunc)
-          }
-        } else {
-          processPlan(plan, concat.append, collectFallbackFunc)
-        }
-        totalNumGlutenNodes += numGlutenNodes
-        totalNumFallbackNodes += fallbackNodeToReason.size
-        totalPhysicalPlanDescription.append(concat.toString())
-        totalFallbackNodeToReason.append(fallbackNodeToReason)
-      }
-
-      // For command-like query, e.g., `INSERT INTO TABLE ...`
-      qe.commandExecuted.foreach {
-        case r: CommandResult =>
-          handlePlanWithAQEAndTableCache(r.commandPhysicalPlan, r.commandLogicalPlan, true)
-        case _ => // ignore
-      }
-
-      // For query, e.g., `SELECT * FROM ...`
-      if (qe.executedPlan.find(_.isInstanceOf[CommandResultExec]).isEmpty) {
-        val isMaterialized = qe.executedPlan.find {
-          case a: AdaptiveSparkPlanExec if isFinalAdaptivePlan(a) => true
-          case _ => false
-        }.isDefined
-        handlePlanWithAQEAndTableCache(qe.executedPlan, qe.analyzed, isMaterialized)
-      }
-
-      FallbackSummary(
-        totalNumGlutenNodes,
-        totalNumFallbackNodes,
-        totalPhysicalPlanDescription.toSeq,
-        totalFallbackNodeToReason.toSeq
-      )
-    }
+  // Collect the fallback summary from a query execution; this method is public so it can be used as a utility.
+  def collectQueryExecutionFallbackSummary(
+      spark: SparkSession,
+      qe: QueryExecution): FallbackSummary = {
+    var totalNumGlutenNodes = 0
+    var totalNumFallbackNodes = 0
+    val totalPhysicalPlanDescription = new ArrayBuffer[String]()
+    val totalFallbackNodeToReason = new ArrayBuffer[Map[String, String]]()
+
+    def handlePlanWithAQEAndTableCache(
+        plan: SparkPlan,
+        logicalPlan: LogicalPlan,
+        isMaterialized: Boolean): Unit = {
+      val concat = new PlanStringConcat()
+      val collectFallbackFunc = Some(plan => collectFallbackNodes(spark, plan))
+      val (numGlutenNodes, fallbackNodeToReason) = if (!isMaterialized) {
+        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+          // AQE is not materialized, so the columnar rules are not applied.
+          // In this case, we apply the columnar rules manually with AQE disabled.
+          val qe = spark.sessionState.executePlan(logicalPlan)
+          processPlan(qe.executedPlan, concat.append, collectFallbackFunc)
+        }
+      } else {
+        processPlan(plan, concat.append, collectFallbackFunc)
+      }
+      totalNumGlutenNodes += numGlutenNodes
+      totalNumFallbackNodes += fallbackNodeToReason.size
+      totalPhysicalPlanDescription.append(concat.toString())
+      totalFallbackNodeToReason.append(fallbackNodeToReason)
+    }
+
+    // For command-like query, e.g., `INSERT INTO TABLE ...`
+    qe.commandExecuted.foreach {
+      case r: CommandResult =>
+        handlePlanWithAQEAndTableCache(r.commandPhysicalPlan, r.commandLogicalPlan, true)
+      case _ => // ignore
+    }
+
+    // For query, e.g., `SELECT * FROM ...`
+    if (qe.executedPlan.find(_.isInstanceOf[CommandResultExec]).isEmpty) {
+      val isMaterialized = qe.executedPlan.find {
+        case a: AdaptiveSparkPlanExec if isFinalAdaptivePlan(a) => true
+        case _ => false
+      }.isDefined
+      handlePlanWithAQEAndTableCache(qe.executedPlan, qe.analyzed, isMaterialized)
+    }
+
+    FallbackSummary(
+      totalNumGlutenNodes,
+      totalNumFallbackNodes,
+      totalPhysicalPlanDescription.toSeq,
+      totalFallbackNodeToReason.toSeq
+    )
+  }
 
+  implicit class DatasetTransformer[T](dateset: Dataset[T]) {
     def fallbackSummary(): FallbackSummary = {
-      collectQueryExecutionFallbackSummary(dateset.queryExecution)
+      collectQueryExecutionFallbackSummary(dateset.sparkSession, dateset.queryExecution)
     }
   }
 }
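
Reviewer note: with this port, collectQueryExecutionFallbackSummary is a public member of
the GlutenImplicits object and takes the SparkSession explicitly, so a fallback summary can
be collected for any QueryExecution without going through the implicit Dataset wrapper. A
minimal usage sketch follows; the session setup, app name, and the query are illustrative
assumptions for the example, not part of this patch:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.GlutenImplicits
    import org.apache.spark.sql.execution.GlutenImplicits._

    object FallbackSummaryExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("fallback-summary-example").getOrCreate()
        val df = spark.sql("SELECT 1 AS id") // illustrative query

        // Implicit style: unchanged for callers, now delegating to the public utility.
        val fromDataset = df.fallbackSummary()

        // Explicit style enabled by this patch: works for any QueryExecution,
        // with the SparkSession passed in instead of being taken from a Dataset.
        val fromQueryExecution =
          GlutenImplicits.collectQueryExecutionFallbackSummary(spark, df.queryExecution)

        println(fromDataset)
        println(fromQueryExecution)
        spark.stop()
      }
    }

Both paths should produce the same summary for the same query; the explicit entry point
simply removes the need to have a Dataset in hand, e.g. when inspecting a command's
QueryExecution directly.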