diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
index 4d7dfc082..9c2f63ce2 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
@@ -644,6 +644,11 @@ def _get_main_cluster_obj(self):
         return self.ctxt.get_ctxt('cpuClusterProxy')
 
     def _process_eventlogs_args(self):
+        def eventlogs_arg_is_required():
+            # wrapper_options overrides the tool-config default when explicitly set
+            if self.wrapper_options.get('requiresEventlogs') is not None:
+                return self.wrapper_options.get('requiresEventlogs')
+            return self.ctxt.requires_eventlogs()
+
         eventlog_arg = self.wrapper_options.get('eventlogs')
         if eventlog_arg is None:
             # get the eventlogs from spark properties
@@ -661,7 +666,7 @@ def _process_eventlogs_args(self):
                 spark_event_logs = eventlog_arg.split(',')
             else:
                 spark_event_logs = eventlog_arg
-        if len(spark_event_logs) < 1:
+        if eventlogs_arg_is_required() and len(spark_event_logs) < 1:
             self.logger.error('Eventlogs list is empty. '
                               'The cluster Spark properties may be missing "spark.eventLog.dir". '
                               'Re-run the command passing "--eventlogs" flag to the wrapper.')
diff --git a/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py b/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py
index 394e0aa0b..be350b67e 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py
@@ -188,6 +188,12 @@ def get_tool_main_class(self) -> str:
     def get_rapids_auto_tuner_enabled(self) -> bool:
         return self.get_value('sparkRapids', 'enableAutoTuner')
 
+    def requires_eventlogs(self) -> bool:
+        flag = self.get_value_silent('sparkRapids', 'requireEventLogs')
+        if flag is None:
+            return True
+        return flag
+
     def get_rapids_output_folder(self) -> str:
         root_dir = self.get_local('outputFolder')
         rapids_subfolder = self.get_value_silent('toolOutput', 'subFolder')
diff --git a/user_tools/src/spark_rapids_pytools/resources/profiling-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/profiling-conf.yaml
index 68f005312..8ff783fd6 100644
--- a/user_tools/src/spark_rapids_pytools/resources/profiling-conf.yaml
+++ b/user_tools/src/spark_rapids_pytools/resources/profiling-conf.yaml
@@ -12,12 +12,15 @@ sparkRapids:
   mainClass: 'com.nvidia.spark.rapids.tool.profiling.ProfileMain'
   outputDocURL: 'https://docs.nvidia.com/spark-rapids/user-guide/latest/spark-profiling-tool.html#understanding-profiling-tool-detailed-output-and-examples'
   enableAutoTuner: true
+  requireEventLogs: true
   cli:
     toolOptions:
      - csv
      - any
      - a
      - application-name
+     - d
+     - driverlog
      - f
      - filter-criteria
      - g
diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml
index b8ec3cc70..b55ccd49d 100644
--- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml
+++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml
@@ -53,6 +53,7 @@ sparkRapids:
   mainClass: 'com.nvidia.spark.rapids.tool.qualification.QualificationMain'
   outputDocURL: 'https://docs.nvidia.com/spark-rapids/user-guide/latest/spark-qualification-tool.html#understanding-the-qualification-tool-output'
   enableAutoTuner: true
+  requireEventLogs: true
 gpu:
   device: 't4'
   workersPerNode: 2
diff --git a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
index 70db73d7e..d3723f84c 100644
--- a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
+++ b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
@@ -262,15 +262,7 @@ def init_extra_arg_cases(self) -> list:
 
     def define_invalid_arg_cases(self):
         super().define_invalid_arg_cases()
-        self.rejected['Missing Eventlogs'] = {
-            'valid': False,
-            'callable': partial(self.raise_validation_exception,
-                                'Cannot run tool cmd. Cannot define eventlogs from input '
-                                '(platform, cluster, and eventlogs)'),
-            'cases': [
-                [ArgValueCase.IGNORE, ArgValueCase.UNDEFINED, ArgValueCase.UNDEFINED]
-            ]
-        }
+        self.define_rejected_missing_eventlogs()
         self.rejected['Cluster By Name Without Platform Hints'] = {
             'valid': False,
             'callable': partial(self.raise_validation_exception,
@@ -296,6 +288,17 @@ def define_invalid_arg_cases(self):
             ]
         }
 
