Skip to content

Commit

Permalink
Add disorder read schema test case for Parquet (#3032)
Browse files Browse the repository at this point in the history
Signed-off-by: Bobby Wang <wbo4958@gmail.com>
  • Loading branch information
wbo4958 authored Jul 27, 2021
1 parent 813db99 commit 6898557
Showing 1 changed file with 47 additions and 0 deletions.
47 changes: 47 additions & 0 deletions integration_tests/src/main/python/parquet_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,3 +477,50 @@ def _create_wide_data_frame(spark, num_cols):
assert_gpu_and_cpu_are_equal_collect(
func=lambda spark: _create_wide_data_frame(spark, 1000),
is_cpu_first=False)

def setup_parquet_file_with_column_names(spark, table_name):
    """(Re)create a one-row Parquet table mixing scalar and nested columns.

    Any existing table called ``table_name`` is dropped first. The new table
    has the columns ``a`` (INT), ``b`` (ARRAY<INT>) and ``c``
    (STRUCT<c_1: INT, c_2: STRING>), and a single row is inserted into it.

    Args:
        spark: session object exposing ``sql(query)``; the returned object
            must provide ``collect()``.
        table_name: name of the table to drop, create and populate.
    """
    drop_query = "DROP TABLE IF EXISTS {}".format(table_name)
    create_query = "CREATE TABLE `{}` (`a` INT, `b` ARRAY<INT>, `c` STRUCT<`c_1`: INT, `c_2`: STRING>) USING parquet"\
        .format(table_name)
    insert_query = "INSERT INTO {} VALUES(13, array(2020), named_struct('c_1', 1, 'c_2', 'hello'))".format(table_name)
    for query in (drop_query, create_query, insert_query):
        # ``collect`` has to be *called*: a bare ``.collect`` only looks the
        # bound method up and discards it, so the result is never fetched.
        spark.sql(query).collect()

@pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_disorder_read_schema_XXX(spark_tmp_table_factory, reader_confs, v1_enabled_list):
    """Check that CPU and GPU agree when columns are read out of table order.

    A table with columns ``a``, ``b`` and ``c`` (``c`` being a struct with
    fields ``c_1`` and ``c_2``) is created on the CPU, then queried with
    several column subsets and orderings, including reordered struct fields.
    Each query is compared between CPU and GPU under the given reader confs.
    """
    all_confs = reader_confs.copy()
    all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
    table_name = spark_tmp_table_factory.get()
    with_cpu_session(lambda spark : setup_parquet_file_with_column_names(spark, table_name))

    select_lists = [
        # Two-column subsets.
        "a,b",
        "c,a",
        "c,b",
        # All six orderings of the three top-level columns. "b,a,c" replaces
        # a duplicated "b,c,a" so that every permutation is exercised once.
        "a,c,b",
        "a,b,c",
        "b,c,a",
        "b,a,c",
        "c,a,b",
        "c,b,a",
        # Struct fields selected in reverse of their declared order.
        "c.c_2,c.c_1,b,a",
    ]
    for select_list in select_lists:
        query = "SELECT {} FROM {}".format(select_list, table_name)
        # Bind ``query`` as a default argument so the lambda does not depend
        # on the loop variable's value at call time.
        assert_gpu_and_cpu_are_equal_collect(
            lambda spark, query=query: spark.sql(query),
            all_confs)

0 comments on commit 6898557

Please sign in to comment.