Commit

Unshim many SparkShim interfaces [databricks] (#5031)
* Unshim Arrow

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Unshim Alluxio

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Unshim Kryo

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Unshim getGpuColumnarToRowTransition

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Unshim createTable

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Unshim hasSeparateINT96RebaseConf

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Unshim getScalaUDFAsExpression

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Unshim getMapSizesByExecutorId

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Unshim getFileSourceMaxMetadataValueLength

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Unshim getGpuShuffleExchangeExec

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Unshim SortOrder

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Remove HadoopFSUtils shim

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Unshim getLegacyComplexTypeToString

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Unshim shouldFailDivByZero

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Unshim shouldFailOnElementNotExists

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Remove registerKryo from Spark31XdbShims

* Unshim shouldFallbackOnAnsiTimestamp
jlowe authored Mar 24, 2022
1 parent 384f0d5 commit 0d7750d
Showing 39 changed files with 244 additions and 915 deletions.
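
Every change in this commit follows the same mechanical pattern: a call that previously went through the per-Spark-version SparkShimImpl facade is replaced by a direct call to the Spark API, which is now identical across all Spark versions the plugin still builds against, so the corresponding shim method can be deleted. A deliberately simplified before/after sketch of that shape (the ExampleShim trait, its method, and the particular config it reads are hypothetical stand-ins, not the plugin's real SparkShims interface):

    // Hypothetical sketch of the "unshim" pattern applied throughout this commit.
    import org.apache.spark.sql.internal.SQLConf

    // Before: behavior that once differed between Spark releases sat behind a
    // per-version facade, and every caller paid the indirection.
    trait ExampleShim {
      def shouldFailOnError(conf: SQLConf): Boolean
    }
    object ExampleShimImpl extends ExampleShim {
      override def shouldFailOnError(conf: SQLConf): Boolean = conf.ansiEnabled
    }

    // After: the underlying API is the same on every supported Spark version, so
    // callers read it directly and the facade method is removed.
    object ExampleCaller {
      def shouldFailOnError(conf: SQLConf): Boolean = conf.ansiEnabled
    }

The diffs below show two concrete instances of this pattern: direct SortOrder construction and direct shuffle block-size lookup.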


@@ -18,7 +18,7 @@ package org.apache.spark.sql.rapids.execution.python.shims

 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
-import com.nvidia.spark.rapids.shims.{ShimUnaryExecNode, SparkShimImpl}
+import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
 
 import org.apache.spark.TaskContext
 import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
@@ -96,7 +96,7 @@ case class GpuFlatMapGroupsInPandasExec(
 }
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
-    Seq(groupingAttributes.map(SparkShimImpl.sortOrder(_, Ascending)))
+    Seq(groupingAttributes.map(SortOrder(_, Ascending)))
 
   private val pandasFunction = func.asInstanceOf[GpuPythonUDF].func
 
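
The change above needs no shim because, on every Spark version the plugin now supports, the SortOrder companion exposes a single stable apply taking a child expression, a direction, and an optional sameOrderExpressions sequence that defaults to empty. A minimal sketch of the direct construction, mirroring the requiredChildOrdering line in the diff (the wrapper object and method names here are illustrative):

    import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}

    object RequiredOrderingSketch {
      // One ascending SortOrder per grouping attribute, built straight from the
      // catalyst API instead of going through SparkShimImpl.sortOrder.
      def requiredOrdering(groupingAttributes: Seq[Attribute]): Seq[Seq[SortOrder]] =
        Seq(groupingAttributes.map(SortOrder(_, Ascending)))
    }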
@@ -16,8 +16,6 @@

 package org.apache.spark.rapids.shims
 
-import com.nvidia.spark.rapids.shims.SparkShimImpl
-
 import org.apache.spark.{MapOutputTrackerMaster, Partition, ShuffleDependency, SparkEnv, TaskContext}
 import org.apache.spark.shuffle.ShuffleReader
 import org.apache.spark.sql.execution.{CoalescedPartitionSpec, PartialMapperPartitionSpec, PartialReducerPartitionSpec}
@@ -57,7 +55,6 @@ object ShuffledBatchRDDUtil {
       dependency: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch],
       sqlMetricsReporter: SQLShuffleReadMetricsReporter):
     (ShuffleReader[Nothing, Nothing], Long) = {
-    val shim = SparkShimImpl
     split.asInstanceOf[ShuffledBatchRDDPartition].spec match {
       case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
         val reader = SparkEnv.get.shuffleManager.getReader(
@@ -66,7 +63,7 @@
           endReducerIndex,
           context,
           sqlMetricsReporter)
-        val blocksByAddress = shim.getMapSizesByExecutorId(
+        val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
           dependency.shuffleHandle.shuffleId, 0, Int.MaxValue, startReducerIndex, endReducerIndex)
         val partitionSize = blocksByAddress.flatMap(_._2).map(_._2).sum
         (reader, partitionSize)
@@ -80,7 +77,7 @@
           reducerIndex + 1,
           context,
           sqlMetricsReporter)
-        val blocksByAddress = shim.getMapSizesByExecutorId(
+        val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
           dependency.shuffleHandle.shuffleId, 0, Int.MaxValue, reducerIndex,
           reducerIndex + 1)
         val partitionSize = blocksByAddress.flatMap(_._2)
@@ -96,7 +93,7 @@
           endReducerIndex,
           context,
           sqlMetricsReporter)
-        val blocksByAddress = shim.getMapSizesByExecutorId(
+        val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
           dependency.shuffleHandle.shuffleId, 0, Int.MaxValue, startReducerIndex, endReducerIndex)
         val partitionSize = blocksByAddress.flatMap(_._2)
           .filter(_._3 == mapIndex)
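
Likewise, ShuffledBatchRDDUtil no longer needs a shim for shuffle block sizes: the five-argument getMapSizesByExecutorId on the MapOutputTracker obtained from SparkEnv has the same shape on every supported Spark version. A rough sketch of the direct lookup the diff switches to, assuming the Spark 3.1+ return type of (BlockManagerId, Seq[(BlockId, blockSize, mapIndex)]) pairs (the wrapper object and method name are illustrative):

    import org.apache.spark.SparkEnv

    object ShufflePartitionSizeSketch {
      // Total bytes this reducer range will fetch: sum the reported size of every
      // shuffle block returned for the given reducer (partition) range.
      def partitionBytes(shuffleId: Int, startReducer: Int, endReducer: Int): Long = {
        val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
          shuffleId, 0, Int.MaxValue, startReducer, endReducer)
        blocksByAddress.flatMap(_._2).map(_._2).sum
      }
    }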
