Add event logs for integration tests (NVIDIA#2520)
* Add event logs for integration tests

- review configs, execution plans in SHS

Signed-off-by: Gera Shegalov <gera@apache.org>

* review

* space
gerashegalov authored Jun 1, 2021
1 parent 747df66 commit e68da97
Showing 3 changed files with 75 additions and 10 deletions.
17 changes: 17 additions & 0 deletions integration_tests/README.md
@@ -211,6 +211,23 @@ Basically, you need first to upload the test resources onto the cloud path `reso
`root-dir` of each executor (e.g. via `spark-submit --files root-dir ...`). After that you must set both `LOCAL_ROOTDIR=root-dir` and `INPUT_PATH=resource-path`
to run the shell-script, e.g. `LOCAL_ROOTDIR=root-dir INPUT_PATH=resource-path bash [run_pyspark_from_build.sh](run_pyspark_from_build.sh)`.

### Reviewing integration tests in Spark History Server

If the integration tests are run using [run_pyspark_from_build.sh](run_pyspark_from_build.sh), the
[Spark event log](https://spark.apache.org/docs/3.1.1/monitoring.html) is enabled by default. You can opt
out by setting the environment variable `SPARK_EVENTLOG_ENABLED` to `false`.
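
For example, a run with event logging switched off could look like this (assuming the environment is otherwise set up as described earlier in this document):
```shell
SPARK_EVENTLOG_ENABLED=false bash run_pyspark_from_build.sh
```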

Compressed event logs will appear under the run directories of the form
`integration_tests/target/run_dir/eventlog_WORKERID`. If xdist is not used (e.g., `TEST_PARALLEL=1`),
the event log directory will be `integration_tests/target/run_dir/eventlog_gw0`, as if the tests were
executed by worker 0 under xdist.

To review all the tests run by a particular worker, you can start the History Server as follows:
```shell
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=integration_tests/target/run_dir/eventlog_gw0" \
${SPARK_HOME}/bin/spark-class org.apache.spark.deploy.history.HistoryServer
```
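
The History Server UI is then available at `http://localhost:18080` (the default `spark.history.ui.port`) and lists the applications recorded in that event log directory.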

### Enabling cudf_udf Tests

The cudf_udf tests in this framework exercise Pandas UDFs (user-defined functions) with cuDF. They are disabled by default, not only because of the complicated environment setup but also because GPU resource scheduling for Pandas UDFs is currently an experimental feature and the performance may not always be better.
58 changes: 48 additions & 10 deletions integration_tests/src/main/python/spark_init_internal.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020, NVIDIA CORPORATION.
+# Copyright (c) 2020-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,12 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

+import os

try:
-    from pyspark.sql import SparkSession
+    import pyspark
except ImportError as error:
    import findspark
    findspark.init()
-    from pyspark.sql import SparkSession
+    import pyspark

_DRIVER_ENV = 'PYSP_TEST_spark_driver_extraJavaOptions'

@@ -26,8 +28,7 @@ def _spark__init():
    # DO NOT SET ANY OTHER CONFIGS HERE!!!
    # due to bugs in pyspark/pytest it looks like any configs set here
    # can be reset in the middle of a test if specific operations are done (some types of cast etc)
-    import os
-    _sb = SparkSession.builder
+    _sb = pyspark.sql.SparkSession.builder
    _sb.config('spark.plugins', 'com.nvidia.spark.SQLPlugin') \
        .config("spark.sql.adaptive.enabled", "false") \
        .config('spark.sql.queryExecutionListeners', 'com.nvidia.spark.rapids.ExecutionPlanCaptureCallback')
@@ -40,13 +41,12 @@ def _spark__init():

    if ('PYTEST_XDIST_WORKER' in os.environ):
        wid = os.environ['PYTEST_XDIST_WORKER']
-        d = "./derby_{}".format(wid)
-        if not os.path.exists(d):
-            os.makedirs(d)
-        _sb.config('spark.driver.extraJavaOptions', driver_opts + ' -Dderby.system.home={}'.format(d))
+        _handle_derby_dir(_sb, driver_opts, wid)
+        _handle_event_log_dir(_sb, wid)
    else:
        _sb.config('spark.driver.extraJavaOptions', driver_opts)
+        _handle_event_log_dir(_sb, 'gw0')

    # enableHiveSupport() is needed for parquet bucket tests
    _s = _sb.enableHiveSupport() \
        .appName('rapids spark plugin integration tests (python)').getOrCreate()
@@ -55,6 +55,44 @@ def _spark__init():
    _s.sparkContext.setLogLevel("WARN")
    return _s


def _handle_derby_dir(sb, driver_opts, wid):
    d = "./derby_{}".format(wid)
    if not os.path.exists(d):
        os.makedirs(d)
    sb.config('spark.driver.extraJavaOptions', driver_opts + ' -Dderby.system.home={}'.format(d))


def _handle_event_log_dir(sb, wid):
    if os.environ.get('SPARK_EVENTLOG_ENABLED', str(True)).lower() in [
            str(False).lower(), 'off', '0'
    ]:
        print('Automatic configuration for spark event log disabled')
        return

    spark_conf = pyspark.SparkConf()
    master_url = os.environ.get('PYSP_TEST_spark_master',
                                spark_conf.get("spark.master", 'local'))
    event_log_config = os.environ.get('PYSP_TEST_spark_eventLog_enabled',
                                      spark_conf.get('spark.eventLog.enabled', str(False).lower()))

    if not master_url.startswith('local') or event_log_config != str(False).lower():
        print("SPARK_EVENTLOG_ENABLED is ignored for non-local Spark master and when "
              "it's pre-configured by the user")
        return
    d = "./eventlog_{}".format(wid)
    if not os.path.exists(d):
        os.makedirs(d)

    print('Spark event logs will appear under {}. Set the environment variable '
          'SPARK_EVENTLOG_ENABLED=false if you want to disable it'.format(d))

    sb \
        .config('spark.eventLog.dir', "file://{}".format(os.path.abspath(d))) \
        .config('spark.eventLog.compress', True) \
        .config('spark.eventLog.enabled', True)


_spark = _spark__init()

def get_spark_i_know_what_i_am_doing():
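
As a quick sanity check of the new event-log wiring above, a minimal sketch (not part of the commit, assuming the usual integration-test environment with `pyspark` importable and a local master) can print the settings `_handle_event_log_dir` applies:
```python
# Minimal sketch: confirm the event-log configuration applied by _handle_event_log_dir
# when the integration-test session is created.
from spark_init_internal import get_spark_i_know_what_i_am_doing

spark = get_spark_i_know_what_i_am_doing()
print(spark.conf.get('spark.eventLog.enabled', 'false'))  # 'true' unless SPARK_EVENTLOG_ENABLED=false
print(spark.conf.get('spark.eventLog.dir', '<unset>'))    # file://<run dir>/eventlog_<worker id>
```
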
10 changes: 10 additions & 0 deletions integration_tests/src/main/python/spark_session.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from conftest import is_allowing_any_non_gpu, get_non_gpu_allowed, get_validate_execs_in_gpu_plan, is_databricks_runtime
from pyspark.sql import SparkSession, DataFrame
from spark_init_internal import get_spark_i_know_what_i_am_doing, spark_version
@@ -64,11 +65,20 @@ def _check_for_proper_return_values(something):
def with_spark_session(func, conf={}):
    """Run func that takes a spark session as input with the given configs set."""
    reset_spark_session_conf()
    _add_job_description(conf)
    _set_all_confs(conf)
    ret = func(_spark)
    _check_for_proper_return_values(ret)
    return ret


def _add_job_description(conf):
    is_gpu_job = conf.get('spark.rapids.sql.enabled', False)
    job_type = 'GPU' if str(is_gpu_job).lower() == str(True).lower() else 'CPU'
    job_desc = '{}[{}]'.format(os.environ.get('PYTEST_CURRENT_TEST'), job_type)
    _spark.sparkContext.setJobDescription(job_desc)


def with_cpu_session(func, conf={}):
    """Run func that takes a spark session as input with the given configs set on the CPU."""
    copy = dict(conf)
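
To illustrate how the job descriptions set above end up looking in the Spark UI and event logs, here is a standalone sketch (not part of the commit) of the same naming scheme; `arithmetic_ops_test.py::test_addition` is just a hypothetical test id:
```python
import os

def job_description(conf):
    # Mirrors _add_job_description: a test is tagged GPU only when
    # spark.rapids.sql.enabled is set to true in its conf, otherwise CPU.
    is_gpu_job = conf.get('spark.rapids.sql.enabled', False)
    job_type = 'GPU' if str(is_gpu_job).lower() == str(True).lower() else 'CPU'
    return '{}[{}]'.format(os.environ.get('PYTEST_CURRENT_TEST'), job_type)

# Example: with PYTEST_CURRENT_TEST='arithmetic_ops_test.py::test_addition (call)'
# a GPU-enabled conf yields 'arithmetic_ops_test.py::test_addition (call)[GPU]'.
print(job_description({'spark.rapids.sql.enabled': 'true'}))
```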
