Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fall back to the CPU for non-zero scale on Ceil or Floor functions [databricks] #4971

Merged
merged 9 commits into from
Mar 22, 2022
23 changes: 22 additions & 1 deletion integration_tests/src/main/python/arithmetic_ops_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from marks import ignore_order, incompat, approximate_float, allow_non_gpu
from pyspark.sql.types import *
from pyspark.sql.types import IntegralType
from spark_session import with_cpu_session, with_gpu_session, with_spark_session, is_before_spark_320, is_databricks91_or_later
from spark_session import with_cpu_session, with_gpu_session, with_spark_session, is_before_spark_320, is_before_spark_330, is_databricks91_or_later
import pyspark.sql.functions as f

# No overflow gens here because we just focus on verifying the fallback to CPU when
Expand Down Expand Up @@ -351,11 +351,32 @@ def test_floor(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr('floor(a)'))

@pytest.mark.skipif(is_before_spark_330(), reason='scale parameter in Floor function is not supported before Spark 3.3.0')
@pytest.mark.parametrize('data_gen', double_n_long_gens + _arith_decimal_gens_no_neg_scale, ids=idfn)
def test_floor_scale_zero(data_gen):
    # floor(col, 0) with an explicit zero scale should stay on the GPU and
    # produce exactly the same rows as the CPU implementation.
    def build_query(spark):
        return unary_op_df(spark, data_gen).selectExpr('floor(a, 0)')
    conf = {'spark.rapids.sql.castFloatToDecimal.enabled':'true'}
    assert_gpu_and_cpu_are_equal_collect(build_query, conf=conf)

@pytest.mark.skipif(is_before_spark_330(), reason='scale parameter in Floor function is not supported before Spark 3.3.0')
@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('data_gen', double_n_long_gens + _arith_decimal_gens_no_neg_scale, ids=idfn)
def test_floor_scale_nonzero(data_gen):
    # A non-zero scale is unsupported on the GPU, so the RoundFloor expression
    # must fall back to the CPU (hence ProjectExec is allowed off the GPU).
    def build_query(spark):
        return unary_op_df(spark, data_gen).selectExpr('floor(a, -1)')
    assert_gpu_fallback_collect(build_query, 'RoundFloor')

@pytest.mark.parametrize('data_gen', double_n_long_gens + _arith_decimal_gens_no_neg_scale, ids=idfn)
def test_ceil(data_gen):
    # Single-argument ceil should produce identical results on CPU and GPU.
    def build_query(spark):
        return unary_op_df(spark, data_gen).selectExpr('ceil(a)')
    assert_gpu_and_cpu_are_equal_collect(build_query)

@pytest.mark.skipif(is_before_spark_330(), reason='scale parameter in Ceil function is not supported before Spark 3.3.0')
@pytest.mark.parametrize('data_gen', double_n_long_gens + _arith_decimal_gens_no_neg_scale, ids=idfn)
def test_ceil_scale_zero(data_gen):
    # ceil(col, 0) with an explicit zero scale should stay on the GPU and
    # produce exactly the same rows as the CPU implementation.
    def build_query(spark):
        return unary_op_df(spark, data_gen).selectExpr('ceil(a, 0)')
    conf = {'spark.rapids.sql.castFloatToDecimal.enabled':'true'}
    assert_gpu_and_cpu_are_equal_collect(build_query, conf=conf)

