From c4a4d1d2a900e698d685d752f4bb828f540e73f2 Mon Sep 17 00:00:00 2001 From: Joey Date: Wed, 6 Mar 2024 19:43:12 +0800 Subject: [PATCH] [GLUTEN-4853][CORE] Only trim Alias when its child is semantically equal to resAttr (#4857) --- .../columnar/PullOutPostProject.scala | 10 ++++-- .../clickhouse/ClickHouseTestSettings.scala | 1 + .../utils/velox/VeloxTestSettings.scala | 1 + .../GlutenSQLAggregateFunctionSuite.scala | 36 +++++++++++++++++++ .../clickhouse/ClickHouseTestSettings.scala | 1 + .../utils/velox/VeloxTestSettings.scala | 3 +- .../GlutenSQLAggregateFunctionSuite.scala | 36 +++++++++++++++++++ .../clickhouse/ClickHouseTestSettings.scala | 1 + .../utils/velox/VeloxTestSettings.scala | 3 +- .../GlutenSQLAggregateFunctionSuite.scala | 36 +++++++++++++++++++ 10 files changed, 123 insertions(+), 5 deletions(-) create mode 100644 gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala create mode 100644 gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala create mode 100644 gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/columnar/PullOutPostProject.scala b/gluten-core/src/main/scala/io/glutenproject/extension/columnar/PullOutPostProject.scala index 97590a5ad5a62..0a39ef8196b9f 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/columnar/PullOutPostProject.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/columnar/PullOutPostProject.scala @@ -19,7 +19,7 @@ package io.glutenproject.extension.columnar import io.glutenproject.backendsapi.BackendsApiManager import io.glutenproject.utils.PullOutProjectHelper -import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, Attribute, NamedExpression, WindowExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression, WindowExpression} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} import org.apache.spark.sql.execution.aggregate.BaseAggregateExec @@ -33,7 +33,7 @@ import scala.collection.mutable.ArrayBuffer * the output of Spark, ensuring that the output data of the native plan can match the Spark plan * when a fallback occurs. */ -object PullOutPostProject extends Rule[SparkPlan] with PullOutProjectHelper with AliasHelper { +object PullOutPostProject extends Rule[SparkPlan] with PullOutProjectHelper { private def needsPostProjection(plan: SparkPlan): Boolean = { plan match { @@ -49,7 +49,11 @@ object PullOutPostProject extends Rule[SparkPlan] with PullOutProjectHelper with agg.resultExpressions.size != allAggregateResultAttributes.size || // Compare each item in result expressions and output attributes. Attribute in Alias // should be trimmed before checking. - agg.resultExpressions.map(trimAliases).zip(allAggregateResultAttributes).exists { + agg.resultExpressions.zip(allAggregateResultAttributes).exists { + case (alias: Alias, resAttr) => + // If the child of the Alias is semantically equal to the corresponding Attribute + // in the native result attributes, that Alias can be trimmed. + !alias.child.semanticEquals(resAttr) case (exprAttr: Attribute, resAttr) => // If the result attribute and result expression has different name or type, // post-projection is needed. diff --git a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala index 13769ef6cde78..e872e53d4fa99 100644 --- a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala @@ -1021,6 +1021,7 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("SPARK-23207: Make repartition() generate consistent output") .exclude("Exchange reuse across the whole plan") enableSuite[GlutenReuseExchangeAndSubquerySuite] + enableSuite[GlutenSQLAggregateFunctionSuite] enableSuite[GlutenSQLWindowFunctionSuite] .exclude("window function: partition and order expressions") .exclude("window function: expressions in arguments of a window functions") diff --git a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala index df48a05b2f694..1469606d01495 100644 --- a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala @@ -307,6 +307,7 @@ class VeloxTestSettings extends BackendTestSettings { "pivot with null and aggregate type not supported by PivotFirst returns correct result") enableSuite[GlutenReuseExchangeAndSubquerySuite] enableSuite[GlutenSameResultSuite] + enableSuite[GlutenSQLAggregateFunctionSuite] // spill not supported yet. enableSuite[GlutenSQLWindowFunctionSuite].exclude("test with low buffer spill threshold") enableSuite[GlutenSortSuite] diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala new file mode 100644 index 0000000000000..6046142243aab --- /dev/null +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import io.glutenproject.execution.HashAggregateExecBaseTransformer + +import org.apache.spark.sql.{GlutenSQLTestsTrait, Row} + +class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait { + + testGluten("GLUTEN-4853: The result order is reversed for count and count distinct") { + val query = + """ + |select count(distinct if(sex = 'x', id, null)) as uv, count(if(sex = 'x', id, null)) as pv + |from values (1, 'x'), (1, 'x'), (2, 'y'), (3, 'x'), (3, 'x'), (4, 'y'), (5, 'x') + |AS tab(id, sex) + |""".stripMargin + val df = sql(query) + checkAnswer(df, Seq(Row(3, 5))) + assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 4) + } +} diff --git a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala index 161aa0df78961..9fc37dc0cdb02 100644 --- a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala @@ -1058,6 +1058,7 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("do not replace hash aggregate if there is no group-by column") .excludeGlutenTest("replace partial hash aggregate with sort aggregate") enableSuite[GlutenReuseExchangeAndSubquerySuite] + enableSuite[GlutenSQLAggregateFunctionSuite] enableSuite[GlutenSQLWindowFunctionSuite] .exclude("window function: partition and order expressions") .exclude("window function: expressions in arguments of a window functions") diff --git a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala index c369587ab6107..355fa92012bcd 100644 --- a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{GlutenAnsiCastSuiteWithAnsiModeOff, GlutenAnsiCastSuiteWithAnsiModeOn, GlutenArithmeticExpressionSuite, GlutenBitwiseExpressionsSuite, GlutenCastSuite, GlutenCastSuiteWithAnsiModeOn, GlutenCollectionExpressionsSuite, GlutenComplexTypeSuite, GlutenConditionalExpressionSuite, GlutenDateExpressionsSuite, GlutenDecimalExpressionSuite, GlutenHashExpressionsSuite, GlutenIntervalExpressionsSuite, GlutenLiteralExpressionSuite, GlutenMathExpressionsSuite, GlutenMiscExpressionsSuite, GlutenNondeterministicSuite, GlutenNullExpressionsSuite, GlutenPredicateSuite, GlutenRandomSuite, GlutenRegexpExpressionsSuite, GlutenSortOrderExpressionsSuite, GlutenStringExpressionsSuite, GlutenTryCastSuite} import org.apache.spark.sql.connector.{GlutenDataSourceV2DataFrameSessionCatalogSuite, GlutenDataSourceV2DataFrameSuite, GlutenDataSourceV2FunctionSuite, GlutenDataSourceV2SQLSessionCatalogSuite, GlutenDataSourceV2SQLSuite, GlutenDataSourceV2Suite, GlutenDeleteFromTableSuite, GlutenFileDataSourceV2FallBackSuite, GlutenKeyGroupedPartitioningSuite, GlutenLocalScanSuite, GlutenMetadataColumnSuite, GlutenSupportsCatalogOptionsSuite, GlutenTableCapabilityCheckSuite, GlutenWriteDistributionAndOrderingSuite} import org.apache.spark.sql.errors.{GlutenQueryCompilationErrorsDSv2Suite, GlutenQueryCompilationErrorsSuite, GlutenQueryExecutionErrorsSuite, GlutenQueryParsingErrorsSuite} -import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite} +import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, GlutenSQLAggregateFunctionSuite, GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite} import org.apache.spark.sql.execution.adaptive.velox.VeloxAdaptiveQueryExecSuite import org.apache.spark.sql.execution.datasources.{GlutenBucketingUtilsSuite, GlutenCSVReadSchemaSuite, GlutenDataSourceStrategySuite, GlutenDataSourceSuite, GlutenFileFormatWriterSuite, GlutenFileIndexSuite, GlutenFileMetadataStructSuite, GlutenFileSourceStrategySuite, GlutenHadoopFileLinesReaderSuite, GlutenHeaderCSVReadSchemaSuite, GlutenJsonReadSchemaSuite, GlutenMergedOrcReadSchemaSuite, GlutenMergedParquetReadSchemaSuite, GlutenOrcCodecSuite, GlutenOrcReadSchemaSuite, GlutenOrcV1AggregatePushDownSuite, GlutenOrcV2AggregatePushDownSuite, GlutenParquetCodecSuite, GlutenParquetReadSchemaSuite, GlutenParquetV1AggregatePushDownSuite, GlutenParquetV2AggregatePushDownSuite, GlutenPathFilterStrategySuite, GlutenPathFilterSuite, GlutenPruneFileSourcePartitionsSuite, GlutenVectorizedOrcReadSchemaSuite, GlutenVectorizedParquetReadSchemaSuite, GlutenWriterColumnarRulesSuite} import org.apache.spark.sql.execution.datasources.binaryfile.GlutenBinaryFileFormatSuite @@ -856,6 +856,7 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenReuseExchangeAndSubquerySuite] enableSuite[GlutenSameResultSuite] enableSuite[GlutenSortSuite] + enableSuite[GlutenSQLAggregateFunctionSuite] // spill not supported yet. enableSuite[GlutenSQLWindowFunctionSuite].exclude("test with low buffer spill threshold") enableSuite[GlutenTakeOrderedAndProjectSuite] diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala new file mode 100644 index 0000000000000..6046142243aab --- /dev/null +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import io.glutenproject.execution.HashAggregateExecBaseTransformer + +import org.apache.spark.sql.{GlutenSQLTestsTrait, Row} + +class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait { + + testGluten("GLUTEN-4853: The result order is reversed for count and count distinct") { + val query = + """ + |select count(distinct if(sex = 'x', id, null)) as uv, count(if(sex = 'x', id, null)) as pv + |from values (1, 'x'), (1, 'x'), (2, 'y'), (3, 'x'), (3, 'x'), (4, 'y'), (5, 'x') + |AS tab(id, sex) + |""".stripMargin + val df = sql(query) + checkAnswer(df, Seq(Row(3, 5))) + assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 4) + } +} diff --git a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala index 1c18bda43b48a..01130181206b1 100644 --- a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala @@ -837,6 +837,7 @@ class ClickHouseTestSettings extends BackendTestSettings { .excludeGlutenTest("replace partial hash aggregate with sort aggregate") .excludeGlutenTest("replace partial and final hash aggregate together with sort aggregate") enableSuite[GlutenReuseExchangeAndSubquerySuite] + enableSuite[GlutenSQLAggregateFunctionSuite] enableSuite[GlutenSQLWindowFunctionSuite] .exclude("window function: partition and order expressions") .exclude("window function: expressions in arguments of a window functions") diff --git a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala index aeff1358869fc..b827bd2ff8018 100644 --- a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{GlutenArithmeticExpressionSuite, GlutenBitwiseExpressionsSuite, GlutenCastSuite, GlutenCollectionExpressionsSuite, GlutenComplexTypeSuite, GlutenConditionalExpressionSuite, GlutenDateExpressionsSuite, GlutenDecimalExpressionSuite, GlutenExpressionMappingSuite, GlutenHashExpressionsSuite, GlutenIntervalExpressionsSuite, GlutenLiteralExpressionSuite, GlutenMathExpressionsSuite, GlutenMiscExpressionsSuite, GlutenNondeterministicSuite, GlutenNullExpressionsSuite, GlutenPredicateSuite, GlutenRandomSuite, GlutenRegexpExpressionsSuite, GlutenSortOrderExpressionsSuite, GlutenStringExpressionsSuite} import org.apache.spark.sql.connector.{GlutenDataSourceV2DataFrameSessionCatalogSuite, GlutenDataSourceV2DataFrameSuite, GlutenDataSourceV2FunctionSuite, GlutenDataSourceV2SQLSessionCatalogSuite, GlutenDataSourceV2SQLSuiteV1Filter, GlutenDataSourceV2SQLSuiteV2Filter, GlutenDataSourceV2Suite, GlutenDeleteFromTableSuite, GlutenFileDataSourceV2FallBackSuite, GlutenKeyGroupedPartitioningSuite, GlutenLocalScanSuite, GlutenMetadataColumnSuite, GlutenSupportsCatalogOptionsSuite, GlutenTableCapabilityCheckSuite, GlutenWriteDistributionAndOrderingSuite} import org.apache.spark.sql.errors.{GlutenQueryCompilationErrorsDSv2Suite, GlutenQueryCompilationErrorsSuite, GlutenQueryExecutionErrorsSuite, GlutenQueryParsingErrorsSuite} -import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite} +import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, GlutenSQLAggregateFunctionSuite, GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite} import org.apache.spark.sql.execution.adaptive.velox.VeloxAdaptiveQueryExecSuite import org.apache.spark.sql.execution.datasources.{GlutenBucketingUtilsSuite, GlutenCSVReadSchemaSuite, GlutenDataSourceStrategySuite, GlutenDataSourceSuite, GlutenFileFormatWriterSuite, GlutenFileIndexSuite, GlutenFileMetadataStructSuite, GlutenFileSourceStrategySuite, GlutenHadoopFileLinesReaderSuite, GlutenHeaderCSVReadSchemaSuite, GlutenJsonReadSchemaSuite, GlutenMergedOrcReadSchemaSuite, GlutenMergedParquetReadSchemaSuite, GlutenOrcCodecSuite, GlutenOrcReadSchemaSuite, GlutenOrcV1AggregatePushDownSuite, GlutenOrcV2AggregatePushDownSuite, GlutenParquetCodecSuite, GlutenParquetReadSchemaSuite, GlutenParquetV1AggregatePushDownSuite, GlutenParquetV2AggregatePushDownSuite, GlutenPathFilterStrategySuite, GlutenPathFilterSuite, GlutenPruneFileSourcePartitionsSuite, GlutenV1WriteCommandSuite, GlutenVectorizedOrcReadSchemaSuite, GlutenVectorizedParquetReadSchemaSuite} import org.apache.spark.sql.execution.datasources.binaryfile.GlutenBinaryFileFormatSuite @@ -848,6 +848,7 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenReuseExchangeAndSubquerySuite] enableSuite[GlutenSameResultSuite] enableSuite[GlutenSortSuite] + enableSuite[GlutenSQLAggregateFunctionSuite] // spill not supported yet. enableSuite[GlutenSQLWindowFunctionSuite].exclude("test with low buffer spill threshold") enableSuite[GlutenTakeOrderedAndProjectSuite] diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala new file mode 100644 index 0000000000000..6046142243aab --- /dev/null +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLAggregateFunctionSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import io.glutenproject.execution.HashAggregateExecBaseTransformer + +import org.apache.spark.sql.{GlutenSQLTestsTrait, Row} + +class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait { + + testGluten("GLUTEN-4853: The result order is reversed for count and count distinct") { + val query = + """ + |select count(distinct if(sex = 'x', id, null)) as uv, count(if(sex = 'x', id, null)) as pv + |from values (1, 'x'), (1, 'x'), (2, 'y'), (3, 'x'), (3, 'x'), (4, 'y'), (5, 'x') + |AS tab(id, sex) + |""".stripMargin + val df = sql(query) + checkAnswer(df, Seq(Row(3, 5))) + assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 4) + } +}