diff --git a/src/cloudai/__init__.py b/src/cloudai/__init__.py index 3076cdbe..320b3996 100644 --- a/src/cloudai/__init__.py +++ b/src/cloudai/__init__.py @@ -49,6 +49,7 @@ from .schema.test_template.jax_toolbox.slurm_install_strategy import JaxToolboxSlurmInstallStrategy from .schema.test_template.jax_toolbox.template import JaxToolbox from .schema.test_template.nccl_test.grading_strategy import NcclTestGradingStrategy +from .schema.test_template.nccl_test.job_status_retrieval_strategy import NcclTestJobStatusRetrievalStrategy from .schema.test_template.nccl_test.report_generation_strategy import NcclTestReportGenerationStrategy from .schema.test_template.nccl_test.slurm_command_gen_strategy import NcclTestSlurmCommandGenStrategy from .schema.test_template.nccl_test.slurm_install_strategy import NcclTestSlurmInstallStrategy @@ -105,10 +106,11 @@ ) Registry().add_strategy(JobIdRetrievalStrategy, [StandaloneSystem], [Sleep], StandaloneJobIdRetrievalStrategy) Registry().add_strategy(JobStatusRetrievalStrategy, [StandaloneSystem], [Sleep], DefaultJobStatusRetrievalStrategy) +Registry().add_strategy(JobStatusRetrievalStrategy, [SlurmSystem], [NcclTest], NcclTestJobStatusRetrievalStrategy) Registry().add_strategy( JobStatusRetrievalStrategy, [SlurmSystem], - [ChakraReplay, JaxToolbox, NcclTest, UCCTest, NeMoLauncher], + [ChakraReplay, JaxToolbox, UCCTest, NeMoLauncher], DefaultJobStatusRetrievalStrategy, ) Registry().add_strategy(CommandGenStrategy, [SlurmSystem], [UCCTest], UCCTestSlurmCommandGenStrategy) diff --git a/src/cloudai/schema/test_template/nccl_test/job_status_retrieval_strategy.py b/src/cloudai/schema/test_template/nccl_test/job_status_retrieval_strategy.py new file mode 100644 index 00000000..5c335a04 --- /dev/null +++ b/src/cloudai/schema/test_template/nccl_test/job_status_retrieval_strategy.py @@ -0,0 +1,60 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from cloudai._core.job_status_result import JobStatusResult +from cloudai._core.job_status_retrieval_strategy import JobStatusRetrievalStrategy + + +class NcclTestJobStatusRetrievalStrategy(JobStatusRetrievalStrategy): + """Strategy to retrieve job status for NCCL tests by checking the contents of 'stdout.txt'.""" + + def get_job_status(self, output_path: str) -> JobStatusResult: + """ + Determine the job status by examining 'stdout.txt' in the output directory. + + Args: + output_path (str): Path to the directory containing 'stdout.txt'. + + Returns: + JobStatusResult: The result containing the job status and an optional error message. + """ + stdout_path = os.path.join(output_path, "stdout.txt") + if os.path.isfile(stdout_path): + with open(stdout_path, "r") as file: + content = file.read() + if "# Out of bounds values" in content and "# Avg bus bandwidth" in content: + return JobStatusResult(is_successful=True) + missing_indicators = [] + if "# Out of bounds values" not in content: + missing_indicators.append("'# Out of bounds values'") + if "# Avg bus bandwidth" not in content: + missing_indicators.append("'# Avg bus bandwidth'") + error_message = ( + f"Missing success indicators in {stdout_path}: {', '.join(missing_indicators)}. " + "These keywords are expected to be present in stdout.txt, usually towards the end of the file. " + f"Please ensure the NCCL test ran to completion. You can run the generated sbatch script manually " + f"and check if {stdout_path} is created and contains the expected keywords." + ) + return JobStatusResult(is_successful=False, error_message=error_message) + return JobStatusResult( + is_successful=False, + error_message=( + f"stdout.txt file not found in the specified output directory {output_path}. " + "This file is expected to be created as a result of the NCCL test run. " + "Please ensure the NCCL test was executed properly and that stdout.txt is generated. " + f"You can run the generated NCCL test command manually and verify the creation of {stdout_path}." + ), + ) diff --git a/tests/test_job_status_retrieval_strategy.py b/tests/test_job_status_retrieval_strategy.py new file mode 100644 index 00000000..9d720b82 --- /dev/null +++ b/tests/test_job_status_retrieval_strategy.py @@ -0,0 +1,56 @@ +from pathlib import Path + +from cloudai.schema.test_template.nccl_test.job_status_retrieval_strategy import NcclTestJobStatusRetrievalStrategy + + +class TestNcclTestJobStatusRetrievalStrategy: + """Tests for the NcclTestJobStatusRetrievalStrategy class.""" + + def setup_method(self) -> None: + """Setup method for initializing NcclTestJobStatusRetrievalStrategy.""" + self.js = NcclTestJobStatusRetrievalStrategy() + + def test_no_stdout_file(self, tmp_path: Path) -> None: + """Test that job status is False when no stdout.txt file is present.""" + result = self.js.get_job_status(str(tmp_path)) + assert not result.is_successful + assert result.error_message == ( + f"stdout.txt file not found in the specified output directory {tmp_path}. " + "This file is expected to be created as a result of the NCCL test run. " + "Please ensure the NCCL test was executed properly and that stdout.txt is generated. " + f"You can run the generated NCCL test command manually and verify the creation of " + f"{tmp_path / 'stdout.txt'}." + ) + + def test_successful_job(self, tmp_path: Path) -> None: + """Test that job status is True when stdout.txt contains success indicators.""" + stdout_file = tmp_path / "stdout.txt" + stdout_content = """ + # Some initialization output + # More output + # Out of bounds values : 0 OK + # Avg bus bandwidth : 100.00 + # Some final output + """ + stdout_file.write_text(stdout_content) + result = self.js.get_job_status(str(tmp_path)) + assert result.is_successful + assert result.error_message == "" + + def test_failed_job(self, tmp_path: Path) -> None: + """Test that job status is False when stdout.txt does not contain success indicators.""" + stdout_file = tmp_path / "stdout.txt" + stdout_content = """ + # Some initialization output + # More output + # Some final output without success indicators + """ + stdout_file.write_text(stdout_content) + result = self.js.get_job_status(str(tmp_path)) + assert not result.is_successful + assert result.error_message == ( + f"Missing success indicators in {stdout_file}: '# Out of bounds values', '# Avg bus bandwidth'. " + "These keywords are expected to be present in stdout.txt, usually towards the end of the file. " + f"Please ensure the NCCL test ran to completion. You can run the generated sbatch script manually " + f"and check if {stdout_file} is created and contains the expected keywords." + )