diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/collectionOperations.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/collectionOperations.scala index 1e5dda6024f..139ca00563a 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/collectionOperations.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/collectionOperations.scala @@ -21,7 +21,7 @@ import java.util.Optional import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf -import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, DType, GroupByAggregation, GroupByOptions, Scalar, SegmentedReductionAggregation, Table} +import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, DType, Scalar, SegmentedReductionAggregation, Table} import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.ArrayIndexUtils.firstIndexAndNumElementUnchecked import com.nvidia.spark.rapids.BoolUtils.isAllValidTrue @@ -395,56 +395,6 @@ case class GpuSortArray(base: Expression, ascendingOrder: Expression) } } -// TODO switch over to array aggregations once -// https://github.com/rapidsai/cudf/issues/10417 is done -object SlowGpuArrayAgg extends Arm{ - def reallySlow(input: GpuColumnVector, agg: GroupByAggregation): cudf.ColumnVector = { - val baseInput = input.getBase - val inputTab = withResource(Scalar.fromInt(0)) { zero => - withResource(cudf.ColumnVector.sequence(zero, input.getRowCount.toInt)) { rowNums => - new cudf.Table(rowNums, baseInput) - } - } - - val explodedTab = withResource(inputTab) { inputTab => - inputTab.explodeOuter(1) - } - - val retTab = withResource(explodedTab) { explodedTab => - explodedTab.groupBy(GroupByOptions.builder() - .withKeysSorted(true) - .withIgnoreNullKeys(true) - .build(), 0) - .aggregate(agg.onColumn(1)) - } - - withResource(retTab) { retTab => - assert(retTab.getRowCount == baseInput.getRowCount) - retTab.getColumn(1).incRefCount() - } - } - - def bitCastDecimal( - input: GpuColumnVector, - agg: SegmentedReductionAggregation, - tmpType: DType, - resultType: DType): cudf.ColumnVector = { - val base = input.getBase - val tmpResult = withResource(base.getChildColumnView(0)) { dataCol => - withResource(dataCol.bitCastTo(tmpType)) { castDataCol => - withResource(base.replaceListChild(castDataCol)) { bitCastInput => - bitCastInput.listReduce(agg) - } - } - } - withResource(tmpResult) { tmpResult => - withResource(tmpResult.bitCastTo(resultType)) { bitCastResult => - bitCastResult.copyToColumnVector() - } - } - } -} - case class GpuArrayMin(child: Expression) extends GpuUnaryExpression with ImplicitCastInputTypes { override def nullable: Boolean = true @@ -459,22 +409,7 @@ case class GpuArrayMin(child: Expression) extends GpuUnaryExpression with Implic override def prettyName: String = "array_min" override protected def doColumnar(input: GpuColumnVector): cudf.ColumnVector = - dataType match { - case StringType => - SlowGpuArrayAgg.reallySlow(input, GroupByAggregation.min()) - case dt: DecimalType => - if (dt.precision > Decimal.MAX_LONG_DIGITS) { - SlowGpuArrayAgg.reallySlow(input, GroupByAggregation.min()) - } else if (dt.precision > Decimal.MAX_INT_DIGITS) { - SlowGpuArrayAgg.bitCastDecimal(input, SegmentedReductionAggregation.min(), - DType.INT64, GpuColumnVector.getNonNestedRapidsType(dataType)) - } else { - SlowGpuArrayAgg.bitCastDecimal(input, SegmentedReductionAggregation.min(), - DType.INT32, GpuColumnVector.getNonNestedRapidsType(dataType)) - } - case _ => - input.getBase.listReduce(SegmentedReductionAggregation.min()) - } + input.getBase.listReduce(SegmentedReductionAggregation.min()) } case class GpuArrayMax(child: Expression) extends GpuUnaryExpression with ImplicitCastInputTypes { @@ -491,22 +426,7 @@ case class GpuArrayMax(child: Expression) extends GpuUnaryExpression with Implic override def prettyName: String = "array_max" override protected def doColumnar(input: GpuColumnVector): cudf.ColumnVector = - dataType match { - case StringType => - SlowGpuArrayAgg.reallySlow(input, GroupByAggregation.max()) - case dt: DecimalType => - if (dt.precision > Decimal.MAX_LONG_DIGITS) { - SlowGpuArrayAgg.reallySlow(input, GroupByAggregation.max()) - } else if (dt.precision > Decimal.MAX_INT_DIGITS) { - SlowGpuArrayAgg.bitCastDecimal(input, SegmentedReductionAggregation.max(), - DType.INT64, GpuColumnVector.getNonNestedRapidsType(dataType)) - } else { - SlowGpuArrayAgg.bitCastDecimal(input, SegmentedReductionAggregation.max(), - DType.INT32, GpuColumnVector.getNonNestedRapidsType(dataType)) - } - case _ => - input.getBase.listReduce(SegmentedReductionAggregation.max()) - } + input.getBase.listReduce(SegmentedReductionAggregation.max()) } case class GpuArrayRepeat(left: Expression, right: Expression) extends GpuBinaryExpression {