Fallback to CPU when datasource v2 enables RuntimeFiltering (#3456)
* Fallback to CPU when datasource v2 enables DPP

Signed-off-by: Bobby Wang <wbo4958@gmail.com>
wbo4958 authored Sep 15, 2021
1 parent 4ae2aea commit 492c15f
Showing 1 changed file with 62 additions and 4 deletions.
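
The patch applies one guard in four places: the BatchScanExec meta falls back when the plan already carries runtime filters, and the Parquet, ORC, and CSV scan metas fall back whenever the scan merely implements Spark 3.2's SupportsRuntimeFiltering interface. Below is a minimal sketch of the scan-side check as a hypothetical free-standing helper; the commit itself inlines this logic in each ScanMeta.

import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering}

// Hypothetical helper for illustration only; not part of the commit.
def tagIfRuntimeFiltering(scan: Scan, willNotWorkOnGpu: String => Unit): Unit = {
  // SupportsRuntimeFiltering (new in Spark 3.2) lets Spark push runtime
  // filters (DPP) into a DSv2 scan after planning; the GPU scans cannot
  // honor those late filters yet, so the scan must stay on the CPU.
  if (scan.isInstanceOf[SupportsRuntimeFiltering]) {
    willNotWorkOnGpu("Runtime filtering (DPP) on datasource V2 is not supported")
  }
}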
@@ -41,12 +41,14 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.CommandResult
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering}
import org.apache.spark.sql.execution.{CommandResultExec, FileSourceScanExec, PartitionedFileUtil, SparkPlan}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.{FileIndex, FilePartition, FileScanRDD, HadoopFsRelation, InMemoryFileIndex, PartitionDirectory, PartitionedFile}
import org.apache.spark.sql.execution.datasources.rapids.GpuPartitioningUtils
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ReusedExchangeExec, ShuffleExchangeExec}
@@ -489,15 +491,41 @@ class Spark320Shims extends Spark32XShims {
" the Java process and the Python process. It also supports scheduling GPU resources" +
" for the Python process when enabled.",
ExecChecks(TypeSig.commonCudfTypes, TypeSig.all),
(aggPy, conf, p, r) => new GpuAggregateInPandasExecMeta(aggPy, conf, p, r))
(aggPy, conf, p, r) => new GpuAggregateInPandasExecMeta(aggPy, conf, p, r)),
GpuOverrides.exec[BatchScanExec](
"The backend for most file input",
ExecChecks(
(TypeSig.commonCudfTypes + TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY +
TypeSig.DECIMAL_64).nested(),
TypeSig.all),
(p, conf, parent, r) => new SparkPlanMeta[BatchScanExec](p, conf, parent, r) {
override val childScans: scala.Seq[ScanMeta[_]] =
Seq(GpuOverrides.wrapScan(p.scan, conf, Some(this)))

override def tagPlanForGpu(): Unit = {
if (p.runtimeFilters.nonEmpty) {
willNotWorkOnGpu("Runtime filtering (DPP) on datasource V2 is not supported")
}
}

override def convertToGpu(): GpuExec =
GpuBatchScanExec(p.output, childScans.head.convertToGpu())
})
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
}

override def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = Seq(
GpuOverrides.scan[ParquetScan](
"Parquet parsing",
(a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)
override def tagSelfForGpu(): Unit = {
GpuParquetScanBase.tagSupport(this)
// we are being overly cautious here, since Parquet does not support this yet
if (a.isInstanceOf[SupportsRuntimeFiltering]) {
willNotWorkOnGpu("Parquet does not support Runtime filtering (DPP)" +
" on datasource V2 yet.")
}
}

override def convertToGpu(): Scan = {
GpuParquetScan(a.sparkSession,
@@ -516,8 +544,14 @@ class Spark320Shims extends Spark32XShims {
GpuOverrides.scan[OrcScan](
"ORC parsing",
(a, conf, p, r) => new ScanMeta[OrcScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit =
override def tagSelfForGpu(): Unit = {
GpuOrcScanBase.tagSupport(this)
// we are being overly cautious here, since ORC does not support this yet
if (a.isInstanceOf[SupportsRuntimeFiltering]) {
willNotWorkOnGpu("ORC does not support Runtime filtering (DPP)" +
" on datasource V2 yet.")
}
}

override def convertToGpu(): Scan =
GpuOrcScan(a.sparkSession,
@@ -531,6 +565,30 @@ class Spark320Shims extends Spark32XShims {
a.partitionFilters,
a.dataFilters,
conf)
}),
GpuOverrides.scan[CSVScan](
"CSV parsing",
(a, conf, p, r) => new ScanMeta[CSVScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit = {
GpuCSVScan.tagSupport(this)
// we are being overly cautious here, since CSV does not support this yet
if (a.isInstanceOf[SupportsRuntimeFiltering]) {
willNotWorkOnGpu("CSV does not support Runtime filtering (DPP)" +
" on datasource V2 yet.")
}
}

override def convertToGpu(): Scan =
GpuCSVScan(a.sparkSession,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.options,
a.partitionFilters,
a.dataFilters,
conf.maxReadBatchSizeRows,
conf.maxReadBatchSizeBytes)
})
).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap

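For context, here is a hedged repro sketch of the case this commit guards against; the paths, schema, and session settings below are assumptions for illustration, not part of the commit. Forcing the DSv2 read path and joining a partitioned table on its partition column is the classic dynamic-partition-pruning trigger, which makes Spark plan a BatchScanExec with runtime filters; with this patch that scan now falls back to the CPU instead of being converted to GpuBatchScanExec.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("dsv2-dpp-fallback-repro")
  // An empty useV1SourceList forces the DSv2 (BatchScanExec) read path.
  .config("spark.sql.sources.useV1SourceList", "")
  // DPP is on by default in Spark 3.2; set explicitly for clarity.
  .config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
  .getOrCreate()

// Hypothetical tables: a fact table partitioned by "day" and a small dim table.
val fact = spark.read.parquet("/tmp/fact")
val dim = spark.read.parquet("/tmp/dim")

// A selective join on the partition column triggers runtime filtering;
// explain() should now show a CPU BatchScanExec rather than a GPU scan.
fact.join(dim, fact("day") === dim("day"))
  .where(dim("flag") === 1)
  .explain()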