Skip to content

Commit

Permalink
[GLUTEN-4853][CORE] Only trim Alias when its child is semantically eq…
Browse files Browse the repository at this point in the history
…ual to resAttr (apache#4857)
  • Loading branch information
liujiayi771 authored and taiyang-li committed Mar 25, 2024
1 parent b8d2345 commit c4a4d1d
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package io.glutenproject.extension.columnar
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.utils.PullOutProjectHelper

import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, Attribute, NamedExpression, WindowExpression}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression, WindowExpression}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
Expand All @@ -33,7 +33,7 @@ import scala.collection.mutable.ArrayBuffer
* the output of Spark, ensuring that the output data of the native plan can match the Spark plan
* when a fallback occurs.
*/
object PullOutPostProject extends Rule[SparkPlan] with PullOutProjectHelper with AliasHelper {
object PullOutPostProject extends Rule[SparkPlan] with PullOutProjectHelper {

private def needsPostProjection(plan: SparkPlan): Boolean = {
plan match {
Expand All @@ -49,7 +49,11 @@ object PullOutPostProject extends Rule[SparkPlan] with PullOutProjectHelper with
agg.resultExpressions.size != allAggregateResultAttributes.size ||
// Compare each item in result expressions and output attributes. Attribute in Alias
// should be trimmed before checking.
agg.resultExpressions.map(trimAliases).zip(allAggregateResultAttributes).exists {
agg.resultExpressions.zip(allAggregateResultAttributes).exists {
case (alias: Alias, resAttr) =>
// If the child of the Alias is semantically equal to the corresponding Attribute
// in the native result attributes, that Alias can be trimmed.
!alias.child.semanticEquals(resAttr)
case (exprAttr: Attribute, resAttr) =>
// If the result attribute and result expression has different name or type,
// post-projection is needed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("SPARK-23207: Make repartition() generate consistent output")
.exclude("Exchange reuse across the whole plan")
enableSuite[GlutenReuseExchangeAndSubquerySuite]
enableSuite[GlutenSQLAggregateFunctionSuite]
enableSuite[GlutenSQLWindowFunctionSuite]
.exclude("window function: partition and order expressions")
.exclude("window function: expressions in arguments of a window functions")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ class VeloxTestSettings extends BackendTestSettings {
"pivot with null and aggregate type not supported by PivotFirst returns correct result")
enableSuite[GlutenReuseExchangeAndSubquerySuite]
enableSuite[GlutenSameResultSuite]
enableSuite[GlutenSQLAggregateFunctionSuite]
// spill not supported yet.
enableSuite[GlutenSQLWindowFunctionSuite].exclude("test with low buffer spill threshold")
enableSuite[GlutenSortSuite]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import io.glutenproject.execution.HashAggregateExecBaseTransformer

import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}

class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait {

testGluten("GLUTEN-4853: The result order is reversed for count and count distinct") {
val query =
"""
|select count(distinct if(sex = 'x', id, null)) as uv, count(if(sex = 'x', id, null)) as pv
|from values (1, 'x'), (1, 'x'), (2, 'y'), (3, 'x'), (3, 'x'), (4, 'y'), (5, 'x')
|AS tab(id, sex)
|""".stripMargin
val df = sql(query)
checkAnswer(df, Seq(Row(3, 5)))
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 4)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("do not replace hash aggregate if there is no group-by column")
.excludeGlutenTest("replace partial hash aggregate with sort aggregate")
enableSuite[GlutenReuseExchangeAndSubquerySuite]
enableSuite[GlutenSQLAggregateFunctionSuite]
enableSuite[GlutenSQLWindowFunctionSuite]
.exclude("window function: partition and order expressions")
.exclude("window function: expressions in arguments of a window functions")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{GlutenAnsiCastSuiteWithAnsiModeOff, GlutenAnsiCastSuiteWithAnsiModeOn, GlutenArithmeticExpressionSuite, GlutenBitwiseExpressionsSuite, GlutenCastSuite, GlutenCastSuiteWithAnsiModeOn, GlutenCollectionExpressionsSuite, GlutenComplexTypeSuite, GlutenConditionalExpressionSuite, GlutenDateExpressionsSuite, GlutenDecimalExpressionSuite, GlutenHashExpressionsSuite, GlutenIntervalExpressionsSuite, GlutenLiteralExpressionSuite, GlutenMathExpressionsSuite, GlutenMiscExpressionsSuite, GlutenNondeterministicSuite, GlutenNullExpressionsSuite, GlutenPredicateSuite, GlutenRandomSuite, GlutenRegexpExpressionsSuite, GlutenSortOrderExpressionsSuite, GlutenStringExpressionsSuite, GlutenTryCastSuite}
import org.apache.spark.sql.connector.{GlutenDataSourceV2DataFrameSessionCatalogSuite, GlutenDataSourceV2DataFrameSuite, GlutenDataSourceV2FunctionSuite, GlutenDataSourceV2SQLSessionCatalogSuite, GlutenDataSourceV2SQLSuite, GlutenDataSourceV2Suite, GlutenDeleteFromTableSuite, GlutenFileDataSourceV2FallBackSuite, GlutenKeyGroupedPartitioningSuite, GlutenLocalScanSuite, GlutenMetadataColumnSuite, GlutenSupportsCatalogOptionsSuite, GlutenTableCapabilityCheckSuite, GlutenWriteDistributionAndOrderingSuite}
import org.apache.spark.sql.errors.{GlutenQueryCompilationErrorsDSv2Suite, GlutenQueryCompilationErrorsSuite, GlutenQueryExecutionErrorsSuite, GlutenQueryParsingErrorsSuite}
import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite}
import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, GlutenSQLAggregateFunctionSuite, GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite}
import org.apache.spark.sql.execution.adaptive.velox.VeloxAdaptiveQueryExecSuite
import org.apache.spark.sql.execution.datasources.{GlutenBucketingUtilsSuite, GlutenCSVReadSchemaSuite, GlutenDataSourceStrategySuite, GlutenDataSourceSuite, GlutenFileFormatWriterSuite, GlutenFileIndexSuite, GlutenFileMetadataStructSuite, GlutenFileSourceStrategySuite, GlutenHadoopFileLinesReaderSuite, GlutenHeaderCSVReadSchemaSuite, GlutenJsonReadSchemaSuite, GlutenMergedOrcReadSchemaSuite, GlutenMergedParquetReadSchemaSuite, GlutenOrcCodecSuite, GlutenOrcReadSchemaSuite, GlutenOrcV1AggregatePushDownSuite, GlutenOrcV2AggregatePushDownSuite, GlutenParquetCodecSuite, GlutenParquetReadSchemaSuite, GlutenParquetV1AggregatePushDownSuite, GlutenParquetV2AggregatePushDownSuite, GlutenPathFilterStrategySuite, GlutenPathFilterSuite, GlutenPruneFileSourcePartitionsSuite, GlutenVectorizedOrcReadSchemaSuite, GlutenVectorizedParquetReadSchemaSuite, GlutenWriterColumnarRulesSuite}
import org.apache.spark.sql.execution.datasources.binaryfile.GlutenBinaryFileFormatSuite
Expand Down Expand Up @@ -856,6 +856,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenReuseExchangeAndSubquerySuite]
enableSuite[GlutenSameResultSuite]
enableSuite[GlutenSortSuite]
enableSuite[GlutenSQLAggregateFunctionSuite]
// spill not supported yet.
enableSuite[GlutenSQLWindowFunctionSuite].exclude("test with low buffer spill threshold")
enableSuite[GlutenTakeOrderedAndProjectSuite]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import io.glutenproject.execution.HashAggregateExecBaseTransformer

import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}

class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait {

testGluten("GLUTEN-4853: The result order is reversed for count and count distinct") {
val query =
"""
|select count(distinct if(sex = 'x', id, null)) as uv, count(if(sex = 'x', id, null)) as pv
|from values (1, 'x'), (1, 'x'), (2, 'y'), (3, 'x'), (3, 'x'), (4, 'y'), (5, 'x')
|AS tab(id, sex)
|""".stripMargin
val df = sql(query)
checkAnswer(df, Seq(Row(3, 5)))
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 4)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.excludeGlutenTest("replace partial hash aggregate with sort aggregate")
.excludeGlutenTest("replace partial and final hash aggregate together with sort aggregate")
enableSuite[GlutenReuseExchangeAndSubquerySuite]
enableSuite[GlutenSQLAggregateFunctionSuite]
enableSuite[GlutenSQLWindowFunctionSuite]
.exclude("window function: partition and order expressions")
.exclude("window function: expressions in arguments of a window functions")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{GlutenArithmeticExpressionSuite, GlutenBitwiseExpressionsSuite, GlutenCastSuite, GlutenCollectionExpressionsSuite, GlutenComplexTypeSuite, GlutenConditionalExpressionSuite, GlutenDateExpressionsSuite, GlutenDecimalExpressionSuite, GlutenExpressionMappingSuite, GlutenHashExpressionsSuite, GlutenIntervalExpressionsSuite, GlutenLiteralExpressionSuite, GlutenMathExpressionsSuite, GlutenMiscExpressionsSuite, GlutenNondeterministicSuite, GlutenNullExpressionsSuite, GlutenPredicateSuite, GlutenRandomSuite, GlutenRegexpExpressionsSuite, GlutenSortOrderExpressionsSuite, GlutenStringExpressionsSuite}
import org.apache.spark.sql.connector.{GlutenDataSourceV2DataFrameSessionCatalogSuite, GlutenDataSourceV2DataFrameSuite, GlutenDataSourceV2FunctionSuite, GlutenDataSourceV2SQLSessionCatalogSuite, GlutenDataSourceV2SQLSuiteV1Filter, GlutenDataSourceV2SQLSuiteV2Filter, GlutenDataSourceV2Suite, GlutenDeleteFromTableSuite, GlutenFileDataSourceV2FallBackSuite, GlutenKeyGroupedPartitioningSuite, GlutenLocalScanSuite, GlutenMetadataColumnSuite, GlutenSupportsCatalogOptionsSuite, GlutenTableCapabilityCheckSuite, GlutenWriteDistributionAndOrderingSuite}
import org.apache.spark.sql.errors.{GlutenQueryCompilationErrorsDSv2Suite, GlutenQueryCompilationErrorsSuite, GlutenQueryExecutionErrorsSuite, GlutenQueryParsingErrorsSuite}
import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite}
import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, GlutenSQLAggregateFunctionSuite, GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite}
import org.apache.spark.sql.execution.adaptive.velox.VeloxAdaptiveQueryExecSuite
import org.apache.spark.sql.execution.datasources.{GlutenBucketingUtilsSuite, GlutenCSVReadSchemaSuite, GlutenDataSourceStrategySuite, GlutenDataSourceSuite, GlutenFileFormatWriterSuite, GlutenFileIndexSuite, GlutenFileMetadataStructSuite, GlutenFileSourceStrategySuite, GlutenHadoopFileLinesReaderSuite, GlutenHeaderCSVReadSchemaSuite, GlutenJsonReadSchemaSuite, GlutenMergedOrcReadSchemaSuite, GlutenMergedParquetReadSchemaSuite, GlutenOrcCodecSuite, GlutenOrcReadSchemaSuite, GlutenOrcV1AggregatePushDownSuite, GlutenOrcV2AggregatePushDownSuite, GlutenParquetCodecSuite, GlutenParquetReadSchemaSuite, GlutenParquetV1AggregatePushDownSuite, GlutenParquetV2AggregatePushDownSuite, GlutenPathFilterStrategySuite, GlutenPathFilterSuite, GlutenPruneFileSourcePartitionsSuite, GlutenV1WriteCommandSuite, GlutenVectorizedOrcReadSchemaSuite, GlutenVectorizedParquetReadSchemaSuite}
import org.apache.spark.sql.execution.datasources.binaryfile.GlutenBinaryFileFormatSuite
Expand Down Expand Up @@ -848,6 +848,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenReuseExchangeAndSubquerySuite]
enableSuite[GlutenSameResultSuite]
enableSuite[GlutenSortSuite]
enableSuite[GlutenSQLAggregateFunctionSuite]
// spill not supported yet.
enableSuite[GlutenSQLWindowFunctionSuite].exclude("test with low buffer spill threshold")
enableSuite[GlutenTakeOrderedAndProjectSuite]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import io.glutenproject.execution.HashAggregateExecBaseTransformer

import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}

class GlutenSQLAggregateFunctionSuite extends GlutenSQLTestsTrait {

testGluten("GLUTEN-4853: The result order is reversed for count and count distinct") {
val query =
"""
|select count(distinct if(sex = 'x', id, null)) as uv, count(if(sex = 'x', id, null)) as pv
|from values (1, 'x'), (1, 'x'), (2, 'y'), (3, 'x'), (3, 'x'), (4, 'y'), (5, 'x')
|AS tab(id, sex)
|""".stripMargin
val df = sql(query)
checkAnswer(df, Seq(Row(3, 5)))
assert(getExecutedPlan(df).count(_.isInstanceOf[HashAggregateExecBaseTransformer]) == 4)
}
}

0 comments on commit c4a4d1d

Please sign in to comment.