diff --git a/.github/workflows/build-ce7-releases.yml b/.github/workflows/build-ce7-releases.yml index d5c1df20..d84825e0 100644 --- a/.github/workflows/build-ce7-releases.yml +++ b/.github/workflows/build-ce7-releases.yml @@ -11,7 +11,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - sparkver: [spark-3.0, spark-3.1, spark-3.2, spark-3.3, spark-3.5] + sparkver: [spark-3.0, spark-3.1, spark-3.2, spark-3.3, spark-3.4, spark-3.5] blazever: [3.0.1] steps: diff --git a/.github/workflows/tpcds.yml b/.github/workflows/tpcds.yml index f710f377..bf26d1a7 100644 --- a/.github/workflows/tpcds.yml +++ b/.github/workflows/tpcds.yml @@ -31,11 +31,18 @@ jobs: uses: ./.github/workflows/tpcds-reusable.yml with: sparkver: spark-3.3 - sparkurl: https://archive.apache.org/dist/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz + sparkurl: https://archive.apache.org/dist/spark/spark-3.3.4/spark-3.3.4-bin-hadoop3.tgz + + test-spark-34: + name: Test spark-3.4 + uses: ./.github/workflows/tpcds-reusable.yml + with: + sparkver: spark-3.4 + sparkurl: https://archive.apache.org/dist/spark/spark-3.4.3/spark-3.4.3-bin-hadoop3.tgz test-spark-35: name: Test spark-3.5 uses: ./.github/workflows/tpcds-reusable.yml with: sparkver: spark-3.5 - sparkurl: https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz + sparkurl: https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz diff --git a/README.md b/README.md index 1526ac31..ee545e12 100644 --- a/README.md +++ b/README.md @@ -80,13 +80,14 @@ Currently we have supported these shims: * spark-3.1 - for spark3.1.x * spark-3.2 - for spark3.2.x * spark-3.3 - for spark3.3.x +* spark-3.4 - for spark3.4.x * spark-3.5 - for spark3.5.x. You could either build Blaze in pre mode for debugging or in release mode to unlock the full potential of Blaze. ```shell -SHIM=spark-3.3 # or spark-3.0/spark-3.1/spark-3.2/spark-3.3/spark-3.5 +SHIM=spark-3.3 # or spark-3.0/spark-3.1/spark-3.2/spark-3.3/spark-3.4/spark-3.5 MODE=release # or pre mvn package -P"${SHIM}" -P"${MODE}" ``` diff --git a/pom.xml b/pom.xml index 24d5c9e7..460dc5c4 100644 --- a/pom.xml +++ b/pom.xml @@ -318,7 +318,21 @@ 2.12.15 3.2.9 3.0.0 - 3.3.3 + 3.3.4 + + + + + spark-3.4 + + spark-3.4 + spark-extension-shims-spark3 + 1.8 + 2.12 + 2.12.15 + 3.2.9 + 3.0.0 + 3.4.3 @@ -332,7 +346,7 @@ 2.12.15 3.2.9 3.0.0 - 3.5.2 + 3.5.3 diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/InterceptedValidateSparkPlan.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/InterceptedValidateSparkPlan.scala index 590d904d..4d1c8ef6 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/InterceptedValidateSparkPlan.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/InterceptedValidateSparkPlan.scala @@ -22,7 +22,9 @@ import com.thoughtworks.enableIf object InterceptedValidateSparkPlan extends Logging { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) def validate(plan: SparkPlan): Unit = { import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec import org.apache.spark.sql.execution.blaze.plan.NativeRenameColumnsBase @@ -75,7 +77,9 @@ object InterceptedValidateSparkPlan extends Logging { throw new UnsupportedOperationException("validate is not supported in spark 3.0.3 or 3.1.3") } - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) private def errorOnInvalidBroadcastQueryStage(plan: SparkPlan): Unit = { import org.apache.spark.sql.execution.adaptive.InvalidAQEPlanException throw InvalidAQEPlanException("Invalid broadcast query stage", plan) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala index 2d554c66..cf26e487 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala @@ -123,10 +123,14 @@ class ShimsImpl extends Shims with Logging { override def shimVersion: String = "spark-3.2" @enableIf(Seq("spark-3.3").contains(System.getProperty("blaze.shim"))) override def shimVersion: String = "spark-3.3" + @enableIf(Seq("spark-3.4").contains(System.getProperty("blaze.shim"))) + override def shimVersion: String = "spark-3.4" @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim"))) override def shimVersion: String = "spark-3.5" - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override def initExtension(): Unit = { ValidateSparkPlanInjector.inject() @@ -135,7 +139,7 @@ class ShimsImpl extends Shims with Logging { } // disable MultiCommutativeOp suggested in spark3.4+ - if (shimVersion >= "spark340") { + if (shimVersion >= "spark-3.4") { val confName = "spark.sql.analyzer.canonicalization.multiCommutativeOpMemoryOptThreshold" SparkEnv.get.conf.set(confName, Int.MaxValue.toString) } @@ -368,7 +372,9 @@ class ShimsImpl extends Shims with Logging { length: Long, numRecords: Long): FileSegment = new FileSegment(file, offset, length) - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override def commit( dep: ShuffleDependency[_, _, _], shuffleBlockResolver: IndexShuffleBlockResolver, @@ -509,7 +515,9 @@ class ShimsImpl extends Shims with Logging { expr.asInstanceOf[AggregateExpression].filter } - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) private def isAQEShuffleRead(exec: SparkPlan): Boolean = { import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec exec.isInstanceOf[AQEShuffleReadExec] @@ -521,7 +529,9 @@ class ShimsImpl extends Shims with Logging { exec.isInstanceOf[CustomShuffleReaderExec] } - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) private def executeNativeAQEShuffleReader(exec: SparkPlan): NativeRDD = { import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec import org.apache.spark.sql.execution.CoalescedMapperPartitionSpec @@ -779,7 +789,9 @@ class ShimsImpl extends Shims with Logging { } } - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override def getSqlContext(sparkPlan: SparkPlan): SQLContext = sparkPlan.session.sqlContext @@ -808,13 +820,13 @@ class ShimsImpl extends Shims with Logging { } } - @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.4", "spark-3.5").contains(System.getProperty("blaze.shim"))) private def convertPromotePrecision( e: Expression, isPruningExpr: Boolean, fallback: Expression => pb.PhysicalExprNode): Option[pb.PhysicalExprNode] = None - @enableIf(Seq("spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.3", "spark-3.4", "spark-3.5").contains(System.getProperty("blaze.shim"))) private def convertBloomFilterAgg(agg: AggregateFunction): Option[pb.PhysicalAggExprNode] = { import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate agg match { @@ -837,7 +849,7 @@ class ShimsImpl extends Shims with Logging { @enableIf(Seq("spark-3.0", "spark-3.1", "spark-3.2").contains(System.getProperty("blaze.shim"))) private def convertBloomFilterAgg(agg: AggregateFunction): Option[pb.PhysicalAggExprNode] = None - @enableIf(Seq("spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.3", "spark-3.4", "spark-3.5").contains(System.getProperty("blaze.shim"))) private def convertBloomFilterMightContain( e: Expression, isPruningExpr: Boolean, @@ -869,7 +881,9 @@ class ShimsImpl extends Shims with Logging { case class ForceNativeExecutionWrapper(override val child: SparkPlan) extends ForceNativeExecutionWrapperBase(child) { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) @@ -884,6 +898,8 @@ case class NativeExprWrapper( override val nullable: Boolean) extends NativeExprWrapperBase(nativeExpr, dataType, nullable) { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = copy() } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ConvertToNativeExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ConvertToNativeExec.scala index e44ea1cd..5de82883 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ConvertToNativeExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ConvertToNativeExec.scala @@ -21,7 +21,9 @@ import com.thoughtworks.enableIf case class ConvertToNativeExec(override val child: SparkPlan) extends ConvertToNativeBase(child) { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala index 2f91b1fe..8375a5fb 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala @@ -48,12 +48,12 @@ case class NativeAggExec( with BaseAggregateExec { @enableIf( - Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains( + Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( System.getProperty("blaze.shim"))) override val requiredChildDistributionExpressions: Option[Seq[Expression]] = theRequiredChildDistributionExpressions - @enableIf(Seq("spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.3", "spark-3.4", "spark-3.5").contains(System.getProperty("blaze.shim"))) override val initialInputBufferOffset: Int = theInitialInputBufferOffset override def output: Seq[Attribute] = @@ -65,15 +65,21 @@ case class NativeAggExec( ExprId.apply(NativeAggBase.AGG_BUF_COLUMN_EXPR_ID)) } - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override def isStreaming: Boolean = false - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override def numShufflePartitions: Option[Int] = None override def resultExpressions: Seq[NamedExpression] = output - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeExec.scala index 8838d8bc..0a0213b8 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeExec.scala @@ -42,7 +42,9 @@ case class NativeBroadcastExchangeExec(mode: BroadcastMode, override val child: relationFuturePromise.future } - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeExpandExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeExpandExec.scala index b0405751..c22fb6eb 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeExpandExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeExpandExec.scala @@ -27,7 +27,9 @@ case class NativeExpandExec( override val child: SparkPlan) extends NativeExpandBase(projections, output, child) { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterExec.scala index 2a4b06c6..ea0a17aa 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterExec.scala @@ -23,7 +23,9 @@ import com.thoughtworks.enableIf case class NativeFilterExec(condition: Expression, override val child: SparkPlan) extends NativeFilterBase(condition, child) { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGenerateExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGenerateExec.scala index 3168036f..c372fe6d 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGenerateExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGenerateExec.scala @@ -29,7 +29,9 @@ case class NativeGenerateExec( override val child: SparkPlan) extends NativeGenerateBase(generator, requiredChildOutput, outer, generatorOutput, child) { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGlobalLimitExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGlobalLimitExec.scala index 25c387e0..d8f18dd8 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGlobalLimitExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGlobalLimitExec.scala @@ -22,7 +22,9 @@ import com.thoughtworks.enableIf case class NativeGlobalLimitExec(limit: Long, override val child: SparkPlan) extends NativeGlobalLimitBase(limit, child) { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeLocalLimitExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeLocalLimitExec.scala index faf3b28b..bbd8b48e 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeLocalLimitExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeLocalLimitExec.scala @@ -22,7 +22,9 @@ import com.thoughtworks.enableIf case class NativeLocalLimitExec(limit: Long, override val child: SparkPlan) extends NativeLocalLimitBase(limit, child) { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala index 00b8e6d1..8ac2ab16 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala @@ -42,7 +42,7 @@ case class NativeParquetInsertIntoHiveTableExec( ifPartitionNotExists: Boolean, outputColumnNames: Seq[String], metrics: Map[String, SQLMetric]): InsertIntoHiveTable = { - new BlazeInsertIntoHiveTable303( + new BlazeInsertIntoHiveTable30( table, partition, query, @@ -52,7 +52,7 @@ case class NativeParquetInsertIntoHiveTableExec( metrics) } - @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.4", "spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def getInsertIntoHiveTableCommand( table: CatalogTable, partition: Map[String, Option[String]], @@ -61,7 +61,7 @@ case class NativeParquetInsertIntoHiveTableExec( ifPartitionNotExists: Boolean, outputColumnNames: Seq[String], metrics: Map[String, SQLMetric]): InsertIntoHiveTable = { - new BlazeInsertIntoHiveTable351( + new BlazeInsertIntoHiveTable34( table, partition, query, @@ -71,7 +71,9 @@ case class NativeParquetInsertIntoHiveTableExec( metrics) } - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) @@ -82,7 +84,7 @@ case class NativeParquetInsertIntoHiveTableExec( @enableIf( Seq("spark-3.0", "spark-3.1", "spark-3.2", "spark-3.3").contains( System.getProperty("blaze.shim"))) - class BlazeInsertIntoHiveTable303( + class BlazeInsertIntoHiveTable30( table: CatalogTable, partition: Map[String, Option[String]], query: LogicalPlan, @@ -200,8 +202,8 @@ case class NativeParquetInsertIntoHiveTableExec( } } - @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim"))) - class BlazeInsertIntoHiveTable351( + @enableIf(Seq("spark-3.4", "spark-3.5").contains(System.getProperty("blaze.shim"))) + class BlazeInsertIntoHiveTable34( table: CatalogTable, partition: Map[String, Option[String]], query: LogicalPlan, diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkExec.scala index aa673db8..e701921d 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkExec.scala @@ -30,7 +30,9 @@ case class NativeParquetSinkExec( override val metrics: Map[String, SQLMetric]) extends NativeParquetSinkBase(sparkSession, table, partition, child, metrics) { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativePartialTakeOrderedExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativePartialTakeOrderedExec.scala index c30350db..91b8a857 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativePartialTakeOrderedExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativePartialTakeOrderedExec.scala @@ -28,7 +28,9 @@ case class NativePartialTakeOrderedExec( override val metrics: Map[String, SQLMetric]) extends NativePartialTakeOrderedBase(limit, sortOrder, child, metrics) { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectExecProvider.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectExecProvider.scala index 3943516d..f3b8dd49 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectExecProvider.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectExecProvider.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.execution.SparkPlan import com.thoughtworks.enableIf case object NativeProjectExecProvider { - @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.4", "spark-3.5").contains(System.getProperty("blaze.shim"))) def provide( projectList: Seq[NamedExpression], child: SparkPlan, diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala index 853b7668..b57f0072 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala @@ -20,7 +20,7 @@ import org.apache.spark.sql.execution.SparkPlan import com.thoughtworks.enableIf case object NativeRenameColumnsExecProvider { - @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.4", "spark-3.5").contains(System.getProperty("blaze.shim"))) def provide(child: SparkPlan, renamedColumnNames: Seq[String]): NativeRenameColumnsBase = { import org.apache.spark.sql.catalyst.expressions.NamedExpression import org.apache.spark.sql.catalyst.expressions.SortOrder diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala index 78c8dd48..ba5cf6b7 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala @@ -158,12 +158,14 @@ case class NativeShuffleExchangeExec( outputPartitioning != SinglePartition @enableIf( - Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains( + Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( System.getProperty("blaze.shim"))) override def shuffleOrigin = org.apache.spark.sql.execution.exchange.ENSURE_REQUIREMENTS - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeSortExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeSortExec.scala index 7e569837..5eff2206 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeSortExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeSortExec.scala @@ -26,7 +26,9 @@ case class NativeSortExec( override val child: SparkPlan) extends NativeSortBase(sortOrder, global, child) { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeTakeOrderedExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeTakeOrderedExec.scala index 03afc5e3..d4cb0c9d 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeTakeOrderedExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeTakeOrderedExec.scala @@ -26,7 +26,9 @@ case class NativeTakeOrderedExec( override val child: SparkPlan) extends NativeTakeOrderedBase(limit, sortOrder, child) { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeUnionExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeUnionExec.scala index be790c55..6a9ce256 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeUnionExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeUnionExec.scala @@ -22,7 +22,9 @@ import com.thoughtworks.enableIf case class NativeUnionExec(override val children: Seq[SparkPlan]) extends NativeUnionBase(children) { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = copy(children = newChildren) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowExec.scala index 0b2f4a0d..c16223e9 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowExec.scala @@ -29,7 +29,9 @@ case class NativeWindowExec( override val child: SparkPlan) extends NativeWindowBase(windowExpression, partitionSpec, orderSpec, child) { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReader.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReader.scala index 9d9d1003..15435750 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReader.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReader.scala @@ -45,7 +45,8 @@ class BlazeBlockStoreShuffleReader[K, C]( override def readBlocks(): Iterator[(BlockId, InputStream)] = { @enableIf( - Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) def fetchIterator = new ShuffleBlockFetcherIterator( context, blockManager.blockStoreClient, diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeRssShuffleManagerBase.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeRssShuffleManagerBase.scala index 96a6b3f3..74a74913 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeRssShuffleManagerBase.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeRssShuffleManagerBase.scala @@ -76,7 +76,7 @@ abstract class BlazeRssShuffleManagerBase(conf: SparkConf) extends ShuffleManage metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] @enableIf( - Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains( + Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( System.getProperty("blaze.shim"))) override def getReader[K, C]( handle: ShuffleHandle, diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleManager.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleManager.scala index 06c8fb2c..641980ea 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleManager.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleManager.scala @@ -48,7 +48,9 @@ class BlazeShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { sortShuffleManager.registerShuffle(shuffleId, dependency) } - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override def getReader[K, C]( handle: ShuffleHandle, startMapIndex: Int, @@ -63,7 +65,8 @@ class BlazeShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { @enableIf(Seq("spark-3.2").contains(System.getProperty("blaze.shim"))) def shuffleMergeFinalized = baseShuffleHandle.dependency.shuffleMergeFinalized - @enableIf(Seq("spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.3", "spark-3.4", "spark-3.5").contains(System.getProperty("blaze.shim"))) def shuffleMergeFinalized = baseShuffleHandle.dependency.isShuffleMergeFinalizedMarked val (blocksByAddress, canEnableBatchFetch) = diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriter.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriter.scala index 8ad299f3..ba796786 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriter.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriter.scala @@ -22,6 +22,8 @@ import com.thoughtworks.enableIf class BlazeShuffleWriter[K, V](metrics: ShuffleWriteMetricsReporter) extends BlazeShuffleWriterBase[K, V](metrics) { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override def getPartitionLengths(): Array[Long] = partitionLengths } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala index 45b6f55f..5e51ae06 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala @@ -48,7 +48,7 @@ case class NativeBroadcastJoinExec( override val condition: Option[Expression] = None @enableIf( - Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains( + Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( System.getProperty("blaze.shim"))) override def buildSide: org.apache.spark.sql.catalyst.optimizer.BuildSide = broadcastSide match { @@ -63,7 +63,7 @@ case class NativeBroadcastJoinExec( } @enableIf( - Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains( + Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( System.getProperty("blaze.shim"))) override def requiredChildDistribution = { import org.apache.spark.sql.catalyst.plans.physical.BroadcastDistribution @@ -80,19 +80,19 @@ case class NativeBroadcastJoinExec( } @enableIf( - Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains( + Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( System.getProperty("blaze.shim"))) override def supportCodegen: Boolean = false @enableIf( - Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains( + Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( System.getProperty("blaze.shim"))) override def inputRDDs() = { throw new NotImplementedError("NativeBroadcastJoin dose not support codegen") } @enableIf( - Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains( + Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( System.getProperty("blaze.shim"))) override protected def prepareRelation( ctx: org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext) @@ -100,7 +100,9 @@ case class NativeBroadcastJoinExec( throw new NotImplementedError("NativeBroadcastJoin dose not support codegen") } - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) override protected def withNewChildrenInternal( newLeft: SparkPlan, newRight: SparkPlan): SparkPlan = diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeShuffledHashJoinExecProvider.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeShuffledHashJoinExecProvider.scala index 2e04013a..89ed43ea 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeShuffledHashJoinExecProvider.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeShuffledHashJoinExecProvider.scala @@ -25,7 +25,9 @@ import com.thoughtworks.enableIf case object NativeShuffledHashJoinExecProvider { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) def provide( left: SparkPlan, right: SparkPlan, diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeSortMergeJoinExecProvider.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeSortMergeJoinExecProvider.scala index 50fbd98b..2494d3ad 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeSortMergeJoinExecProvider.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeSortMergeJoinExecProvider.scala @@ -24,7 +24,9 @@ import com.thoughtworks.enableIf case object NativeSortMergeJoinExecProvider { - @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) + @enableIf( + Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains( + System.getProperty("blaze.shim"))) def provide( left: SparkPlan, right: SparkPlan,