From 353c45ad10649c699726709415478f7c54bd4b4a Mon Sep 17 00:00:00 2001 From: Yuan Date: Wed, 18 Oct 2023 15:07:55 +0800 Subject: [PATCH] [CORE] fix CoalesceExec (#3372) This operator should be removed out of WholeStageTransform Signed-off-by: Yuan Zhou --- .../execution/CoalesceExecTransformer.scala | 36 ++++++------------- 1 file changed, 10 insertions(+), 26 deletions(-) diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/CoalesceExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/CoalesceExecTransformer.scala index 1e488af1ba62..5ffdaf19d3e6 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/CoalesceExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/CoalesceExecTransformer.scala @@ -16,9 +16,7 @@ */ package io.glutenproject.execution -import io.glutenproject.extension.ValidationResult -import io.glutenproject.metrics.{MetricsUpdater, NoopMetricsUpdater} -import io.glutenproject.substrait.SubstraitContext +import io.glutenproject.extension.{GlutenPlan, ValidationResult} import org.apache.spark.{Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD @@ -30,7 +28,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch case class CoalesceExecTransformer(numPartitions: Int, child: SparkPlan) extends UnaryExecNode - with TransformSupport { + with GlutenPlan { override def supportsColumnar: Boolean = true @@ -40,40 +38,26 @@ case class CoalesceExecTransformer(numPartitions: Int, child: SparkPlan) if (numPartitions == 1) SinglePartition else UnknownPartitioning(numPartitions) } - override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = { - throw new UnsupportedOperationException(s"This operator doesn't support inputRDDs.") - } - - override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = { - throw new UnsupportedOperationException(s"This operator doesn't support getBuildPlans.") - } - - override def getStreamedLeafPlan: SparkPlan = child match { - case c: TransformSupport => - c.getStreamedLeafPlan - case _ => - this - } - override protected def doValidateInternal(): ValidationResult = - ValidationResult.notOk(s"$nodeName has not been supported") - - override def doTransform(context: SubstraitContext): TransformContext = { - throw new UnsupportedOperationException(s"$nodeName doesn't support doTransform.") - } + ValidationResult.ok override protected def doExecute(): RDD[InternalRow] = { throw new UnsupportedOperationException() } override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { - throw new UnsupportedOperationException(s"$nodeName doesn't support doExecuteColumnar.") + if (numPartitions == 1 && child.executeColumnar().getNumPartitions < 1) { + // Make sure we don't output an RDD with 0 partitions, when claiming that we have a + // `SinglePartition`. + new CoalesceExecTransformer.EmptyRDDWithPartitions(sparkContext, numPartitions) + } else { + child.executeColumnar().coalesce(numPartitions, shuffle = false) + } } override protected def withNewChildInternal(newChild: SparkPlan): CoalesceExecTransformer = copy(child = newChild) - override def metricsUpdater(): MetricsUpdater = new NoopMetricsUpdater } object CoalesceExecTransformer {