From 3766c0e97ffc043c8052d25077791ed263bbc3d7 Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Fri, 27 Aug 2021 13:52:59 +0800 Subject: [PATCH] refine Signed-off-by: sperlingxx --- .../shims/spark311cdh/Spark311CDHShims.scala | 1 - .../GpuColumnarToRowTransitionExec.scala | 7 +++++-- .../com/nvidia/spark/rapids/RapidsMeta.scala | 19 +++++++++++++------ 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/shims/spark311cdh/src/main/scala/com/nvidia/spark/rapids/shims/spark311cdh/Spark311CDHShims.scala b/shims/spark311cdh/src/main/scala/com/nvidia/spark/rapids/shims/spark311cdh/Spark311CDHShims.scala index 40c2c25511f..876d568d622 100644 --- a/shims/spark311cdh/src/main/scala/com/nvidia/spark/rapids/shims/spark311cdh/Spark311CDHShims.scala +++ b/shims/spark311cdh/src/main/scala/com/nvidia/spark/rapids/shims/spark311cdh/Spark311CDHShims.scala @@ -22,7 +22,6 @@ import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.spark311cdh.RapidsShuffleManager import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog} -import org.apache.spark.sql.catalyst.expressions.NamedExpression import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} diff --git a/shims/spark313/src/main/scala/org/apache/spark/sql/rapids/shims/spark313/GpuColumnarToRowTransitionExec.scala b/shims/spark313/src/main/scala/org/apache/spark/sql/rapids/shims/spark313/GpuColumnarToRowTransitionExec.scala index d5be5e14c25..15792ebc72c 100644 --- a/shims/spark313/src/main/scala/org/apache/spark/sql/rapids/shims/spark313/GpuColumnarToRowTransitionExec.scala +++ b/shims/spark313/src/main/scala/org/apache/spark/sql/rapids/shims/spark313/GpuColumnarToRowTransitionExec.scala @@ -18,8 +18,11 @@ package org.apache.spark.sql.rapids.shims.spark313 import com.nvidia.spark.rapids.GpuColumnarToRowExecParent +import org.apache.spark.sql.catalyst.expressions.NamedExpression import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan} case class GpuColumnarToRowTransitionExec(child: SparkPlan, - override val exportColumnarRdd: Boolean = false) - extends GpuColumnarToRowExecParent(child, exportColumnarRdd) with ColumnarToRowTransition + override val exportColumnarRdd: Boolean = false, + override val postProjection: Seq[NamedExpression] = Seq.empty) + extends GpuColumnarToRowExecParent(child, exportColumnarRdd, postProjection) + with ColumnarToRowTransition diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala index 8b274ee69a0..27063c8ee38 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala @@ -1123,19 +1123,26 @@ abstract class TypedImperativeAggExprMeta[INPUT <: TypedImperativeAggregate[_]]( def aggBufferAttribute: AttributeReference /** - * Returns a buffer converter who can generate a Expression to transform the aggregation - * buffer of wrapped function from CPU format to GPU format. + * Returns a buffer converter who can generate a Expression to transform the aggregation buffer + * of wrapped function from CPU format to GPU format. The conversion occurs on the CPU, so the + * generated expression should be a CPU Expression executed by row. */ def createCpuToGpuBufferConverter(): CpuToGpuAggregateBufferConverter = - throw new NotImplementedError("") + throw new NotImplementedError("The method should be implemented by specific functions") /** - * Returns a buffer converter who can generate a Expression to transform the aggregation - * buffer of wrapped function from GPU format to CPU format. + * Returns a buffer converter who can generate a Expression to transform the aggregation buffer + * of wrapped function from GPU format to CPU format. The conversion occurs on the CPU, so the + * generated expression should be a CPU Expression executed by row. */ def createGpuToCpuBufferConverter(): GpuToCpuAggregateBufferConverter = - throw new NotImplementedError("") + throw new NotImplementedError("The method should be implemented by specific functions") + /** + * Whether buffers of current Aggregate is able to be converted from CPU to GPU format and + * reversely in runtime. If true, it assumes both createCpuToGpuBufferConverter and + * createGpuToCpuBufferConverter are implemented. + */ val supportBufferConversion: Boolean = false }