+    def define_rejected_missing_eventlogs(self):
+        self.rejected['Missing Eventlogs'] = {
+            'valid': False,
+            'callable': partial(self.raise_validation_exception,
+                                'Cannot run tool cmd. Cannot define eventlogs from input '
+                                '(platform, cluster, and eventlogs)'),
+            'cases': [
+                [ArgValueCase.IGNORE, ArgValueCase.UNDEFINED, ArgValueCase.UNDEFINED]
+            ]
+        }
+
     def define_detection_cases(self):
         self.detected['Define Platform from Cluster Properties file'] = {
             'valid': True,
@@ -459,6 +462,7 @@ class ProfileUserArgModel(ToolUserArgModel):
     Represents the arguments collected by the user to run the profiling tool.
     This is used as doing preliminary validation against some of the common pattern
     """
+    driverlog: Optional[str] = None
 
     def determine_cluster_arg_type(self) -> ArgValueCase:
         cluster_case = super().determine_cluster_arg_type()
@@ -470,9 +474,27 @@ def determine_cluster_arg_type(self) -> ArgValueCase:
             self.p_args['toolArgs']['autotuner'] = self.cluster
         return cluster_case
 
+    def init_driverlog_argument(self):
+        if self.driverlog is None:
+            self.p_args['toolArgs']['driverlog'] = None
+        else:
+            if not CspPath.is_file_path(self.driverlog, raise_on_error=False):
+                raise PydanticCustomError(
+                    'file_path',
+                    'Driver log file path is not valid\n Error:')
+
+            # the file cannot be a http_url
+            if is_http_file(self.driverlog):
+                # we do not accept http://urls
+                raise PydanticCustomError(
+                    'invalid_argument',
+                    f'Driver log file path cannot be a web URL path: {self.driverlog}\n Error:')
+            self.p_args['toolArgs']['driverlog'] = self.driverlog
+
     def init_tool_args(self):
         self.p_args['toolArgs']['platform'] = self.platform
         self.p_args['toolArgs']['autotuner'] = None
+        self.init_driverlog_argument()
 
     def define_invalid_arg_cases(self):
         super().define_invalid_arg_cases()
@@ -485,6 +507,10 @@ def define_invalid_arg_cases(self):
             ]
         }
 
+    def define_rejected_missing_eventlogs(self):
+        if self.p_args['toolArgs']['driverlog'] is None:
+            super().define_rejected_missing_eventlogs()
+
     def define_detection_cases(self):
         super().define_detection_cases()
         # append the case when the autotuner input
@@ -507,6 +533,15 @@ def build_tools_args(self) -> dict:
         else:
             # this is an actual cluster argument
             self.p_args['toolArgs']['cluster'] = self.cluster
+        if self.p_args['toolArgs']['driverlog'] is None:
+            requires_event_logs = True
+            rapid_options = {}
+        else:
+            requires_event_logs = False
+            rapid_options = {
+                'driverlog': self.p_args['toolArgs']['driverlog']
+            }
+
         # finally generate the final values
         wrapped_args = {
             'runtimePlatform': runtime_platform,
@@ -525,6 +560,8 @@ def build_tools_args(self) -> dict:
                 }
             },
             'eventlogs': self.eventlogs,
+            'requiresEventlogs': requires_event_logs,
+            'rapidOptions': rapid_options,
             'toolsJar': None,
             'autoTunerFileInput': self.p_args['toolArgs']['autotuner']
         }
diff --git a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
index 7f31c451a..6ac3c78bd 100644
--- a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
+++ b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py
@@ -144,6 +144,7 @@ def profiling(self,
                   eventlogs: str = None,
                   cluster: str = None,
                   platform: str = None,
+                  driverlog: str = None,
                   output_folder: str = None,
                   verbose: bool = None,
                   **rapids_options):
@@ -159,10 +160,11 @@ def profiling(self,
                 containing event logs (comma separated). If missing, the wrapper reads the Spark's
                 property `spark.eventLog.dir` defined in the `cluster`.
         :param cluster: The cluster on which the Spark applications were executed. The argument
-                can be a cluster name od ID (for databricks platforms) or a valid path to the cluster's
+                can be a cluster name or ID (for databricks platforms) or a valid path to the cluster's
                 properties file (json format) generated by the CSP SDK.
         :param platform: defines one of the following "onprem", "emr", "dataproc", "databricks-aws",
                 and "databricks-azure".
+        :param driverlog: Valid path to the GPU driver log file.
         :param output_folder: path to store the output.
         :param verbose: True or False to enable verbosity of the script.
         :param rapids_options: A list of valid Profiling tool options.
@@ -183,8 +185,10 @@ def profiling(self,
                                                          eventlogs=eventlogs,
                                                          cluster=cluster,
                                                          platform=platform,
+                                                         driverlog=driverlog,
                                                          output_folder=output_folder)
         if prof_args:
+            rapids_options.update(prof_args['rapidOptions'])
             tool_obj = ProfilingAsLocal(platform_type=prof_args['runtimePlatform'],
                                         output_folder=prof_args['outputFolder'],
                                         wrapper_options=prof_args,
diff --git a/user_tools/tests/spark_rapids_tools_ut/test_tool_argprocessor.py b/user_tools/tests/spark_rapids_tools_ut/test_tool_argprocessor.py
index f98c4ba7f..acb950590 100644
--- a/user_tools/tests/spark_rapids_tools_ut/test_tool_argprocessor.py
+++ b/user_tools/tests/spark_rapids_tools_ut/test_tool_argprocessor.py
@@ -76,7 +76,8 @@ def validate_args_w_savings_disabled(tool_name: str, t_args: dict):
         assert t_args['filterApps'] != QualFilterApp.SAVINGS
 
     @staticmethod
-    def create_tool_args_should_pass(tool_name: str, platform=None, cluster=None, eventlogs=None):
+    def create_tool_args_should_pass(tool_name: str, platform=None, cluster=None,
+                                     eventlogs=None):
         return AbsToolUserArgModel.create_tool_args(tool_name,
                                                     platform=platform,
                                                     cluster=cluster,
@@ -286,6 +287,13 @@ def test_with_platform_with_autotuner_with_eventlogs(self, get_ut_data_dir, tool
                                                      cost_savings_enabled=False,
                                                      expected_platform=CspEnv.ONPREM)
 
+    @pytest.mark.parametrize('prop_path', [autotuner_prop_path])
+    def test_profiler_with_driverlog(self, get_ut_data_dir, prop_path):
+        prof_args = AbsToolUserArgModel.create_tool_args('profiling',
+                                                         driverlog=f'{get_ut_data_dir}/{prop_path}')
+        assert not prof_args['requiresEventlogs']
+        assert prof_args['rapidOptions']['driverlog'] == f'{get_ut_data_dir}/{prop_path}'
+
     def test_arg_cases_coverage(self):
         """
         This test ensures that above tests have covered all possible states of the `platform`, `cluster`,