Merge pull request #4160 from revans2/branch-22.02-upmerge
Fix merge issues for branch 22.02
revans2 authored Nov 19, 2021
2 parents 94d6e36 + ec0ea88 commit 5a6354d
Showing 89 changed files with 3,514 additions and 1,847 deletions.
210 changes: 210 additions & 0 deletions docs/compatibility.md
@@ -94,6 +94,216 @@ after Spark 3.1.0.
We do not disable operations that produce different results due to `-0.0` in the data because it is
considered to be a rare occurrence.

## Decimal Support

Apache Spark supports decimal values with a precision of up to 38 digits, which fits in 128 bits.
However, when actually processing the data, in most cases, it is temporarily converted to
Java's `BigDecimal` type which allows for effectively unlimited precision. This lets Spark do
complicated calculations without the risk of missing an overflow and causing data corruption.
It also lets Spark support some operations that require intermediate values that are larger than
a 128-bit representation can support.

The RAPIDS Accelerator currently is limited to a maximum of 128-bits for storing or processing
decimal values. This allows us to fully support the majority of decimal operations. But there are
a few operations that we cannot support to the same degree as Spark can on the CPU.
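
As a rough check on these widths, the sketch below computes the largest precision whose full range fits in a signed integer of a given size. The object and method names are ours, chosen only for illustration; it is plain Scala and does not use Spark or the RAPIDS Accelerator.

```scala
object DecimalWidths {
  // Largest unscaled magnitude for a given precision: 10^precision - 1.
  def maxUnscaled(precision: Int): BigInt = BigInt(10).pow(precision) - 1

  // True when +/-(10^precision - 1) fits in a signed two's-complement
  // integer that is `bits` wide (one bit is reserved for the sign).
  def fitsInSigned(precision: Int, bits: Int): Boolean =
    maxUnscaled(precision).bitLength <= bits - 1

  // Largest precision whose whole range fits in `bits` bits.
  // Assumes bits >= 5, so that at least precision 1 fits.
  def maxPrecision(bits: Int): Int =
    Iterator.from(1).takeWhile(p => fitsInSigned(p, bits)).toList.last
}
```

`DecimalWidths.maxPrecision(128)` evaluates to 38, and `DecimalWidths.maxPrecision(64)` evaluates to 18, which is the limit that matters for the 64-bit case discussed under sum aggregation.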

### Decimal Sum Aggregation

When Apache Spark does a sum aggregation on decimal values it will store the result in a value
with a precision that is the input precision + 10, but with a maximum precision of 38. The table
below shows the number of rows/values in an aggregation before an overflow is possible,
and the number of rows/values in the aggregation before an overflow might not be detected.
The numbers are for Spark 3.1.0 and above, after a number of fixes were put in place. Please see
[SPARK-28067](https://issues.apache.org/jira/browse/SPARK-28067) and
[SPARK-32018](https://issues.apache.org/jira/browse/SPARK-32018) for more information.
Please also note that these are for the worst case situations, meaning all the values in the sum
were either the largest or smallest values possible to be stored in the input type. In the common
case, where the numbers are smaller, or vary between positive and negative values, many more
rows/values can be processed without any issues.

|Input Precision|Number of values before overflow is possible|Maximum number of values for guaranteed overflow detection (Spark CPU)|Maximum number of values for guaranteed overflow detection (RAPIDS GPU)|
|---------------|------------------------------|------------|-------------|
|1 |11,111,111,111 |2,049,638,219,301,061,290 |Same as CPU |
|2 |10,101,010,101 |186,330,738,118,278,299 |Same as CPU |
|3 |10,010,010,010 |18,465,199,272,982,534 |Same as CPU |
|4 |10,001,000,100 |1,844,848,892,260,181 |Same as CPU |
|5 |10,000,100,001 |184,459,285,329,948 |Same as CPU |
|6 |10,000,010,000 |18,436,762,510,472 |Same as CPU |
|7 |10,000,001,000 |1,834,674,590,838 |Same as CPU |
|8 |10,000,000,100 |174,467,442,481 |Same as CPU |
|9 |10,000,000,010 |Unlimited |Unlimited |
|10 - 19 |10,000,000,000 |Unlimited |Unlimited |
|20 |10,000,000,000 |Unlimited |3,402,823,659,209,384,634 |
|21 |10,000,000,000 |Unlimited |340,282,356,920,938,463 |
|22 |10,000,000,000 |Unlimited |34,028,226,692,093,846 |
|23 |10,000,000,000 |Unlimited |3,402,813,669,209,384 |
|24 |10,000,000,000 |Unlimited |340,272,366,920,938 |
|25 |10,000,000,000 |Unlimited |34,018,236,692,093 |
|26 |10,000,000,000 |Unlimited |3,392,823,669,209 |
|27 |10,000,000,000 |Unlimited |330,282,366,920 |
|28 |10,000,000,000 |Unlimited |24,028,236,692 |
|29 |1,000,000,000 |Unlimited |Falls back to CPU |
|30 |100,000,000 |Unlimited |Falls back to CPU |
|31 |10,000,000 |Unlimited |Falls back to CPU |
|32 |1,000,000 |Unlimited |Falls back to CPU |
|33 |100,000 |Unlimited |Falls back to CPU |
|34 |10,000 |Unlimited |Falls back to CPU |
|35 |1,000 |Unlimited |Falls back to CPU |
|36 |100 |Unlimited |Falls back to CPU |
|37 |10 |Unlimited |Falls back to CPU |
|38 |1 |Unlimited |Falls back to CPU |

For an input precision of 9 and above, Spark will do the aggregations as a `BigDecimal`
value which is slow, but guarantees that any overflow can be detected. For inputs with a
precision of 8 or below, Spark will internally do the calculations as a 64-bit `long` value.
When the precision is 8, you would need at least 174-billion values/rows contributing to a
single aggregation result, and even then all the values would need to be either the largest
or the smallest value possible to be stored in the type before the overflow is no longer detected.

For the RAPIDS Accelerator we only have access to at most a 128-bit value to store the results
in and still detect overflow. Because of this we cannot guarantee overflow detection in all
cases. In some cases we can guarantee unlimited overflow detection because of the maximum number of
values that RAPIDS will aggregate in a single batch. But even in the worst case, for a decimal value
with a precision of 28, the user would still have to aggregate so many values that the sum overflows
2.4 times over before we are no longer able to detect it.
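
The figures in the table can be reproduced with integer arithmetic. The sketch below is our own reconstruction, not code from Spark or the RAPIDS Accelerator. It assumes every input is the largest value of its type, and it models a missed overflow as the point where a fixed-width sum has wrapped far enough to land back inside the valid range of the result type. Under those assumptions it matches the rows we spot-checked (precisions 1, 2, 8, 20, 28, 34 and 38). All names are hypothetical.

```scala
object SumOverflow {
  private val MaxPrecision = 38
  private def pow10(n: Int): BigInt = BigInt(10).pow(n)

  // Result precision of a decimal sum: input precision + 10, capped at 38.
  def sumPrecision(inputPrecision: Int): Int =
    math.min(inputPrecision + 10, MaxPrecision)

  // Worst case: every input has the largest unscaled value, 10^p - 1.
  // Returns how many such values fit before the result type can overflow.
  def valuesBeforeOverflow(p: Int): BigInt =
    (pow10(sumPrecision(p)) - 1) / (pow10(p) - 1)

  // Assumed wraparound model: an overflow can be missed once a `bits`-wide
  // sum has wrapped by 2^bits back into the valid range of the result type.
  def valuesBeforeUndetected(p: Int, bits: Int): BigInt =
    (BigInt(2).pow(bits) - pow10(sumPrecision(p))) / (pow10(p) - 1)
}
```

For example, `SumOverflow.valuesBeforeUndetected(28, 128)` gives 24,028,236,692, which is about 2.4 times the 10,000,000,000 values returned by `SumOverflow.valuesBeforeOverflow(28)`.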

### Decimal Average

Average is effectively doing a `sum(input)/count(input)`, except the scale of the output type is
the scale of the input + 4. As such it inherits some of the same issues that both sum and divide
have. It also inherits some issues from Spark itself. See
[SPARK-37024](https://issues.apache.org/jira/browse/SPARK-37024) for a detailed description of some
issues with average in Spark.

In order to be able to guarantee overflow detection on the sum with at least 100-billion values
and to be able to guarantee doing the divide with half-up rounding at the end, we only support
average on input values with a precision of 23 or below. This is 38 - 10 = 28 for the sum guarantees,
and then 5 less to be able to shift the left-hand side of the divide enough to get a correct
answer that can be rounded to the result that Spark would produce.

### Divide and Multiply

Division and multiplication of decimal types is a little complicated in Apache Spark. For
some arbitrary reason divide and multiply in Spark require that the precision and scale of
the left-hand side and the right-hand side match. As such when planning a divide or multiply
Spark will look at the original inputs to calculate the output precision and scale. Then it will
cast the inputs to a common wider value where the scale is the max of the two input scales,
and the precision is max of the two input non-scale portions (precision - scale) + the new
scale. Then it will do the divide or multiply as a `BigDecimal` value, and return the result as
a `BigDecimal` but lie about the precision and scale of the return type. Finally, Spark will
insert a `CheckOverflow` expression that will round the scale of the BigDecimal value to that
of the desired output type and check that the final precision will fit in the precision of the
desired output type.
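
The widening step described above can be sketched in a few lines. This is only an illustration of the rule as stated in this section, with hypothetical names (`Dec`, `widen`), and it ignores any cap at 38 digits.

```scala
object DecimalWiden {
  // A decimal type described by its precision and scale.
  final case class Dec(precision: Int, scale: Int)

  // Common type for both operands: the larger scale, plus enough
  // integer digits (precision - scale) for whichever side needs more.
  def widen(l: Dec, r: Dec): Dec = {
    val scale = math.max(l.scale, r.scale)
    val intDigits = math.max(l.precision - l.scale, r.precision - r.scale)
    Dec(intDigits + scale, scale)
  }
}
```

For instance, a `Decimal(7, 2)` and a 32-bit integer (which Spark treats as `Decimal(10, 0)`) widen to `Decimal(12, 2)` on both sides, even though the left-hand side only ever holds 7 digits. That extra width is what the RAPIDS Accelerator tries to undo.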

In order to match exactly with what Spark is doing the RAPIDS Accelerator would need at least
256-bit decimal values. We might implement that at some point, but until then we try to cover as
much of division and multiplication as possible.

To combat this we look at the query plan and try to determine what is the smallest precision
and scale for each parameter that would let us still produce the exact same answer as Apache
Spark. We effectively try to undo what Spark did when widening the types to make them common.

#### Division

In Spark the output of a division operation is

```scala
val precision = p1 - s1 + s2 + max(6, s1 + p2 + 1)
val scale = max(6, s1 + p2 + 1)
```

Where `p1` and `s1` are the precision and scale of the left-hand side of the operation and
`p2` and `s2` are the precision and scale of the right-hand side of the operation. But decimal
divide inherently produces a result where the output scale is `s1 - s2`. In addition to this
Spark will round the result to the given scale, and not just truncate it. This means that to
produce the same result as Apache Spark we have to increase the scale of the left-hand side
operand to be at least `output_scale + s2 + 1`. The `+ 1` is so the output is large enough that
we can round it to the desired result. If this causes the precision of the left-hand side
to go above 38, the maximum precision that 128-bits can hold, then we have to fall back to the
CPU. Unfortunately the math is a bit complicated so there is no simple rule of thumb for this.
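
To make the rule more concrete, here is a minimal sketch that computes the division result type and the precision the left-hand side would need. It is not the plugin's implementation. The cap at 38 digits follows Spark's precision-loss adjustment (keep the integer digits, keep at least 6 digits of scale), which is not spelled out in this section, so treat that part as an assumption about default settings (`spark.sql.decimalOperations.allowPrecisionLoss=true`). The names `Dec`, `resultType`, `requiredLhsPrecision` and `fitsIn128Bits` are hypothetical.

```scala
object DecimalDivide {
  final case class Dec(precision: Int, scale: Int)

  val MaxPrecision = 38
  val MinAdjustedScale = 6

  // Result type from the formula above, reduced to 38 digits when needed.
  def resultType(l: Dec, r: Dec): Dec = {
    val scale = math.max(6, l.scale + r.precision + 1)
    val precision = l.precision - l.scale + r.scale + scale
    if (precision <= MaxPrecision) Dec(precision, scale)
    else {
      // Assumed adjustment: preserve integer digits, give up scale first.
      val intDigits = precision - scale
      val minScale = math.min(scale, MinAdjustedScale)
      Dec(MaxPrecision, math.max(MaxPrecision - intDigits, minScale))
    }
  }

  // Precision of the left-hand side after raising its scale to at least
  // output_scale + s2 + 1 so the quotient can be rounded, not truncated.
  def requiredLhsPrecision(l: Dec, r: Dec): Int = {
    val lhsScale = math.max(l.scale, resultType(l, r).scale + r.scale + 1)
    (l.precision - l.scale) + lhsScale
  }

  // False means the divide would have to fall back to the CPU.
  def fitsIn128Bits(l: Dec, r: Dec): Boolean =
    requiredLhsPrecision(l, r) <= MaxPrecision
}
```

With two `Decimal(28, 2)` operands the result type is `Decimal(38, 10)` and the left-hand side would need 39 digits, so the check fails. With two `Decimal(14, 2)` operands the result type is `Decimal(31, 17)` and 32 digits suffice.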

#### Multiplication

In Spark the output of a multiplication operation is

```scala
val precision = p1 + p2 + 1
val scale = s1 + s2
```

Where `p1` and `s1` are the precision and scale of the left-hand side of the operation and
`p2` and `s2` are the precision and scale of the right-hand side of the operation. Fortunately,
decimal multiply inherently produces the same scale, but Spark will round the result. As such,
the RAPIDS Accelerator must add an extra decimal place to the scale and the precision, so we can
round correctly. This means that if `p1 + p2 > 36` we will fall back to the CPU to do processing.
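
The sketch below restates the multiplication rule and shows why rounding needs one digit beyond the target scale. It is an illustration with hypothetical names, not the plugin's code, and `resultType` does not apply any cap at 38 digits. The rounding mode is assumed to be half up, as mentioned for average above.

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

object DecimalMultiply {
  final case class Dec(precision: Int, scale: Int)

  // Result type from the formula above, before any cap at 38 digits.
  def resultType(l: Dec, r: Dec): Dec =
    Dec(l.precision + r.precision + 1, l.scale + r.scale)

  // One extra digit of precision and scale is needed for rounding, so the
  // working precision is p1 + p2 + 2, which must not exceed 38.
  def staysOnGpu(l: Dec, r: Dec): Boolean =
    l.precision + r.precision <= 36

  // Rounding looks at the digit after the target scale.
  def roundHalfUp(value: String, scale: Int): JBigDecimal =
    new JBigDecimal(value).setScale(scale, RoundingMode.HALF_UP)

  // Truncation simply discards that digit.
  def truncate(value: String, scale: Int): JBigDecimal =
    new JBigDecimal(value).setScale(scale, RoundingMode.DOWN)
}
```

For example, `roundHalfUp("1.2345", 3)` gives `1.235` while `truncate("1.2345", 3)` gives `1.234`. A result that was already cut to three decimal places cannot tell these apart, which is why the extra digit has to be carried.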

### How to get more decimal operations on the GPU?

Spark is very conservative in calculating the output types for decimal operations. It does this
to avoid overflow in the worst case scenario, but generally will end up using a much larger type
than is needed to store the final result. This means that over the course of a large query the
precision and scale can grow to a size that would force the RAPIDS Accelerator to fall back
to the CPU out of an abundance of caution. If you find yourself in this situation you can often
cast the results to something smaller and still get the same answer. These casts should be done
with some knowledge about the data being processed.

For example, suppose we had a query like

```sql
SELECT SUM(cs_wholesale_cost * cs_quantity)/
SUM(cs_sales_price * cs_quantity) cost_to_sale
FROM catalog_sales
GROUP BY cs_sold_date_sk
ORDER BY cs_sold_date_sk
```

where `cs_wholesale_cost` and `cs_sales_price` are both decimal values with a precision of 7
and a scale of 2, `Decimal(7, 2)`, and `cs_quantity` is a 32-bit integer. Only the first half
of the query, the multiplies and sums, will be on the GPU; the final divide falls back to the
CPU. The following explanation is a bit complicated but tries to
break down the processing into the distinct steps that Spark takes.

1. Multiplying a `Decimal(7, 2)` by an integer produces a `Decimal(18, 2)` value. This is the
same for both multiply operations in the query.
2. The `sum` operation on the resulting `Decimal(18, 2)` column produces a `Decimal(28, 2)`.
This also is the same for both sum aggregations in the query.
3. The final divide operation is dividing a `Decimal(28, 2)` by another `Decimal(28, 2)` and
produces a `Decimal(38, 10)`.

We cannot guarantee that on the GPU the divide will produce the exact same result as
the CPU for all possible inputs. But we know that we have at most 1,000,000 line items
for each `cs_sold_date_sk`, and the average price/cost is nowhere close to the maximum
value that `Decimal(7, 2)` can hold. So we can cast the result of the sums to a more
reasonable `Decimal(14, 2)` and still produce an equivalent result, but totally on the GPU.

```sql
SELECT CAST(SUM(cs_wholesale_cost * cs_quantity) AS Decimal(14,2))/
CAST(SUM(cs_sales_price * cs_quantity) AS Decimal(14,2)) cost_to_sale
FROM catalog_sales
GROUP BY cs_sold_date_sk
ORDER BY cs_sold_date_sk
```

This should be done with some caution as it does reduce the range of values that the query could
process before overflowing. It also can produce different result types. In this case instead of
producing a `Decimal(38, 10)` the result is a `Decimal(31, 17)`. If you really want the exact
same result type you can cast the result back to a `Decimal(38, 10)`, and the result will be
identical to before. In exchange for that caution, the casts can have a positive impact on performance.
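
Before adding a cast like this, it can help to check how much headroom the narrower type leaves. The following sketch uses hypothetical helper names and the example's assumption of at most 1,000,000 rows per group. It computes the largest per-row average that still fits in the cast type.

```scala
object CastHeadroom {
  // Largest value a Decimal(precision, scale) can hold,
  // for example Decimal(7, 2) -> 99999.99.
  def maxValue(precision: Int, scale: Int): BigDecimal =
    BigDecimal(BigInt(10).pow(precision) - 1, scale)

  // Largest per-row average that keeps a sum over `rows` values
  // inside Decimal(precision, scale).
  def maxAveragePerRow(precision: Int, scale: Int, rows: Long): BigDecimal =
    maxValue(precision, scale) / BigDecimal(rows)
}
```

`CastHeadroom.maxAveragePerRow(14, 2, 1000000L)` is 999,999.99999999, so each `price * quantity` product can average just under one million before a `Decimal(14, 2)` sum overflows. Since `Decimal(7, 2)` tops out at 99,999.99, that corresponds to an average quantity of about 10 at the maximum price. Whether that margin is enough depends on your data.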

If you have made it this far in the documentation then you probably know what you are doing
and will use the following power only for good. It can often be difficult to
determine if adding casts to put some processing on the GPU would improve performance or not.
It can also be difficult to detect if a query might produce incorrect results because of a cast.
To help answer some of these questions we provide
`spark.rapids.sql.decimalOverflowGuarantees` that if set to false will disable guarantees for
overflow checking and run all decimal operations on the GPU, even if it cannot guarantee that
it will produce the exact same result as Spark. This should **never** be set to false in
production because it disables all guarantees, and if your data does overflow, it might produce
either a `null` value or, worse, an incorrect decimal value. But it should give you more
information about what the performance impact might be if you tuned it with casting. If
you compare the results to GPU results with the guarantees still in place it should give you
an idea if casting would still produce a correct answer. Even with this you should go through
the query and your data and see what level of guarantees for outputs you are comfortable with.
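
As a minimal sketch of such an experiment, the config can be set when the session is created. This assumes the RAPIDS Accelerator jar is already on the classpath and that the plugin is enabled through `spark.plugins`; the application name is arbitrary. It is meant for a test environment only.

```scala
import org.apache.spark.sql.SparkSession

// Test-only session: overflow guarantees are disabled so that all decimal
// operations are attempted on the GPU. Do not use this in production.
val spark = SparkSession.builder()
  .appName("decimal-overflow-experiment")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.decimalOverflowGuarantees", "false")
  .getOrCreate()
```

Running the same query once with this setting and once with the default (`true`) gives two sets of results and timings to compare.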

## Unicode

Spark delegates Unicode operations to the underlying JVM. Each version of Java complies with a
2 changes: 1 addition & 1 deletion docs/configs.md
@@ -77,7 +77,7 @@ Name | Description | Default Value
<a name="sql.csv.read.long.enabled"></a>spark.rapids.sql.csv.read.long.enabled|Parsing CSV longs is much more lenient and will return 0 for some malformed values instead of null|false
<a name="sql.csv.read.short.enabled"></a>spark.rapids.sql.csv.read.short.enabled|Parsing CSV shorts is much more lenient and will return 0 for some malformed values instead of null|false
<a name="sql.csvTimestamps.enabled"></a>spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
-<a name="sql.decimalType.enabled"></a>spark.rapids.sql.decimalType.enabled|Enable decimal type support on the GPU. Decimal support on the GPU is limited to less than 18 digits. This can result in a lot of data movement to and from the GPU, which can slow down processing in some cases.|false
+<a name="sql.decimalOverflowGuarantees"></a>spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
<a name="sql.format.csv.enabled"></a>spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyways)|true
1 change: 0 additions & 1 deletion docs/spark-profiling-tool.md
@@ -238,7 +238,6 @@ Compare Rapids Properties which are set explicitly:
|spark.rapids.memory.pinnedPool.size |null |2g |
|spark.rapids.sql.castFloatToDecimal.enabled|null |true |
|spark.rapids.sql.concurrentGpuTasks |null |2 |
-|spark.rapids.sql.decimalType.enabled |null |true |
|spark.rapids.sql.enabled |false |true |
|spark.rapids.sql.explain |null |NOT_ON_GPU|
|spark.rapids.sql.hasNans |null |FALSE |