Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support driverlog argument in profiler CLI #897

Merged
merged 2 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,11 @@ def _get_main_cluster_obj(self):
return self.ctxt.get_ctxt('cpuClusterProxy')

def _process_eventlogs_args(self):
def eventlogs_arg_is_requires():
    # An explicit wrapper option takes precedence; otherwise defer to the
    # tool context (which defaults to requiring eventlogs).
    explicit_flag = self.wrapper_options.get('requiresEventlogs')
    return explicit_flag if explicit_flag is not None else self.ctxt.requires_eventlogs()

eventlog_arg = self.wrapper_options.get('eventlogs')
if eventlog_arg is None:
# get the eventlogs from spark properties
Expand All @@ -661,7 +666,7 @@ def _process_eventlogs_args(self):
spark_event_logs = eventlog_arg.split(',')
else:
spark_event_logs = eventlog_arg
if len(spark_event_logs) < 1:
if eventlogs_arg_is_requires() and len(spark_event_logs) < 1:
self.logger.error('Eventlogs list is empty. '
'The cluster Spark properties may be missing "spark.eventLog.dir". '
'Re-run the command passing "--eventlogs" flag to the wrapper.')
Expand Down
6 changes: 6 additions & 0 deletions user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ def get_tool_main_class(self) -> str:
def get_rapids_auto_tuner_enabled(self) -> bool:
return self.get_value('sparkRapids', 'enableAutoTuner')

def requires_eventlogs(self) -> bool:
    # Read the optional config flag; when it is absent, eventlogs are
    # required by default.
    flag = self.get_value_silent('sparkRapids', 'requireEventLogs')
    return True if flag is None else flag

def get_rapids_output_folder(self) -> str:
root_dir = self.get_local('outputFolder')
rapids_subfolder = self.get_value_silent('toolOutput', 'subFolder')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@ sparkRapids:
mainClass: 'com.nvidia.spark.rapids.tool.profiling.ProfileMain'
outputDocURL: 'https://docs.nvidia.com/spark-rapids/user-guide/latest/spark-profiling-tool.html#understanding-profiling-tool-detailed-output-and-examples'
enableAutoTuner: true
requireEventLogs: true
cli:
toolOptions:
- csv
- any
- a
- application-name
- d
- driverlog
- f
- filter-criteria
- g
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ sparkRapids:
mainClass: 'com.nvidia.spark.rapids.tool.qualification.QualificationMain'
outputDocURL: 'https://docs.nvidia.com/spark-rapids/user-guide/latest/spark-qualification-tool.html#understanding-the-qualification-tool-output'
enableAutoTuner: true
requireEventLogs: true
gpu:
device: 't4'
workersPerNode: 2
Expand Down
55 changes: 46 additions & 9 deletions user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,15 +262,7 @@ def init_extra_arg_cases(self) -> list:

def define_invalid_arg_cases(self):
super().define_invalid_arg_cases()
self.rejected['Missing Eventlogs'] = {
'valid': False,
'callable': partial(self.raise_validation_exception,
'Cannot run tool cmd. Cannot define eventlogs from input '
'(platform, cluster, and eventlogs)'),
'cases': [
[ArgValueCase.IGNORE, ArgValueCase.UNDEFINED, ArgValueCase.UNDEFINED]
]
}
self.define_rejected_missing_eventlogs()
self.rejected['Cluster By Name Without Platform Hints'] = {
'valid': False,
'callable': partial(self.raise_validation_exception,
Expand All @@ -296,6 +288,17 @@ def define_invalid_arg_cases(self):
]
}

def define_rejected_missing_eventlogs(self):
    # Register the rejection case raised when neither the cluster nor the
    # CLI arguments can supply any eventlogs.
    error_msg = ('Cannot run tool cmd. Cannot define eventlogs from input '
                 '(platform, cluster, and eventlogs)')
    rejection_entry = {
        'valid': False,
        'callable': partial(self.raise_validation_exception, error_msg),
        'cases': [
            [ArgValueCase.IGNORE, ArgValueCase.UNDEFINED, ArgValueCase.UNDEFINED]
        ]
    }
    self.rejected['Missing Eventlogs'] = rejection_entry

