Add event logs for integration tests #2520

Merged · 3 commits · Jun 1, 2021
17 changes: 17 additions & 0 deletions integration_tests/README.md
@@ -211,6 +211,23 @@
Basically, you first need to upload the test resources onto the cloud path `resource-path` and make them available under the
`root-dir` of each executor (e.g. via `spark-submit --files root-dir ...`). After that you must set both `LOCAL_ROOTDIR=root-dir` and `INPUT_PATH=resource-path`
to run the shell script, e.g. `LOCAL_ROOTDIR=root-dir INPUT_PATH=resource-path bash [run_pyspark_from_build.sh](run_pyspark_from_build.sh)`.

### Reviewing integration tests in Spark History Server

If the integration tests are run via [run_pyspark_from_build.sh](run_pyspark_from_build.sh), the Spark
[event log](https://spark.apache.org/docs/3.1.1/monitoring.html) is enabled by default. You can opt
out by setting the environment variable `SPARK_EVENTLOG_ENABLED` to `false`.
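
For example, a run with event logging turned off might look like this (a minimal sketch, assuming the script is launched from the `integration_tests` directory):

```shell
# Disable Spark event logging for this run; all other defaults are left in place.
SPARK_EVENTLOG_ENABLED=false bash ./run_pyspark_from_build.sh
```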

Compressed event logs will appear under run directories of the form
`integration_tests/target/run_dir/eventlog_WORKERID`. If xdist is not used (e.g., `TEST_PARALLEL=1`),
the event log directory will be `integration_tests/target/run_dir/eventlog_gw0`, as if the tests had been
executed by worker 0 under xdist.
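
After a parallel run you should therefore see one event log directory per xdist worker, for example (an illustrative listing; the actual worker IDs depend on `TEST_PARALLEL`):

```shell
ls integration_tests/target/run_dir/
# eventlog_gw0  eventlog_gw1  eventlog_gw2  ...
```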

To review all the tests run by a particular worker, start the History Server as follows:
```shell
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=integration_tests/target/run_dir/eventlog_gw0" \
${SPARK_HOME}/bin/spark-class org.apache.spark.deploy.history.HistoryServer
```
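
Once started, the History Server UI is served at `http://localhost:18080` by default (configurable via
`spark.history.ui.port`), and each test should show up with a job description of the form
`<pytest test id>[GPU]` or `<pytest test id>[CPU]`, making CPU and GPU runs easy to tell apart.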

### Enabling cudf_udf Tests

The cudf_udf tests in this framework exercise Pandas UDFs (user-defined functions) with cuDF. They are disabled by default, both because of the complicated environment setup and because GPU resource scheduling for Pandas UDFs is still an experimental feature, so the performance may not always be better.
2 changes: 2 additions & 0 deletions integration_tests/run_pyspark_from_build.sh
@@ -129,6 +129,8 @@ else
export PYSP_TEST_spark_ui_showConsoleProgress='false'
export PYSP_TEST_spark_sql_session_timeZone='UTC'
export PYSP_TEST_spark_sql_shuffle_partitions='12'
export PYSP_TEST_spark_eventLog_enabled=${SPARK_EVENTLOG_ENABLED:-'true'}
export PYSP_TEST_spark_eventLog_compress=true
if ((${#TEST_PARALLEL_OPTS[@]} > 0));
then
export PYSP_TEST_spark_rapids_memory_gpu_allocFraction=$MEMORY_FRACTION
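
The two new exports follow the same `PYSP_TEST_` naming convention as the neighboring settings (e.g. `PYSP_TEST_spark_sql_shuffle_partitions` for `spark.sql.shuffle.partitions`), with `SPARK_EVENTLOG_ENABLED` acting as the user-facing switch and compression always on. A rough illustration of the assumed name mapping (strip the prefix, turn underscores into dots):

```shell
# Illustrative only: the Spark conf key implied by a PYSP_TEST_ variable name.
echo "PYSP_TEST_spark_eventLog_enabled" | sed -e 's/^PYSP_TEST_//' -e 's/_/./g'
# -> spark.eventLog.enabled
```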
26 changes: 20 additions & 6 deletions integration_tests/src/main/python/spark_init_internal.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os

try:
    from pyspark.sql import SparkSession
except ImportError as error:
@@ -26,7 +28,6 @@ def _spark__init():
    # DO NOT SET ANY OTHER CONFIGS HERE!!!
    # due to bugs in pyspark/pytest it looks like any configs set here
    # can be reset in the middle of a test if specific operations are done (some types of cast etc)
    import os
    _sb = SparkSession.builder
    _sb.config('spark.plugins', 'com.nvidia.spark.SQLPlugin') \
        .config("spark.sql.adaptive.enabled", "false") \
@@ -40,12 +41,11 @@ def _spark__init():

    if ('PYTEST_XDIST_WORKER' in os.environ):
        wid = os.environ['PYTEST_XDIST_WORKER']
        d = "./derby_{}".format(wid)
        if not os.path.exists(d):
            os.makedirs(d)
        _sb.config('spark.driver.extraJavaOptions', driver_opts + ' -Dderby.system.home={}'.format(d))
        _create_derby_dir(_sb, driver_opts, wid)
        _create_event_log_dir(_sb, wid)
    else:
        _sb.config('spark.driver.extraJavaOptions', driver_opts)
        _create_event_log_dir(_sb, 'gw0')

    # enableHiveSupport() is needed for parquet bucket tests
    _s = _sb.enableHiveSupport() \
@@ -55,6 +55,20 @@ def _spark__init():
    _s.sparkContext.setLogLevel("WARN")
    return _s


def _create_derby_dir(sb, driver_opts, wid):
    d = "./derby_{}".format(wid)
    if not os.path.exists(d):
        os.makedirs(d)
    sb.config('spark.driver.extraJavaOptions', driver_opts + ' -Dderby.system.home={}'.format(d))

def _create_event_log_dir(sb, wid):
    d = "./eventlog_{}".format(wid)
    if not os.path.exists(d):
        os.makedirs(d)
    sb.config('spark.eventLog.dir', "file://{}".format(os.path.abspath(d)))


_spark = _spark__init()

def get_spark_i_know_what_i_am_doing():
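
Two details worth noting in `_create_event_log_dir`: the directory is created up front because Spark does not create the event log directory itself, and it is passed as an absolute `file://` URI so the location does not depend on each worker's working directory. Non-xdist runs reuse the `gw0` name, which keeps the History Server instructions in the README valid for both modes.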
10 changes: 10 additions & 0 deletions integration_tests/src/main/python/spark_session.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from conftest import is_allowing_any_non_gpu, get_non_gpu_allowed, get_validate_execs_in_gpu_plan, is_databricks_runtime
from pyspark.sql import SparkSession, DataFrame
from spark_init_internal import get_spark_i_know_what_i_am_doing, spark_version
@@ -64,11 +65,20 @@ def _check_for_proper_return_values(something):
def with_spark_session(func, conf={}):
    """Run func that takes a spark session as input with the given configs set."""
    reset_spark_session_conf()
    _add_job_description(conf)
    _set_all_confs(conf)
    ret = func(_spark)
    _check_for_proper_return_values(ret)
    return ret


def _add_job_description(conf):
    is_gpu_job = conf.get('spark.rapids.sql.enabled', False)
    job_type = 'GPU' if str(is_gpu_job).lower() == str(True).lower() else 'CPU'
    job_desc = '{}[{}]'.format(os.environ.get('PYTEST_CURRENT_TEST'), job_type)
    _spark.sparkContext.setJobDescription(job_desc)


def with_cpu_session(func, conf={}):
    """Run func that takes a spark session as input with the given configs set on the CPU."""
    copy = dict(conf)
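
`_add_job_description` tags every test's Spark jobs with the test id that pytest exposes via `PYTEST_CURRENT_TEST`, suffixed with `[GPU]` or `[CPU]` depending on `spark.rapids.sql.enabled`, so individual tests and their CPU and GPU runs can be told apart in the event logs and in the History Server UI described in the README above.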