Commit d234816

Merge pull request #3082 from NVIDIA/branch-21.08
[auto-merge] branch-21.08 to branch-21.10 [skip ci] [bot]
nvauto authored Jul 29, 2021
2 parents 00ab4e3 + f8ba1ba commit d234816
Showing 6 changed files with 11 additions and 45 deletions.
4 changes: 2 additions & 2 deletions docs/supported_ops.md
@@ -21012,9 +21012,9 @@ dates or timestamps, or for a lack of type coercion support.
 <td> </td>
 <td><b>NS</b></td>
 <td> </td>
-<td><em>PS* (missing nested DECIMAL, BINARY, MAP, UDT)</em></td>
+<td><em>PS* (missing nested DECIMAL, BINARY, MAP, STRUCT, UDT)</em></td>
 <td><b>NS</b></td>
 <td><b>NS</b></td>
-<td><em>PS* (missing nested DECIMAL, BINARY, MAP, UDT)</em></td>
+<td><b>NS</b></td>
 </tr>
 <tr>
31 changes: 5 additions & 26 deletions integration_tests/src/main/python/orc_test.py
@@ -50,28 +50,13 @@ def test_basic_read(std_input_path, name, read_func, v1_enabled_list, orc_impl,
     string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
     TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))]
 
-orc_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(orc_basic_gens)])
-
 # Some array gens, but not all because of nesting
 orc_array_gens_sample = [ArrayGen(sub_gen) for sub_gen in orc_basic_gens] + [
     ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10),
-    ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10),
-    ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))]
-
-# Some struct gens, but not all because of nesting.
-# No empty struct gen because it leads to an error as below.
-# '''
-# E pyspark.sql.utils.AnalysisException:
-# E Datasource does not support writing empty or nested empty schemas.
-# E Please make sure the data schema has at least one or more column(s).
-# '''
-orc_struct_gens_sample = [orc_basic_struct_gen,
-    StructGen([['child0', byte_gen], ['child1', orc_basic_struct_gen]]),
-    StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])]
+    ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10)]
 
 orc_gens_list = [orc_basic_gens,
     orc_array_gens_sample,
-    orc_struct_gens_sample,
     pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/131')),
     pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/131'))]
 
@@ -104,9 +89,7 @@ def test_read_round_trip(spark_tmp_path, orc_gens, read_func, reader_confs, v1_e
     with_cpu_session(
             lambda spark : gen_df(spark, gen_list).write.orc(data_path))
     all_confs = reader_confs.copy()
-    # Nested schema pruning is not supported yet for orc read.
-    all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
-                      'spark.sql.optimizer.nestedSchemaPruning.enabled': "false"})
+    all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
     assert_gpu_and_cpu_are_equal_collect(
             read_func(data_path),
             conf=all_confs)
@@ -127,19 +110,15 @@ def test_read_round_trip(spark_tmp_path, orc_gens, read_func, reader_confs, v1_e
 @pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn)
 def test_pred_push_round_trip(spark_tmp_path, orc_gen, read_func, v1_enabled_list, reader_confs):
     data_path = spark_tmp_path + '/ORC_DATA'
-    # Append two struct columns to verify nested predicate pushdown.
-    gen_list = [('a', RepeatSeqGen(orc_gen, 100)), ('b', orc_gen),
-                ('s1', StructGen([['sa', orc_gen]])),
-                ('s2', StructGen([['sa', StructGen([['ssa', orc_gen]])]]))]
+    gen_list = [('a', RepeatSeqGen(orc_gen, 100)), ('b', orc_gen)]
     s0 = gen_scalar(orc_gen, force_no_nulls=True)
     with_cpu_session(
             lambda spark : gen_df(spark, gen_list).orderBy('a').write.orc(data_path))
     all_confs = reader_confs.copy()
-    all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
-                      'spark.sql.optimizer.nestedSchemaPruning.enabled': "false"})
+    all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
     rf = read_func(data_path)
     assert_gpu_and_cpu_are_equal_collect(
-            lambda spark: rf(spark).select(f.col('a') >= s0, f.col('s1.sa') >= s0, f.col('s2.sa.ssa') >= s0),
+            lambda spark: rf(spark).select(f.col('a') >= s0),
             conf=all_confs)
 
 orc_compress_options = ['none', 'uncompressed', 'snappy', 'zlib']
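
The comment deleted in the first hunk above quotes a real Spark restriction: a DataFrame whose schema is empty, or contains an empty nested struct, cannot be written by a file datasource. A minimal sketch that reproduces the quoted AnalysisException, assuming a local SparkSession; the output path is illustrative:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder.master('local[1]').getOrCreate()

# A DataFrame with no columns; an empty nested struct fails the same way.
empty_df = spark.createDataFrame([], StructType([]))

# Expected to raise pyspark.sql.utils.AnalysisException:
#   Datasource does not support writing empty or nested empty schemas.
#   Please make sure the data schema has at least one or more column(s).
empty_df.write.orc('/tmp/empty_schema_orc')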
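
The two later hunks drop the conf that pinned nested schema pruning off; the plugin previously refused ORC reads of struct columns while that optimizer rule was active, and with struct reads now disabled outright (see the GpuOrcScanBase and GpuOverrides changes below) the workaround is moot. A minimal sketch of what the tests used to do, assuming a SparkSession `spark` with the RAPIDS plugin loaded; the path is illustrative:

# Nested schema pruning is on by default in Spark 3.x; the tests forced it off.
spark.conf.set('spark.sql.optimizer.nestedSchemaPruning.enabled', 'false')

# Route ORC through the V1 datasource, as the v1_enabled_list parameter does.
spark.conf.set('spark.sql.sources.useV1SourceList', 'orc')

df = spark.read.orc('/tmp/ORC_DATA')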
15 changes: 1 addition & 14 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
@@ -58,7 +58,7 @@ import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.rapids.OrcFilters
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.SerializableConfiguration
 
@@ -124,19 +124,6 @@ object GpuOrcScanBase
.getOption("spark.sql.orc.mergeSchema").exists(_.toBoolean)) {
meta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet")
}

if (sparkSession.conf
.getOption("spark.sql.optimizer.nestedSchemaPruning.enabled").exists(_.toBoolean) &&
schema.exists(f => hasStructType(f.dataType))) {
meta.willNotWorkOnGpu("nested schema pruning is not supported yet")
}
}

private def hasStructType(dt: DataType): Boolean = dt match {
case m: MapType => hasStructType(m.keyType) || hasStructType(m.valueType)
case a: ArrayType => hasStructType(a.elementType)
case _: StructType => true
case _ => false
}
}

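
For reference, the deleted hasStructType helper walked a data type looking for a struct anywhere inside maps and arrays, without recursing into struct fields themselves. A line-for-line Python sketch of the same logic, illustrative only and not plugin API:

from pyspark.sql.types import ArrayType, DataType, MapType, StructType

def has_struct_type(dt: DataType) -> bool:
    # A map can hide a struct in either its key or its value type.
    if isinstance(dt, MapType):
        return has_struct_type(dt.keyType) or has_struct_type(dt.valueType)
    # An array can hide a struct in its element type.
    if isinstance(dt, ArrayType):
        return has_struct_type(dt.elementType)
    # A struct matches outright; all other leaf types do not.
    return isinstance(dt, StructType)

The removed guard applied this to every top-level field and fell back to the CPU when any field matched while spark.sql.optimizer.nestedSchemaPruning.enabled was true.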
2 changes: 1 addition & 1 deletion sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -768,7 +768,7 @@ object GpuOverrides {
         sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP +
             TypeSig.UDT).nested())),
     (OrcFormatType, FileFormatChecks(
-      cudfRead = (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(),
+      cudfRead = (TypeSig.commonCudfTypes + TypeSig.ARRAY).nested(),
       cudfWrite = TypeSig.commonCudfTypes,
       sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP +
         TypeSig.UDT).nested())))
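
Dropping TypeSig.STRUCT from cudfRead means the plugin now advertises ORC struct reads as unsupported, so such scans are expected to fall back to the CPU regardless of the nested-schema-pruning conf. A hedged sketch of how one might observe the fallback, assuming the RAPIDS plugin is loaded and a struct-bearing ORC file exists at the illustrative path:

# With spark.rapids.sql.explain set to NOT_ON_GPU, the plugin logs why an
# operation stays on the CPU; the plan should show a CPU file scan rather
# than a GPU scan node for a struct-bearing ORC read.
spark.conf.set('spark.rapids.sql.explain', 'NOT_ON_GPU')
df = spark.read.orc('/tmp/orc_with_structs')
df.explain()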
2 changes: 1 addition & 1 deletion tools/src/main/resources/supportedDataSource.csv
@@ -1,4 +1,4 @@
 Format,Direction,BOOLEAN,BYTE,SHORT,INT,LONG,FLOAT,DOUBLE,DATE,TIMESTAMP,STRING,DECIMAL,NULL,BINARY,CALENDAR,ARRAY,MAP,STRUCT,UDT
 CSV,read,CO,CO,CO,CO,CO,CO,CO,CO,CO,S,CO,NA,NS,NA,NA,NA,NA,NA
-ORC,read,S,S,S,S,S,S,S,S,S*,S,CO,NA,NS,NA,PS*,NS,PS*,NS
+ORC,read,S,S,S,S,S,S,S,S,S*,S,CO,NA,NS,NA,PS*,NS,NS,NS
 Parquet,read,S,S,S,S,S,S,S,S,S*,S,CO,NA,NS,NA,PS*,PS*,PS*,NS
2 changes: 1 addition & 1 deletion
@@ -1,2 +1,2 @@
 App Name,App ID,Score,Potential Problems,SQL DF Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,Read File Format Score,Unsupported Read File Formats and Types
-Spark shell,local-1626104300434,1322.1,DECIMAL,2429,1469,131104,88.35,false,160,"",20,50.0,Parquet[decimal];ORC[map:decimal]
+Spark shell,local-1626104300434,1322.1,DECIMAL,2429,1469,131104,88.35,false,160,"",20,50.0,Parquet[decimal];ORC[map:struct:decimal]
