Skip to content

Commit

Permalink
Remove duplicate code
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Mar 21, 2024
1 parent ba4bada commit 187ba36
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 59 deletions.
2 changes: 1 addition & 1 deletion core/src/execution/operators/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use futures::Stream;
use itertools::Itertools;

use arrow::compute::{cast_with_options, CastOptions};
use arrow_array::{make_array, Array, ArrayRef, RecordBatch, RecordBatchOptions};
use arrow_array::{make_array, ArrayRef, RecordBatch, RecordBatchOptions};
use arrow_data::ArrayData;
use arrow_schema::{DataType, Field, Schema, SchemaRef};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,27 +335,6 @@ class CometSparkSessionExtensions
op
}

case op: ShuffledHashJoinExec
if isCometOperatorEnabled(conf, "hash_join") &&
op.children.forall(isCometNative(_)) =>
val newOp = transform1(op)
newOp match {
case Some(nativeOp) =>
CometHashJoinExec(
nativeOp,
op,
op.leftKeys,
op.rightKeys,
op.joinType,
op.condition,
op.buildSide,
op.left,
op.right,
SerializedPlan(None))
case None =>
op
}

case op: SortMergeJoinExec
if isCometOperatorEnabled(conf, "sort_merge_join") &&
op.children.forall(isCometNative(_)) =>
Expand Down
37 changes: 0 additions & 37 deletions spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -589,43 +589,6 @@ case class CometHashAggregateExec(
Objects.hashCode(groupingExpressions, aggregateExpressions, input, mode, child)
}

case class CometHashJoinExec(
override val nativeOp: Operator,
override val originalPlan: SparkPlan,
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
joinType: JoinType,
condition: Option[Expression],
buildSide: BuildSide,
override val left: SparkPlan,
override val right: SparkPlan,
override val serializedPlanOpt: SerializedPlan)
extends CometBinaryExec {
override def withNewChildrenInternal(newLeft: SparkPlan, newRight: SparkPlan): SparkPlan =
this.copy(left = newLeft, right = newRight)

override def stringArgs: Iterator[Any] =
Iterator(leftKeys, rightKeys, joinType, condition, left, right)

override def equals(obj: Any): Boolean = {
obj match {
case other: CometHashJoinExec =>
this.leftKeys == other.leftKeys &&
this.rightKeys == other.rightKeys &&
this.condition == other.condition &&
this.buildSide == other.buildSide &&
this.left == other.left &&
this.right == other.right &&
this.serializedPlanOpt == other.serializedPlanOpt
case _ =>
false
}
}

override def hashCode(): Int =
Objects.hashCode(leftKeys, rightKeys, condition, left, right)
}

case class CometSortMergeJoinExec(
override val nativeOp: Operator,
override val originalPlan: SparkPlan,
Expand Down

0 comments on commit 187ba36

Please sign in to comment.