Skip to content

Commit

Permalink
Use Shims v2 for ShuffledBatchRDD (#3459)
Browse files (browse the repository at this point in the history)
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
  • Loading branch information
revans2 authored Sep 14, 2021
1 parent 5fcecae commit 126c7d3
Show file tree
Hide file tree
Showing 30 changed files with 494 additions and 583 deletions.
9 changes: 9 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
<phase>generate-sources</phase>
<configuration>
<sources>
+<source>${project.basedir}/src/main/301until310-nondb/scala</source>
<source>${project.basedir}/src/main/301until320-nondb/scala</source>
<source>${project.basedir}/src/main/301until320-all/scala</source>
</sources>
Expand Down Expand Up @@ -136,6 +137,7 @@
<phase>generate-sources</phase>
<configuration>
<sources>
+<source>${project.basedir}/src/main/301until310-nondb/scala</source>
<source>${project.basedir}/src/main/301until320-nondb/scala</source>
<source>${project.basedir}/src/main/301until320-all/scala</source>
</sources>
Expand Down Expand Up @@ -179,6 +181,7 @@
<phase>generate-sources</phase>
<configuration>
<sources>
+<source>${project.basedir}/src/main/301until310-nondb/scala</source>
<source>${project.basedir}/src/main/301until320-nondb/scala</source>
<source>${project.basedir}/src/main/301until320-all/scala</source>
</sources>
Expand Down Expand Up @@ -217,6 +220,7 @@
<phase>generate-sources</phase>
<configuration>
<sources>
+<source>${project.basedir}/src/main/301until310-nondb/scala</source>
<source>${project.basedir}/src/main/301until320-nondb/scala</source>
<source>${project.basedir}/src/main/301until320-all/scala</source>
</sources>
Expand Down Expand Up @@ -257,6 +261,7 @@
<sources>
<source>${project.basedir}/src/main/301until320-nondb/scala</source>
<source>${project.basedir}/src/main/301until320-all/scala</source>
+<source>${project.basedir}/src/main/311until320-all/scala</source>
<source>${project.basedir}/src/main/311+-all/scala</source>
<source>${project.basedir}/src/main/311+-apache/scala</source>
</sources>
Expand Down Expand Up @@ -363,6 +368,7 @@
<sources>
<source>${project.basedir}/src/main/311db/scala</source>
<source>${project.basedir}/src/main/301until320-all/scala</source>
+<source>${project.basedir}/src/main/311until320-all/scala</source>
<source>${project.basedir}/src/main/311+-all/scala</source>
<source>${project.basedir}/src/main/311+-apache/scala</source>
</sources>
Expand Down Expand Up @@ -402,6 +408,7 @@
<sources>
<source>${project.basedir}/src/main/301until320-nondb/scala</source>
<source>${project.basedir}/src/main/301until320-all/scala</source>
+<source>${project.basedir}/src/main/311until320-all/scala</source>
<source>${project.basedir}/src/main/311+-all/scala</source>
<source>${project.basedir}/src/main/311+-apache/scala</source>
</sources>
Expand Down Expand Up @@ -442,6 +449,7 @@
<sources>
<source>${project.basedir}/src/main/301until320-nondb/scala</source>
<source>${project.basedir}/src/main/301until320-all/scala</source>
+<source>${project.basedir}/src/main/311until320-all/scala</source>
<source>${project.basedir}/src/main/311+-all/scala</source>
<source>${project.basedir}/src/main/311+-apache/scala</source>
</sources>
Expand Down Expand Up @@ -520,6 +528,7 @@
<source>${project.basedir}/src/main/301until320-nondb/scala</source>
<source>${project.basedir}/src/main/311cdh/scala</source>
<source>${project.basedir}/src/main/301until320-all/scala</source>
+<source>${project.basedir}/src/main/311until320-all/scala</source>
<source>${project.basedir}/src/main/311+-all/scala</source>
</sources>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNes
import org.apache.spark.sql.execution.python.{AggregateInPandasExec, ArrowEvalPythonExec, FlatMapGroupsInPandasExec, MapInPandasExec, WindowInPandasExec}
import org.apache.spark.sql.execution.window.WindowExecBase
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub, ShuffleManagerShimBase}
+import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks}
import org.apache.spark.sql.rapids.execution.python.GpuPythonUDF
import org.apache.spark.sql.rapids.execution.python.shims.spark301._
-import org.apache.spark.sql.rapids.shims.spark301.{GpuSchemaUtils, ShuffleManagerShim}
+import org.apache.spark.sql.rapids.shims.spark301.GpuSchemaUtils
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
import org.apache.spark.storage.{BlockId, BlockManagerId}
Expand Down Expand Up @@ -380,10 +380,6 @@ abstract class SparkBaseShims extends Spark30XShims {
GpuJoinUtils.getGpuBuildSide(join.buildSide)
}

