Update GPU ORC statistics write support #5715

Merged · 3 commits · Jun 2, 2022
Changes from 2 commits
14 changes: 6 additions & 8 deletions docs/compatibility.md
@@ -384,7 +384,7 @@
 The plugin supports reading `uncompressed`, `snappy` and `zlib` ORC files and writing `uncompressed`
 and `snappy` ORC files. At this point, the plugin does not have the ability to fall back to the
 CPU when reading an unsupported compression format, and will error out in that case.

-### Push Down Aggreates for ORC
+### Push Down Aggregates for ORC

 Spark-3.3.0+ pushes down certain aggregations (`MIN`/`MAX`/`COUNT`) into ORC when the user-config
 `spark.sql.orc.aggregatePushdown` is set to true.
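
For context, here is a minimal sketch (not part of this diff) of what enabling the pushdown looks like from PySpark; the path and sample data are placeholders:

```python
from pyspark.sql import SparkSession

# Assumes Spark 3.3.0+. Clearing useV1SourceList selects the V2 ORC source,
# which is what the tests in this PR also do; aggregate pushdown only applies there.
spark = (SparkSession.builder
         .config("spark.sql.orc.aggregatePushdown", "true")
         .config("spark.sql.sources.useV1SourceList", "")
         .getOrCreate())

spark.range(100).selectExpr("id", "id % 5 as p").write.mode("overwrite").orc("/tmp/orc_demo")
# MIN/MAX/COUNT can now be answered from ORC file statistics when they are present.
spark.read.orc("/tmp/orc_demo").selectExpr("min(id)", "max(id)", "count(*)").show()
```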
@@ -414,16 +414,14 @@
 file-statistics are missing (see [SPARK-34960 discussion](https://issues.apache.

 **Limitations With RAPIDS**

-RAPIDS does not support whole file statistics in ORC file. We are working with
-[CUDF](https://github.com/rapidsai/cudf) to support writing statistics and you can track it
-[here](https://github.com/rapidsai/cudf/issues/5826).
+RAPIDS does not support whole file statistics in ORC file in releases _prior_ to release 22.06.
Collaborator commented:

nit

Suggested change:
-RAPIDS does not support whole file statistics in ORC file in releases _prior_ to release 22.06.
+RAPIDS does not support whole file statistics in ORC file in releases prior to release 22.06.


*Writing ORC Files*

tgravescs marked this conversation as resolved.
-Without CUDF support to file statistics, all ORC files written by
-the GPU are incompatible with the optimization causing an ORC read-job to fail as described above.
-In order to prevent job failures, `spark.sql.orc.aggregatePushdown` should be disabled while reading ORC files
-that were written by the GPU.
+If you are using a release prior to release 22.06, where CUDF does not support writing file statistics, then the ORC files
+written by the GPU are incompatible with the optimization, causing an ORC read-job to fail as described above.
+In order to prevent job failures in releases prior to release 22.06, `spark.sql.orc.aggregatePushdown` should be disabled
+while reading ORC files that were written by the GPU.

*Reading ORC Files*

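A hedged illustration of the mitigation the diff above documents for releases prior to 22.06; the session setup and path are placeholders, not content from the PR:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# On releases prior to 22.06, disable aggregate pushdown before reading ORC
# files written by the GPU, so the missing file statistics cannot fail the read job.
spark.conf.set("spark.sql.orc.aggregatePushdown", "false")
spark.read.orc("/tmp/gpu_written_orc").selectExpr("count(*)").show()  # placeholder path
```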
32 changes: 11 additions & 21 deletions integration_tests/src/main/python/orc_test.py
@@ -561,10 +561,16 @@ def test_orc_read_with_corrupt_files(spark_tmp_path, reader_confs, v1_enabled_li
 _aggregate_orc_list_col_partition = ['COUNT']
 _aggregate_orc_list_no_col_partition = ['MAX', 'MIN']
 _aggregate_orc_list = _aggregate_orc_list_col_partition + _aggregate_orc_list_no_col_partition
-_orc_aggregate_pushdown_enabled_conf = {'spark.sql.orc.aggregatePushdown': 'true',
+_orc_aggregate_pushdown_enabled_conf = {'spark.rapids.sql.format.orc.write.enabled': 'true',
+                                        'spark.sql.orc.aggregatePushdown': 'true',
                                         "spark.sql.sources.useV1SourceList": ""}

 def _do_orc_scan_with_agg(spark, path, agg):
+    spark.range(10).selectExpr("id", "id % 3 as p").write.mode("overwrite").orc(path)
     return spark.read.orc(path).selectExpr('{}(p)'.format(agg))
+
+def _do_orc_scan_with_agg_on_partitioned_column(spark, path, agg):
+    spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").mode("overwrite").orc(path)
+    return spark.read.orc(path).selectExpr('{}(p)'.format(agg))

 @pytest.mark.skipif(is_before_spark_330(), reason='Aggregate push down on ORC is a new feature of Spark 330')
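The two helpers above differ only in whether the data is partitioned by `p` before writing. A rough standalone equivalent of the partitioned case, outside the test harness (the session and path are simplified assumptions):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
path = "/tmp/orc_pushdown_demo"  # stand-in for spark_tmp_path

# Mirrors _do_orc_scan_with_agg_on_partitioned_column: write a small
# partitioned dataset, then aggregate over the partition column.
spark.range(10).selectExpr("id", "id % 3 as p") \
    .write.partitionBy("p").mode("overwrite").orc(path)
spark.read.orc(path).selectExpr("count(p)").show()
```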
@@ -582,11 +588,7 @@ def test_orc_scan_with_aggregate_pushdown(spark_tmp_path, aggregate):
     | MAX | Y |
     """
     data_path = spark_tmp_path + '/ORC_DATA/pushdown_00.orc'
-    # GPU ORC write with statistics is not correctly working.
-    # Create ORC file in CPU session as a workaround
-    with_cpu_session(lambda spark: spark.range(10).selectExpr("id", "id % 3 as p").write
-                     .orc(data_path))
-
     # fallback to CPU
     assert_cpu_and_gpu_are_equal_collect_with_capture(
         lambda spark: _do_orc_scan_with_agg(spark, data_path, aggregate),
@@ -609,16 +611,10 @@ def test_orc_scan_with_aggregate_pushdown_on_col_partition(spark_tmp_path, aggre
     | COUNT | Y | Y |
     """
     data_path = spark_tmp_path + '/ORC_DATA/pushdown_01.orc'
-    # GPU ORC write with statistics is not correctly working.
-    # Create ORC file in CPU session as a workaround
-    # Partition column P
-    with_cpu_session(lambda spark: spark.range(10).selectExpr("id", "id % 3 as p").write
-                     .partitionBy("p")
-                     .orc(data_path))
-
     # fallback to CPU only if aggregate is COUNT
     assert_cpu_and_gpu_are_equal_collect_with_capture(
-        lambda spark: _do_orc_scan_with_agg(spark, data_path, aggregate),
+        lambda spark: _do_orc_scan_with_agg_on_partitioned_column(spark, data_path, aggregate),
         exist_classes="BatchScanExec",
         non_exist_classes="GpuBatchScanExec",
         conf=_orc_aggregate_pushdown_enabled_conf)
@@ -637,14 +633,8 @@ def test_orc_scan_with_aggregate_no_pushdown_on_col_partition(spark_tmp_path, ag
     | MAX | Y | N |
     """
     data_path = spark_tmp_path + '/ORC_DATA/pushdown_02.orc'
-    # GPU ORC write with statistics is not correctly working.
-    # Create ORC file in CPU session as a workaround
-    # Partition column P
-    with_cpu_session(lambda spark: spark.range(10).selectExpr("id", "id % 3 as p").write
-                     .partitionBy("p")
-                     .orc(data_path))
-
     # should not fallback to CPU
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: _do_orc_scan_with_agg(spark, data_path, aggregate),
+        lambda spark: _do_orc_scan_with_agg_on_partitioned_column(spark, data_path, aggregate),
         conf=_orc_aggregate_pushdown_enabled_conf)
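
For readers without the integration-test harness, a rough sketch of the plan check these assertions perform. This is an assumption-heavy illustration: `_jdf` is PySpark-internal, the path is a placeholder, and the exact node names in the rendered plan string may differ from the class names the harness checks for.

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.orc.aggregatePushdown", "true")
         .config("spark.sql.sources.useV1SourceList", "")
         .getOrCreate())

path = "/tmp/orc_pushdown_demo"  # placeholder; assumes data written as in the earlier sketch
df = spark.read.orc(path).selectExpr("count(p)")

# Render the executed JVM plan via py4j, loosely mirroring what
# assert_cpu_and_gpu_are_equal_collect_with_capture does with
# exist_classes/non_exist_classes.
plan = df._jdf.queryExecution().executedPlan().toString()
# With pushdown enabled, a COUNT on the partition column is expected to fall
# back to the CPU scan: look for BatchScan rather than GpuBatchScan.
print("GpuBatchScan" in plan, "BatchScan" in plan)
```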