[CORE] fix CoalesceExec (#3372)
This operator should be removed from WholeStageTransform.

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
zhouyuan authored Oct 18, 2023
1 parent 1ab906a commit 353c45a
Showing 1 changed file with 10 additions and 26 deletions.
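
In practice this node is reached from an ordinary DataFrame coalesce(n) call. As a hedged illustration of the edge case the new doExecuteColumnar in the diff below guards against (a single-partition coalesce over a child that plans with zero partitions), a repro might look like the following sketch; the SparkSession setup and the zero-partition behavior of an empty range are assumptions, and the exact physical plan depends on Gluten's rules and configuration.

import org.apache.spark.sql.SparkSession

// Sketch only: with an empty input the child RDD can end up with zero
// partitions, yet a node advertising SinglePartition must still expose one
// (empty) partition: the EmptyRDDWithPartitions branch in the diff below.
val spark = SparkSession.builder().master("local[2]").getOrCreate()
val empty = spark.range(0).toDF("id") // zero rows; may plan with zero partitions
empty.coalesce(1).collect()           // expected: empty result, no failure
spark.stop()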
@@ -16,9 +16,7 @@
  */
 package io.glutenproject.execution
 
-import io.glutenproject.extension.ValidationResult
-import io.glutenproject.metrics.{MetricsUpdater, NoopMetricsUpdater}
-import io.glutenproject.substrait.SubstraitContext
+import io.glutenproject.extension.{GlutenPlan, ValidationResult}
 
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
@@ -30,7 +28,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class CoalesceExecTransformer(numPartitions: Int, child: SparkPlan)
   extends UnaryExecNode
-  with TransformSupport {
+  with GlutenPlan {
 
   override def supportsColumnar: Boolean = true
 
@@ -40,40 +38,26 @@ case class CoalesceExecTransformer(numPartitions: Int, child: SparkPlan)
     if (numPartitions == 1) SinglePartition else UnknownPartitioning(numPartitions)
   }
 
-  override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = {
-    throw new UnsupportedOperationException(s"This operator doesn't support inputRDDs.")
-  }
-
-  override def getBuildPlans: Seq[(SparkPlan, SparkPlan)] = {
-    throw new UnsupportedOperationException(s"This operator doesn't support getBuildPlans.")
-  }
-
-  override def getStreamedLeafPlan: SparkPlan = child match {
-    case c: TransformSupport =>
-      c.getStreamedLeafPlan
-    case _ =>
-      this
-  }
-
   override protected def doValidateInternal(): ValidationResult =
-    ValidationResult.notOk(s"$nodeName has not been supported")
-
-  override def doTransform(context: SubstraitContext): TransformContext = {
-    throw new UnsupportedOperationException(s"$nodeName doesn't support doTransform.")
-  }
+    ValidationResult.ok
 
   override protected def doExecute(): RDD[InternalRow] = {
     throw new UnsupportedOperationException()
   }
 
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    throw new UnsupportedOperationException(s"$nodeName doesn't support doExecuteColumnar.")
+    if (numPartitions == 1 && child.executeColumnar().getNumPartitions < 1) {
+      // Make sure we don't output an RDD with 0 partitions, when claiming that we have a
+      // `SinglePartition`.
+      new CoalesceExecTransformer.EmptyRDDWithPartitions(sparkContext, numPartitions)
+    } else {
+      child.executeColumnar().coalesce(numPartitions, shuffle = false)
+    }
   }
 
   override protected def withNewChildInternal(newChild: SparkPlan): CoalesceExecTransformer =
     copy(child = newChild)
 
-  override def metricsUpdater(): MetricsUpdater = new NoopMetricsUpdater
 }
 
 object CoalesceExecTransformer {
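The body of the CoalesceExecTransformer companion object is collapsed in this view. For context, a minimal sketch of what an EmptyRDDWithPartitions helper typically looks like, modeled on the equivalent helper inside Spark's own CoalesceExec companion object, is shown below; the actual definition in this commit may differ, and everything here should be read as an assumption rather than the committed code.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.vectorized.ColumnarBatch

// Sketch only: an RDD that reports numPartitions partitions but computes no
// rows, so a plan node claiming SinglePartition never exposes an RDD with
// zero partitions.
case class EmptyPartition(index: Int) extends Partition

class EmptyRDDWithPartitions(
    @transient private val sc: SparkContext,
    numPartitions: Int)
  extends RDD[ColumnarBatch](sc, Nil) {

  override def getPartitions: Array[Partition] =
    Array.tabulate(numPartitions)(i => EmptyPartition(i))

  override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] =
    Iterator.empty
}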
