diff --git a/docs/configs.md b/docs/configs.md
index 06614aa4bb4..3db47bdfbc4 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -79,6 +79,7 @@ Name | Description | Default Value
 spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true
 spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
 spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
+spark.rapids.sql.fast.sample|Option to turn on fast sample. If enabled, the sampling result is not consistent with the CPU sample because the GPU sampling algorithm differs from the CPU one.|false
 spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyways)|true
 spark.rapids.sql.format.csv.read.enabled|When set to false disables csv input acceleration|true
 spark.rapids.sql.format.orc.enabled|When set to false disables all orc input and output acceleration|true
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GatherUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GatherUtils.scala
new file mode 100644
index 00000000000..7f117d7729c
--- /dev/null
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GatherUtils.scala
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.nvidia.spark.rapids
+
+import scala.collection.mutable.ArrayBuffer
+
+import ai.rapids.cudf.ColumnVector
+
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+object GatherUtils extends Arm {
+  def gather(cb: ColumnarBatch, rows: ArrayBuffer[Int]): ColumnarBatch = {
+    val colTypes = GpuColumnVector.extractTypes(cb)
+    if (rows.isEmpty) {
+      GpuColumnVector.emptyBatchFromTypes(colTypes)
+    } else if (cb.numCols() == 0) {
+      // for a count agg the batch has no columns, so only the row count matters
+      val c = GpuColumnVector.emptyBatchFromTypes(colTypes)
+      c.setNumRows(rows.length)
+      c
+    } else {
+      withResource(ColumnVector.fromInts(rows: _*)) { gatherCv =>
+        withResource(GpuColumnVector.from(cb)) { table =>
+          // gather the sampled rows on the GPU
+          withResource(table.gather(gatherCv)) { gatheredTable =>
+            GpuColumnVector.from(gatheredTable, colTypes)
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index 169840f4b6e..60802477213 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1319,6 +1319,12 @@ object RapidsConf {
     .booleanConf
     .createWithDefault(value = false)
 
+  val ENABLE_FAST_SAMPLE = conf("spark.rapids.sql.fast.sample")
+    .doc("Option to turn on fast sample. If enabled, the sampling result is not " +
+      "consistent with the CPU sample because the GPU sampling algorithm differs " +
+      "from the CPU one.")
+    .booleanConf
+    .createWithDefault(value = false)
+
   private def printSectionHeader(category: String): Unit =
     println(s"\n### $category")
 
@@ -1733,6 +1739,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
 
   lazy val isCpuBasedUDFEnabled: Boolean = get(ENABLE_CPU_BASED_UDF)
 
+  lazy val isFastSampleEnabled: Boolean = get(ENABLE_FAST_SAMPLE)
+
   private val optimizerDefaults = Map(
     // this is not accurate because CPU projections do have a cost due to appending values
    // to each row that is produced, but this needs to be a really small number because
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala
index a882a19dde1..73b719a8bec 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala
@@ -17,6 +17,7 @@
 package com.nvidia.spark.rapids
 
 import scala.annotation.tailrec
+import scala.collection.mutable.ArrayBuffer
 
 import ai.rapids.cudf
 import ai.rapids.cudf._
@@ -365,19 +366,32 @@ case class GpuFilterExec(
   }
 }
 
-class GpuSampleExecMeta(sample: SampleExec, conf: RapidsConf, p: Option[RapidsMeta[_, _, _]],
+class GpuSampleExecMeta(
+    sample: SampleExec,
+    conf: RapidsConf,
+    p: Option[RapidsMeta[_, _, _]],
     r: DataFromReplacementRule) extends SparkPlanMeta[SampleExec](sample, conf, p, r)
-  with Logging {
+    with Logging {
   override def convertToGpu(): GpuExec = {
     val gpuChild = childPlans.head.convertIfNeeded()
-    GpuSampleExec(sample.lowerBound, sample.upperBound, sample.withReplacement,
-      sample.seed, gpuChild)
+    if (conf.isFastSampleEnabled) {
+      // Use the GPU sample JNI; faster, but the output differs from what the CPU produces
+      GpuFastSampleExec(sample.lowerBound, sample.upperBound, sample.withReplacement,
+        sample.seed, gpuChild)
+    } else {
+      // The output matches what the CPU produces:
+      // first generate row indexes with the CPU sampler, then gather on the GPU
+      GpuSampleExec(sample.lowerBound, sample.upperBound, sample.withReplacement,
+        sample.seed, gpuChild)
+    }
   }
 }
 
-case class GpuSampleExec(lowerBound: Double, upperBound: Double, withReplacement: Boolean,
-  seed: Long, child: SparkPlan)
-  extends ShimUnaryExecNode with GpuExec {
+case class GpuSampleExec(
+    lowerBound: Double,
+    upperBound: Double,
+    withReplacement: Boolean,
+    seed: Long,
+    child: SparkPlan) extends ShimUnaryExecNode with GpuExec {
 
   override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
     OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME))
@@ -404,7 +418,9 @@ case class GpuSampleExec(lowerBound: Double, upperBound: Double, withReplacement
     val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
     val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
     val opTime = gpuLongMetric(OP_TIME)
+
     val rdd = child.executeColumnar()
+    // CPU-consistent path: generate the sampled row indexes on the CPU, then gather on the GPU
     if (withReplacement) {
       new GpuPartitionwiseSampledRDD(
         rdd,
@@ -415,46 +431,106 @@ case class GpuSampleExec(lowerBound: Double, upperBound: Double, withReplacement
     } else {
       rdd.mapPartitionsWithIndex(
         (index, iterator) => {
-          // use CPU sampler generate filter
+          // use the CPU sampler to generate the sampled row indexes
           val sampler = new BernoulliCellSampler(lowerBound, upperBound)
           sampler.setSeed(seed + index)
-          iterator.map[ColumnarBatch] { batch =>
-            numOutputBatches += 1
-            withResource(batch) { b => // will generate new columnar column, close this
-              val numRows = b.numRows()
-              val filter = withResource(HostColumnVector.builder(DType.BOOL8, numRows)) {
-                builder =>
-                  (0 until numRows).foreach { _ =>
-                    val n = sampler.sample()
-                    if (n > 0) {
-                      builder.append(1.toByte)
-                      numOutputRows += 1
-                    } else {
-                      builder.append(0.toByte)
-                    }
-                  }
-                  builder.buildAndPutOnDevice()
-              }
-
-              // use GPU filer rows
-              val colTypes = GpuColumnVector.extractTypes(b)
-              withResource(filter) { filter =>
-                withResource(GpuColumnVector.from(b)) { tbl =>
-                  withResource(tbl.filter(filter)) { filteredData =>
-                    if (filteredData.getRowCount == 0) {
-                      GpuColumnVector.emptyBatchFromTypes(colTypes)
-                    } else {
-                      GpuColumnVector.from(filteredData, colTypes)
-                    }
-                  }
-                }
-              }
-            }
-          }
-          ,preservesPartitioning = true
-        )
+          iterator.map[ColumnarBatch] { columnarBatch =>
+            // collect the sampled row indexes;
+            // sample the indexes in the batch one by one, so the result matches CPU execution
+            withResource(new NvtxWithMetrics("Sample Exec", NvtxColor.YELLOW, opTime)) { _ =>
+              withResource(columnarBatch) { cb =>
+                // generate the sampled row indexes on the CPU
+                val sampledRows = new ArrayBuffer[Int]
+                var rowIndex = 0
+                while (rowIndex < cb.numRows()) {
+                  if (sampler.sample() > 0) {
+                    sampledRows += rowIndex
+                  }
+                  rowIndex += 1
+                }
+                numOutputBatches += 1
+                numOutputRows += sampledRows.length
+                // gather the sampled rows by their indexes
+                GatherUtils.gather(cb, sampledRows)
+              }
+            }
+          },
+          preservesPartitioning = true
+        )
     }
   }
 }
+
+case class GpuFastSampleExec(
+    lowerBound: Double,
+    upperBound: Double,
+    withReplacement: Boolean,
+    seed: Long,
+    child: SparkPlan) extends ShimUnaryExecNode with GpuExec {
+
+  override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
+    OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME))
+
+  override def output: Seq[Attribute] = {
+    child.output
+  }
+
+  // add one coalesce exec afterwards to avoid empty and small batches,
+  // because sampling shrinks the batch
+  override val coalesceAfter: Boolean = true
+
+  // Note: GPU sample does not preserve the ordering
+  override def outputOrdering: Seq[SortOrder] = Nil
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def doExecute(): RDD[InternalRow] =
+    throw new IllegalStateException(s"Row-based execution should not occur for $this")
+
+  override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
+  override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
+
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
+    val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
+    val opTime = gpuLongMetric(OP_TIME)
+    val rdd = child.executeColumnar()
+
+    // not CPU-consistent: samples directly with the GPU sample JNI
+    rdd.mapPartitionsWithIndex(
+      (index, iterator) => {
+        iterator.map[ColumnarBatch] { columnarBatch =>
+          withResource(new NvtxWithMetrics("Fast Sample Exec", NvtxColor.YELLOW, opTime)) { _ =>
+            withResource(columnarBatch) { cb =>
+              numOutputBatches += 1
+              val numSampleRows = (cb.numRows() * (upperBound - lowerBound)).toLong
+
+              val colTypes = GpuColumnVector.extractTypes(cb)
+              if (numSampleRows == 0L) {
+                GpuColumnVector.emptyBatchFromTypes(colTypes)
+              } else if (cb.numCols() == 0) {
+                // for a count agg the batch has no columns, so only the row count matters
+                val c = GpuColumnVector.emptyBatchFromTypes(colTypes)
+                c.setNumRows(numSampleRows.toInt)
+                c
+              } else {
+                withResource(GpuColumnVector.from(cb)) { table =>
+                  // sample directly on the GPU
+                  withResource(table.sample(numSampleRows, withReplacement, seed + index)) {
+                    sampled =>
+                      // name the result to avoid shadowing the outer `cb`
+                      val sampledCb = GpuColumnVector.from(sampled, colTypes)
+                      numOutputRows += sampledCb.numRows()
+                      sampledCb
+                  }
+                }
+              }
+            }
+          }
+        }
+      },
+      preservesPartitioning = true
+    )
+  }
+}
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuPoissonSampler.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuPoissonSampler.scala
index 24d543ac169..24003addfef 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuPoissonSampler.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuPoissonSampler.scala
@@ -17,8 +17,8 @@ package org.apache.spark.sql.rapids
 
 import scala.collection.mutable.ArrayBuffer
 
-import ai.rapids.cudf.{DeviceMemoryBuffer, DType, GatherMap, HostMemoryBuffer, NvtxColor}
-import com.nvidia.spark.rapids.{Arm, GpuColumnVector, GpuMetric, NvtxWithMetrics}
+import ai.rapids.cudf.NvtxColor
+import com.nvidia.spark.rapids.{Arm, GatherUtils, GpuMetric, NvtxWithMetrics}
 
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.random.PoissonSampler
@@ -37,52 +37,35 @@ class GpuPoissonSampler(fraction: Double, useGapSamplingIfPossible: Boolean,
     } else {
       batchIterator.map { columnarBatch =>
         withResource(new NvtxWithMetrics("Sample Exec", NvtxColor.YELLOW, opTime)) { _ =>
-          numOutputBatches += 1
           withResource(columnarBatch) { cb =>
             // collect sampled row idx
             // samples idx in batch one by one, so it's same with CPU version
             val sampledRows = sample(cb.numRows())
-            val intBytes = DType.INT32.getSizeInBytes()
-            val totalBytes = sampledRows.length * intBytes
-            withResource(HostMemoryBuffer.allocate(totalBytes)) { hostBuffer =>
-              // copy row idx to host buffer
-              for (idx <- 0 until sampledRows.length) {
-                hostBuffer.setInt(idx * intBytes, sampledRows(idx))
-              }
-
-              // generate gather map and send to GPU to gather
-              withResource(DeviceMemoryBuffer.allocate(totalBytes)) { deviceBuffer =>
-                deviceBuffer.copyFromHostBuffer(0, hostBuffer, 0, totalBytes)
-                withResource(new GatherMap(deviceBuffer).toColumnView(0, sampledRows.length)) {
-                  gatherCv =>
-                    val colTypes = GpuColumnVector.extractTypes(cb)
-                    withResource(GpuColumnVector.from(cb)) { table =>
-                      withResource(table.gather(gatherCv)) { gatheredTable =>
-                        GpuColumnVector.from(gatheredTable, colTypes)
-                      }
-                    }
-                }
-              }
-            }
+            numOutputBatches += 1
+            numOutputRows += sampledRows.length
+            GatherUtils.gather(cb, sampledRows)
           }
         }
       }
     }
   }
 
-  // collect the sampled row idx
+  // collect the sampled row indexes; note that one row can be sampled multiple times
   private def sample(numRows: Int): ArrayBuffer[Int] = {
     val buf = new ArrayBuffer[Int]
-    for (rowIdx <- 0 until numRows) {
+    var rowIdx = 0
+    while (rowIdx < numRows) {
       // invoke PoissonSampler sample
      val rowCount = super.sample()
       if (rowCount > 0) {
-        numOutputRows += rowCount
-        for (_ <- 0 until rowCount) {
+        var i = 0
+        while (i < rowCount) {
           buf += rowIdx
+          i += 1
         }
       }
+      rowIdx += 1
     }
     buf
  }
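
Reviewer note (not part of the patch): a minimal usage sketch of the new config. It assumes
a SparkSession launched with the RAPIDS plugin on the classpath; the config key and its
CPU-consistency trade-off come from the diff above, while the session setup, data, and
sampling parameters are purely illustrative.

    import org.apache.spark.sql.SparkSession

    // a session with the RAPIDS SQL plugin enabled (illustrative setup)
    val spark = SparkSession.builder()
      .appName("fast-sample-sketch")
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
      .getOrCreate()

    // opt in to the GPU-native sampler; results will not match the CPU sampler
    spark.conf.set("spark.rapids.sql.fast.sample", "true")

    // Dataset.sample plans a SampleExec; per the diff, GpuSampleExecMeta converts it to
    // GpuFastSampleExec when the flag is on, and to the CPU-consistent GpuSampleExec otherwise
    val df = spark.range(0L, 1000000L).toDF("id")
    val sampled = df.sample(withReplacement = false, fraction = 0.1, seed = 42L)
    println(sampled.count())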