@pytest.mark.parametrize('data_gen', [_decimal_gen_36_neg5, _decimal_gen_38_neg10], ids=idfn)
def test_floor_ceil_overflow(data_gen):
assert_gpu_and_cpu_error(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.rapids.shims

import org.apache.spark.sql.rapids.GpuFloorCeil
import org.apache.spark.sql.types.{DataType, DecimalType, LongType}

object RapidsFloorCeilUtils {

  /**
   * Computes the result type of a Ceil/Floor operation for this shim.
   *
   * Decimal inputs yield a scale-0 decimal whose precision comes from
   * [[GpuFloorCeil.unboundedOutputPrecision]] (bounded to the maximum the
   * type system allows); every other input type maps to `LongType`.
   */
  def outputDataType(dataType: DataType): DataType = dataType match {
    case decimal: DecimalType =>
      val precision = GpuFloorCeil.unboundedOutputPrecision(decimal)
      DecimalType.bounded(precision, 0)
    case _ =>
      LongType
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.nvidia.spark.rapids.shims

import ai.rapids.cudf.DType
import com.nvidia.spark.InMemoryTableScanMeta
import com.nvidia.spark.rapids._
import org.apache.parquet.schema.MessageType
Expand All @@ -35,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids._
import org.apache.spark.sql.rapids.execution.GpuShuffleMeta
import org.apache.spark.sql.rapids.shims.GpuTimeAdd
import org.apache.spark.sql.types.{CalendarIntervalType, DayTimeIntervalType, StructType}
import org.apache.spark.sql.types.{CalendarIntervalType, DayTimeIntervalType, DecimalType, StructType}
import org.apache.spark.unsafe.types.CalendarInterval

trait Spark33XShims extends Spark321PlusShims with Spark320PlusNonDBShims {
Expand Down Expand Up @@ -135,6 +136,68 @@ trait Spark33XShims extends Spark321PlusShims with Spark320PlusNonDBShims {
}
}
}),
// GPU override for Spark 3.3.0's RoundCeil (ceil with an explicit decimal-places
// argument). The GPU only supports a literal scale of 0; any other scale is
// tagged so the plan falls back to the CPU.
GpuOverrides.expr[RoundCeil](
"Computes the ceiling of the given expression to d decimal places",
ExprChecks.binaryProject(
TypeSig.gpuNumeric, TypeSig.cpuNumeric,
("value", TypeSig.gpuNumeric +
TypeSig.psNote(TypeEnum.FLOAT, "result may round slightly differently") +
TypeSig.psNote(TypeEnum.DOUBLE, "result may round slightly differently"),
TypeSig.cpuNumeric),
("scale", TypeSig.lit(TypeEnum.INT), TypeSig.lit(TypeEnum.INT))),
(ceil, conf, p, r) => new BinaryExprMeta[RoundCeil](ceil, conf, p, r) {
override def tagExprForGpu(): Unit = {
// Decimal inputs whose unbounded output precision exceeds what DECIMAL128
// can hold would need overflow checks the GPU does not implement yet.
ceil.child.dataType match {
case dt: DecimalType =>
val precision = GpuFloorCeil.unboundedOutputPrecision(dt)
if (precision > DType.DECIMAL128_MAX_PRECISION) {
willNotWorkOnGpu(s"output precision $precision would require overflow " +
s"checks, which are not supported yet")
}
case _ => // NOOP
}
// Only a literal scale of exactly 0 is supported; a null literal scale is
// left alone here (the expression itself handles that case).
GpuOverrides.extractLit(ceil.scale).foreach { scale =>
if (scale.value != null &&
scale.value.asInstanceOf[Integer] != 0) {
willNotWorkOnGpu("Scale other than 0 is not supported")
}
}
}

// With scale pinned to 0, RoundCeil is equivalent to plain ceil, so the
// scale operand is dropped when converting to the GPU expression.
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
GpuCeil(lhs)
}),
// GPU override for Spark 3.3.0's RoundFloor (floor with an explicit decimal-places
// argument). The GPU only supports a literal scale of 0; any other scale is
// tagged so the plan falls back to the CPU.
GpuOverrides.expr[RoundFloor](
"Computes the floor of the given expression to d decimal places",
ExprChecks.binaryProject(
TypeSig.gpuNumeric, TypeSig.cpuNumeric,
("value", TypeSig.gpuNumeric +
TypeSig.psNote(TypeEnum.FLOAT, "result may round slightly differently") +
TypeSig.psNote(TypeEnum.DOUBLE, "result may round slightly differently"),
TypeSig.cpuNumeric),
("scale", TypeSig.lit(TypeEnum.INT), TypeSig.lit(TypeEnum.INT))),
(floor, conf, p, r) => new BinaryExprMeta[RoundFloor](floor, conf, p, r) {
override def tagExprForGpu(): Unit = {
// Decimal inputs whose unbounded output precision exceeds what DECIMAL128
// can hold would need overflow checks the GPU does not implement yet.
floor.child.dataType match {
case dt: DecimalType =>
val precision = GpuFloorCeil.unboundedOutputPrecision(dt)
if (precision > DType.DECIMAL128_MAX_PRECISION) {
willNotWorkOnGpu(s"output precision $precision would require overflow " +
s"checks, which are not supported yet")
}
case _ => // NOOP
}
// Only a literal scale of exactly 0 is supported; a null literal scale is
// left alone here (the expression itself handles that case).
GpuOverrides.extractLit(floor.scale).foreach { scale =>
if (scale.value != null &&
scale.value.asInstanceOf[Integer] != 0) {
willNotWorkOnGpu("Scale other than 0 is not supported")
}
}
}

// With scale pinned to 0, RoundFloor is equivalent to plain floor, so the
// scale operand is dropped when converting to the GPU expression.
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
GpuFloor(lhs)
}),
GpuOverrides.expr[TimeAdd](
"Adds interval to timestamp",
ExprChecks.binaryProject(TypeSig.TIMESTAMP, TypeSig.TIMESTAMP,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.rapids.shims

import org.apache.spark.sql.rapids.GpuFloorCeil
import org.apache.spark.sql.types.{DataType, DecimalType, LongType}

object RapidsFloorCeilUtils {

  /**
   * Computes the result type shared by Ceil/Floor and RoundCeil/RoundFloor.
   *
   * For decimal inputs, Ceil/Floor derives its precision from
   * [[GpuFloorCeil.unboundedOutputPrecision]] while RoundCeil/RoundFloor keeps
   * the input's precision as-is, so the larger of the two is used to guarantee
   * the resulting `DecimalType` cannot overflow. Every other input type maps
   * to `LongType`.
   */
  def outputDataType(dataType: DataType): DataType = dataType match {
    case decimal: DecimalType =>
      val precision = GpuFloorCeil.unboundedOutputPrecision(decimal).max(decimal.precision)
      DecimalType.bounded(precision, 0)
    case _ =>
      LongType
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression

import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Expression, ImplicitCastInputTypes}
import org.apache.spark.sql.rapids.shims.RapidsFloorCeilUtils
import org.apache.spark.sql.types._

abstract class CudfUnaryMathExpression(name: String) extends GpuUnaryMathExpression(name)
Expand Down Expand Up @@ -166,11 +167,7 @@ object GpuFloorCeil {
}

case class GpuCeil(child: Expression) extends CudfUnaryMathExpression("CEIL") {
override def dataType: DataType = child.dataType match {
case dt: DecimalType =>
DecimalType.bounded(GpuFloorCeil.unboundedOutputPrecision(dt), 0)
case _ => LongType
}
override def dataType: DataType = RapidsFloorCeilUtils.outputDataType(child.dataType)

override def hasSideEffects: Boolean = true

Expand Down Expand Up @@ -242,11 +239,7 @@ case class GpuExpm1(child: Expression) extends CudfUnaryMathExpression("EXPM1")
}

case class GpuFloor(child: Expression) extends CudfUnaryMathExpression("FLOOR") {
override def dataType: DataType = child.dataType match {
case dt: DecimalType =>
DecimalType.bounded(GpuFloorCeil.unboundedOutputPrecision(dt), 0)
case _ => LongType
}
override def dataType: DataType = RapidsFloorCeilUtils.outputDataType(child.dataType)

override def hasSideEffects: Boolean = true

Expand Down