Decrease pre_merge_ci parallelism to 4 and reorder time-consuming tests #3455

Merged: 8 commits, Sep 13, 2021
1 change: 1 addition & 0 deletions integration_tests/src/main/python/conditionals_test.py
@@ -66,6 +66,7 @@ def test_if_else_map(data_gen):
'IF(a, b, c)'),
conf = allow_negative_scale_of_decimal_conf)

@pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
@pytest.mark.parametrize('data_gen', all_gens + all_nested_gens, ids=idfn)
def test_case_when(data_gen):
num_cmps = 20
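For background on the marker added above: pytest-order reorders the collected test items so that marked tests come first, and pytest-xdist hands out items to its workers roughly in collection order, so the slowest tests start early instead of dragging out the tail of the run. A minimal, self-contained sketch of the pattern (hypothetical test names, assuming pytest-order's default sorting; without the plugin the marker is inert apart from an unknown-mark warning):

import time

import pytest

@pytest.mark.order(1)  # handed to an xdist worker before unordered tests
def test_slow_case():
    time.sleep(2)  # stand-in for an expensive integration test
    assert True

@pytest.mark.order(2)  # runs after order(1) tests but still ahead of unordered ones
def test_medium_case():
    assert True

def test_cheap_case():  # unordered tests keep their normal collection order
    assert True
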
4 changes: 4 additions & 0 deletions integration_tests/src/main/python/generate_expr_test.py
@@ -50,6 +50,7 @@ def test_explode_litarray(data_gen):
'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'}

@ignore_order(local=True)
@pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
@pytest.mark.parametrize('data_gen', all_gen + struct_gens_sample + array_gens_sample + map_gens_sample, ids=idfn)
def test_explode_array_data(spark_tmp_path, data_gen):
data_gen = [int_gen, ArrayGen(data_gen)]
@@ -81,6 +82,7 @@ def test_explode_nested_array_data(spark_tmp_path, data_gen):
#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
@pytest.mark.parametrize('data_gen', all_gen + struct_gens_sample + array_gens_sample + map_gens_sample, ids=idfn)
def test_explode_outer_array_data(spark_tmp_path, data_gen):
data_gen = [int_gen, ArrayGen(data_gen)]
@@ -130,6 +132,7 @@ def test_posexplode_litarray(data_gen):
#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
@pytest.mark.parametrize('data_gen', all_gen + struct_gens_sample + array_gens_sample + map_gens_sample, ids=idfn)
def test_posexplode_array_data(spark_tmp_path, data_gen):
data_gen = [int_gen, ArrayGen(data_gen)]
@@ -161,6 +164,7 @@ def test_posexplode_nested_array_data(spark_tmp_path, data_gen):
#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
@pytest.mark.parametrize('data_gen', all_gen + struct_gens_sample + array_gens_sample + map_gens_sample, ids=idfn)
def test_posexplode_outer_array_data(spark_tmp_path, data_gen):
data_gen = [int_gen, ArrayGen(data_gen)]
4 changes: 4 additions & 0 deletions integration_tests/src/main/python/join_test.py
@@ -245,6 +245,7 @@ def do_join(spark):
# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
@pytest.mark.parametrize('data_gen', all_gen + single_level_array_gens, ids=idfn)
@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
def test_cartesian_join(data_gen, batch_size):
@@ -257,6 +258,7 @@ def do_join(spark):
# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
@pytest.mark.xfail(condition=is_databricks_runtime(),
reason='https://github.com/NVIDIA/spark-rapids/issues/334')
@pytest.mark.parametrize('data_gen', all_gen + single_level_array_gens, ids=idfn)
@@ -271,6 +273,7 @@ def do_join(spark):
# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
@pytest.mark.xfail(condition=is_databricks_runtime(),
reason='https://github.com/NVIDIA/spark-rapids/issues/334')
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@@ -285,6 +288,7 @@ def do_join(spark):
# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
def test_cartesian_join_with_condition(data_gen, batch_size):
5 changes: 2 additions & 3 deletions integration_tests/src/main/python/orc_test.py
@@ -22,9 +22,6 @@
from spark_session import with_cpu_session, with_spark_session
from parquet_test import _nested_pruning_schemas

# Mark all tests in current file as premerge_ci_1 in order to be run in first k8s pod for parallel build premerge job
pytestmark = pytest.mark.premerge_ci_1

def read_orc_df(data_path):
return lambda spark : spark.read.orc(data_path)

@@ -123,6 +120,7 @@ def test_orc_fallback(spark_tmp_path, read_func, disable_conf):
conf={disable_conf: 'false',
"spark.sql.sources.useV1SourceList": "orc"})

@pytest.mark.order(2)
@pytest.mark.parametrize('orc_gens', orc_gens_list, ids=idfn)
@pytest.mark.parametrize('read_func', [read_orc_df, read_orc_sql])
@pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn)
@@ -147,6 +145,7 @@ def test_read_round_trip(spark_tmp_path, orc_gens, read_func, reader_confs, v1_e
# timestamp_gen
TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc))]

@pytest.mark.order(2)
@pytest.mark.parametrize('orc_gen', orc_pred_push_gens, ids=idfn)
@pytest.mark.parametrize('read_func', [read_orc_df, read_orc_sql])
@pytest.mark.parametrize('v1_enabled_list', ["", "orc"])
5 changes: 2 additions & 3 deletions integration_tests/src/main/python/orc_write_test.py
@@ -20,9 +20,6 @@
from marks import *
from pyspark.sql.types import *

# Mark all tests in current file as premerge_ci_1 in order to be run in first k8s pod for parallel build premerge job
pytestmark = pytest.mark.premerge_ci_1

orc_write_gens_list = [
[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
@@ -75,6 +72,7 @@ def test_compress_write_round_trip(spark_tmp_path, compress):
data_path,
conf={'spark.sql.orc.compression.codec': compress, 'spark.rapids.sql.format.orc.write.enabled': True})

@pytest.mark.order(2)
@pytest.mark.parametrize('orc_gens', orc_write_gens_list, ids=idfn)
@pytest.mark.parametrize('orc_impl', ["native", "hive"])
def test_write_save_table(spark_tmp_path, orc_gens, orc_impl, spark_tmp_table_factory):
@@ -95,6 +93,7 @@ def write_orc_sql_from(spark, df, data_path, write_to_table):
write_cmd = 'CREATE TABLE `{}` USING ORC location \'{}\' AS SELECT * from `{}`'.format(write_to_table, data_path, tmp_view_name)
spark.sql(write_cmd)

@pytest.mark.order(2)
@pytest.mark.parametrize('orc_gens', orc_write_gens_list, ids=idfn)
@pytest.mark.parametrize('ts_type', ["TIMESTAMP_MICROS", "TIMESTAMP_MILLIS"])
@pytest.mark.parametrize('orc_impl', ["native", "hive"])
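For context on the pytestmark lines removed from the two ORC files above: a module-level pytestmark applies the given mark to every test collected from that file, and the premerge job then includes or excludes those tests with a -m expression (the TEST_TAGS / -Dpytest.TEST_TAGS value used in jenkins/spark-premerge-build.sh below). A minimal sketch of that mechanism with a hypothetical module; running pytest with -m "premerge_ci_1" selects only these tests, while -m "not premerge_ci_1" excludes them:

# hypothetical_module_test.py
import pytest

# Every test in this module carries the premerge_ci_1 mark, so CI can route the whole
# file to one pod with a single -m expression instead of marking each test separately.
# (In the real repo the mark would be registered in pytest.ini to avoid warnings.)
pytestmark = pytest.mark.premerge_ci_1

def test_example_one():
    assert 1 + 1 == 2

def test_example_two():
    assert 'gpu'.upper() == 'GPU'
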
4 changes: 4 additions & 0 deletions integration_tests/src/main/python/parquet_write_test.py
@@ -72,6 +72,7 @@ def limited_timestamp(nullable=True):
parquet_basic_gen + parquet_struct_gen + parquet_array_gen + parquet_decimal_gens + parquet_map_gens]
parquet_ts_write_options = ['INT96', 'TIMESTAMP_MICROS', 'TIMESTAMP_MILLIS']

@pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
@pytest.mark.parametrize('parquet_gens', parquet_write_gens_list, ids=idfn)
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
@@ -117,6 +118,7 @@ def test_write_ts_millis(spark_tmp_path, ts_type, ts_rebase):

# There are race conditions around when individual files are read in for partitioned data
@ignore_order
@pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
@pytest.mark.parametrize('parquet_gen', parquet_part_write_gens, ids=idfn)
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
@@ -180,6 +182,7 @@ def test_compress_write_round_trip(spark_tmp_path, compress, v1_enabled_list, re
data_path,
conf=all_confs)

@pytest.mark.order(2)
@pytest.mark.parametrize('parquet_gens', parquet_write_gens_list, ids=idfn)
@pytest.mark.parametrize('ts_type', parquet_ts_write_options)
def test_write_save_table(spark_tmp_path, parquet_gens, ts_type, spark_tmp_table_factory):
@@ -198,6 +201,7 @@ def write_parquet_sql_from(spark, df, data_path, write_to_table):
write_cmd = 'CREATE TABLE `{}` USING PARQUET location \'{}\' AS SELECT * from `{}`'.format(write_to_table, data_path, tmp_view_name)
spark.sql(write_cmd)

@pytest.mark.order(2)
@pytest.mark.parametrize('parquet_gens', parquet_write_gens_list, ids=idfn)
@pytest.mark.parametrize('ts_type', parquet_ts_write_options)
def test_write_sql_save_table(spark_tmp_path, parquet_gens, ts_type, spark_tmp_table_factory):
2 changes: 2 additions & 0 deletions integration_tests/src/main/python/sort_test.py
@@ -261,6 +261,7 @@ def test_large_orderby(data_gen, stable_sort):
StructGen([('child1', byte_gen)]),
simple_string_to_string_map_gen,
ArrayGen(byte_gen, max_length=5)], ids=idfn)
@pytest.mark.order(2)
def test_large_orderby_nested_ridealong(data_gen):
# We use a LongRangeGen to avoid duplicate keys that can cause ambiguity in the sort
# results, especially on distributed clusters.
@@ -278,6 +279,7 @@ def test_large_orderby_nested_ridealong(data_gen):
StructGen([('child1', byte_gen)]),
simple_string_to_string_map_gen,
ArrayGen(byte_gen, max_length=5)], ids=idfn)
@pytest.mark.order(2)
def test_orderby_nested_ridealong_limit(data_gen):
# We use a LongRangeGen to avoid duplicate keys that can cause ambiguity in the sort
# results, especially on distributed clusters.
2 changes: 1 addition & 1 deletion jenkins/Dockerfile-blossom.ubuntu
@@ -44,7 +44,7 @@ RUN python3.8 -m easy_install pip
RUN update-java-alternatives --set /usr/lib/jvm/java-1.8.0-openjdk-amd64

RUN ln -s /usr/bin/python3.8 /usr/bin/python
RUN python -m pip install pytest sre_yield requests pandas pyarrow findspark pytest-xdist pre-commit
RUN python -m pip install pytest sre_yield requests pandas pyarrow findspark pytest-xdist pre-commit pytest-order

# libnuma1 and libgomp1 are required by ucx packaging
RUN apt install -y inetutils-ping expect wget libnuma1 libgomp1
9 changes: 6 additions & 3 deletions jenkins/spark-premerge-build.sh
@@ -38,7 +38,7 @@ mvn_verify() {
# Here run Python integration tests tagged with 'premerge_ci_1' only, that would help balance test duration and memory
# consumption from two k8s pods running in parallel, which executes 'mvn_verify()' and 'ci_2()' respectively.
mvn -U -B $MVN_URM_MIRROR '-Pindividual,pre-merge' clean verify -Dpytest.TEST_TAGS="premerge_ci_1" \
-Dpytest.TEST_TYPE="pre-commit" -Dpytest.TEST_PARALLEL=5 -Dcuda.version=$CUDA_CLASSIFIER
-Dpytest.TEST_TYPE="pre-commit" -Dpytest.TEST_PARALLEL=4 -Dcuda.version=$CUDA_CLASSIFIER

# Run the unit tests for other Spark versions but don't run full python integration tests
# NOT ALL TESTS NEEDED FOR PREMERGE
@@ -94,9 +94,12 @@ ci_2() {
mvn -U -B $MVN_URM_MIRROR clean package -DskipTests=true -Dcuda.version=$CUDA_CLASSIFIER
export TEST_TAGS="not premerge_ci_1"
export TEST_TYPE="pre-commit"
export TEST_PARALLEL=4
# separate process to avoid OOM kill
TEST_PARALLEL=4 TEST='conditionals_test or window_function_test' ./integration_tests/run_pyspark_from_build.sh
TEST_PARALLEL=5 TEST='not conditionals_test and not window_function_test' ./integration_tests/run_pyspark_from_build.sh
TEST='conditionals_test or window_function_test' ./integration_tests/run_pyspark_from_build.sh
TEST_PARALLEL=5 TEST='struct_test or time_window_test' ./integration_tests/run_pyspark_from_build.sh
TEST='not conditionals_test and not window_function_test and not struct_test and not time_window_test' \
./integration_tests/run_pyspark_from_build.sh
}
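
To make the split concrete, here is a rough sketch of what the three separate runs amount to, assuming run_pyspark_from_build.sh maps TEST to a pytest -k expression, TEST_TAGS to -m, and TEST_PARALLEL to the pytest-xdist worker count; these mappings are assumptions about the wrapper script rather than something this diff shows. The real script launches separate OS processes so each run's peak memory stays low enough to avoid an OOM kill.

# Hypothetical, simplified equivalent of the three ci_2 invocations above
# (assumes pytest-xdist is installed; the env-var-to-flag mapping is an assumption).
import pytest

COMMON = ['integration_tests/src/main/python', '-m', 'not premerge_ci_1']

pytest.main(COMMON + ['-n', '4', '-k', 'conditionals_test or window_function_test'])
pytest.main(COMMON + ['-n', '5', '-k', 'struct_test or time_window_test'])
pytest.main(COMMON + ['-n', '4', '-k',
                      'not conditionals_test and not window_function_test '
                      'and not struct_test and not time_window_test'])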