def define_detection_cases(self):
self.detected['Define Platform from Cluster Properties file'] = {
'valid': True,
Expand Down Expand Up @@ -459,6 +462,7 @@ class ProfileUserArgModel(ToolUserArgModel):
Represents the arguments collected by the user to run the profiling tool.
This is used as doing preliminary validation against some of the common pattern
"""
driverlog: Optional[str] = None

def determine_cluster_arg_type(self) -> ArgValueCase:
cluster_case = super().determine_cluster_arg_type()
Expand All @@ -470,9 +474,27 @@ def determine_cluster_arg_type(self) -> ArgValueCase:
self.p_args['toolArgs']['autotuner'] = self.cluster
return cluster_case

def init_driverlog_argument(self) -> None:
    """Validate the optional driver-log argument and store it in the tool args.

    Stores ``None`` in ``self.p_args['toolArgs']['driverlog']`` when no driver
    log was supplied; otherwise verifies the value is a valid file path and not
    a web URL before storing it.

    :raises PydanticCustomError: if the path is not a valid file path or
        points to an http(s) URL.
    """
    if self.driverlog is None:
        self.p_args['toolArgs']['driverlog'] = None
        return
    if not CspPath.is_file_path(self.driverlog, raise_on_error=False):
        raise PydanticCustomError(
            'file_path',
            'Driver log file path is not valid\n Error:')
    # the file cannot be a http_url
    # BUG FIX: the original validated `self.cluster` here instead of the
    # driver-log path that this method is responsible for checking.
    if is_http_file(self.driverlog):
        # we do not accept http://urls
        raise PydanticCustomError(
            'invalid_argument',
            f'Driver log file path cannot be a web URL path: {self.driverlog}\n Error:')
    self.p_args['toolArgs']['driverlog'] = self.driverlog

def init_tool_args(self):
    # Seed the processed tool arguments, then validate the driver log input.
    tool_args = self.p_args['toolArgs']
    tool_args['platform'] = self.platform
    tool_args['autotuner'] = None
    self.init_driverlog_argument()

def define_invalid_arg_cases(self):
super().define_invalid_arg_cases()
Expand All @@ -485,6 +507,10 @@ def define_invalid_arg_cases(self):
]
}

def define_rejected_missing_eventlogs(self):
    # When a driver log is provided, eventlogs are optional, so the
    # "Missing Eventlogs" rejection must not be registered.
    if self.p_args['toolArgs']['driverlog'] is not None:
        return
    super().define_rejected_missing_eventlogs()

def define_detection_cases(self):
super().define_detection_cases()
# append the case when the autotuner input
Expand All @@ -507,6 +533,15 @@ def build_tools_args(self) -> dict:
else:
# this is an actual cluster argument
self.p_args['toolArgs']['cluster'] = self.cluster
if self.p_args['toolArgs']['driverlog'] is None:
requires_event_logs = True
rapid_options = {}
else:
requires_event_logs = False
rapid_options = {
'driverlog': self.p_args['toolArgs']['driverlog']
}

# finally generate the final values
wrapped_args = {
'runtimePlatform': runtime_platform,
Expand All @@ -525,6 +560,8 @@ def build_tools_args(self) -> dict:
}
},
'eventlogs': self.eventlogs,
'requiresEventlogs': requires_event_logs,
'rapidOptions': rapid_options,
'toolsJar': None,
'autoTunerFileInput': self.p_args['toolArgs']['autotuner']
}
Expand Down
6 changes: 5 additions & 1 deletion user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def profiling(self,
eventlogs: str = None,
cluster: str = None,
platform: str = None,
driverlog: str = None,
output_folder: str = None,
verbose: bool = None,
**rapids_options):
Expand All @@ -159,10 +160,11 @@ def profiling(self,
containing event logs (comma separated). If missing, the wrapper reads the Spark's
property `spark.eventLog.dir` defined in the `cluster`.
:param cluster: The cluster on which the Spark applications were executed. The argument
can be a cluster name od ID (for databricks platforms) or a valid path to the cluster's
can be a cluster name or ID (for databricks platforms) or a valid path to the cluster's
properties file (json format) generated by the CSP SDK.
:param platform: defines one of the following "onprem", "emr", "dataproc", "databricks-aws",
and "databricks-azure".
:param driverlog: Valid path to the GPU driver log file.
:param output_folder: path to store the output.
:param verbose: True or False to enable verbosity of the script.
:param rapids_options: A list of valid Profiling tool options.
Expand All @@ -183,8 +185,10 @@ def profiling(self,
eventlogs=eventlogs,
cluster=cluster,
platform=platform,
driverlog=driverlog,
output_folder=output_folder)
if prof_args:
rapids_options.update(prof_args['rapidOptions'])
tool_obj = ProfilingAsLocal(platform_type=prof_args['runtimePlatform'],
output_folder=prof_args['outputFolder'],
wrapper_options=prof_args,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ def validate_args_w_savings_disabled(tool_name: str, t_args: dict):
assert t_args['filterApps'] != QualFilterApp.SAVINGS

@staticmethod
def create_tool_args_should_pass(tool_name: str, platform=None, cluster=None, eventlogs=None):
def create_tool_args_should_pass(tool_name: str, platform=None, cluster=None,
eventlogs=None):
return AbsToolUserArgModel.create_tool_args(tool_name,
platform=platform,
cluster=cluster,
Expand Down Expand Up @@ -286,6 +287,13 @@ def test_with_platform_with_autotuner_with_eventlogs(self, get_ut_data_dir, tool
cost_savings_enabled=False,
expected_platform=CspEnv.ONPREM)

@pytest.mark.parametrize('prop_path', [autotuner_prop_path])
def test_profiler_with_driverlog(self, get_ut_data_dir, prop_path):
    # Supplying only a driver log should disable the eventlogs requirement
    # and forward the path through the rapids options.
    driverlog_path = f'{get_ut_data_dir}/{prop_path}'
    prof_args = AbsToolUserArgModel.create_tool_args('profiling',
                                                     driverlog=driverlog_path)
    assert not prof_args['requiresEventlogs']
    assert prof_args['rapidOptions']['driverlog'] == driverlog_path

def test_arg_cases_coverage(self):
"""
This test ensures that above tests have covered all possible states of the `platform`, `cluster`,
Expand Down