-override def getShuffleManagerShims(): ShuffleManagerShimBase = {
-new ShuffleManagerShim
-}
-
override def getPartitionFileNames(
partitions: Seq[PartitionDirectory]): Seq[String] = {
val files = partitions.flatMap(partition => partition.files)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python._
import org.apache.spark.sql.execution.window.WindowExecBase
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub, ShuffleManagerShimBase}
+import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks, TrampolineUtil}
import org.apache.spark.sql.rapids.execution.python.{GpuFlatMapGroupsInPandasExecMeta, GpuPythonUDF}
import org.apache.spark.sql.rapids.execution.python.shims.spark301db._
Expand Down Expand Up @@ -403,10 +403,6 @@ abstract class SparkBaseShims extends Spark30XShims {
GpuJoinUtils.getGpuBuildSide(join.buildSide)
}

-override def getShuffleManagerShims(): ShuffleManagerShimBase = {
-new ShuffleManagerShim
-}
-
override def getPartitionFileNames(
partitions: Seq[PartitionDirectory]): Seq[String] = {
val files = partitions.flatMap(partition => partition.files)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNes
import org.apache.spark.sql.execution.python.{AggregateInPandasExec, ArrowEvalPythonExec, FlatMapGroupsInPandasExec, MapInPandasExec, WindowInPandasExec}
import org.apache.spark.sql.execution.window.WindowExecBase
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub, ShuffleManagerShimBase}
+import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks}
import org.apache.spark.sql.rapids.execution.python.GpuPythonUDF
import org.apache.spark.sql.rapids.execution.python.shims.spark302._
-import org.apache.spark.sql.rapids.shims.spark302.{GpuSchemaUtils, ShuffleManagerShim}
+import org.apache.spark.sql.rapids.shims.spark302.GpuSchemaUtils
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
import org.apache.spark.storage.{BlockId, BlockManagerId}
Expand Down Expand Up @@ -380,10 +380,6 @@ abstract class SparkBaseShims extends Spark30XShims {
GpuJoinUtils.getGpuBuildSide(join.buildSide)
}

-override def getShuffleManagerShims(): ShuffleManagerShimBase = {
-new ShuffleManagerShim
-}
-
override def getPartitionFileNames(
partitions: Seq[PartitionDirectory]): Seq[String] = {
val files = partitions.flatMap(partition => partition.files)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNes
import org.apache.spark.sql.execution.python.{AggregateInPandasExec, ArrowEvalPythonExec, FlatMapGroupsInPandasExec, MapInPandasExec, WindowInPandasExec}
import org.apache.spark.sql.execution.window.WindowExecBase
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub, ShuffleManagerShimBase}
+import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks}
import org.apache.spark.sql.rapids.execution.python.GpuPythonUDF
import org.apache.spark.sql.rapids.execution.python.shims.spark303._
-import org.apache.spark.sql.rapids.shims.spark303.{GpuSchemaUtils, ShuffleManagerShim}
+import org.apache.spark.sql.rapids.shims.spark303.GpuSchemaUtils
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
import org.apache.spark.storage.{BlockId, BlockManagerId}
Expand Down Expand Up @@ -380,10 +380,6 @@ abstract class SparkBaseShims extends Spark30XShims {
GpuJoinUtils.getGpuBuildSide(join.buildSide)
}

-override def getShuffleManagerShims(): ShuffleManagerShimBase = {
-new ShuffleManagerShim
-}
-
override def getPartitionFileNames(
partitions: Seq[PartitionDirectory]): Seq[String] = {
val files = partitions.flatMap(partition => partition.files)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNes
import org.apache.spark.sql.execution.python.{AggregateInPandasExec, ArrowEvalPythonExec, FlatMapGroupsInPandasExec, MapInPandasExec, WindowInPandasExec}
import org.apache.spark.sql.execution.window.WindowExecBase
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub, ShuffleManagerShimBase}
+import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks}
import org.apache.spark.sql.rapids.execution.python.GpuPythonUDF
import org.apache.spark.sql.rapids.execution.python.shims.spark304._
-import org.apache.spark.sql.rapids.shims.spark304.{GpuSchemaUtils, ShuffleManagerShim}
+import org.apache.spark.sql.rapids.shims.spark304.GpuSchemaUtils
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
import org.apache.spark.storage.{BlockId, BlockManagerId}
Expand Down Expand Up @@ -380,10 +380,6 @@ abstract class SparkBaseShims extends Spark30XShims {
GpuJoinUtils.getGpuBuildSide(join.buildSide)
}

-override def getShuffleManagerShims(): ShuffleManagerShimBase = {
-new ShuffleManagerShim
-}
-
override def getPartitionFileNames(
partitions: Seq[PartitionDirectory]): Seq[String] = {
val files = partitions.flatMap(partition => partition.files)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -550,10 +550,6 @@ abstract class SparkBaseShims extends Spark30XShims {
GpuJoinUtils.getGpuBuildSide(join.buildSide)
}

-override def getShuffleManagerShims(): ShuffleManagerShimBase = {
-new ShuffleManagerShim
-}
-
override def getPartitionFileNames(
partitions: Seq[PartitionDirectory]): Seq[String] = {
val files = partitions.flatMap(partition => partition.files)
Expand Down
Loading

0 comments on commit 126c7d3

Please sign in to comment.