diff --git a/.github/workflows/python-package-conda.yml b/.github/workflows/python-package-conda.yml index 4bb5b8dc..f575b987 100644 --- a/.github/workflows/python-package-conda.yml +++ b/.github/workflows/python-package-conda.yml @@ -81,6 +81,8 @@ jobs: mpirun -np 2 pytest -k test_gen_data[jpeg-tensorflow] -v mpirun -np 2 pytest -k test_gen_data[tfrecord-tensorflow] -v mpirun -np 2 pytest -k test_gen_data[hdf5-tensorflow] -v + mpirun -np 2 pytest -k test_gen_data[indexed_binary-tensorflow] -v + mpirun -np 2 pytest -k test_gen_data[mmap_indexed_binary-tensorflow] -v - name: test_custom_storage_root_gen_data run: | source ${VENV}/bin/activate @@ -89,6 +91,8 @@ jobs: mpirun -np 2 pytest -k test_storage_root_gen_data[jpeg-tensorflow] -v mpirun -np 2 pytest -k test_storage_root_gen_data[tfrecord-tensorflow] -v mpirun -np 2 pytest -k test_storage_root_gen_data[hdf5-tensorflow] -v + mpirun -np 2 pytest -k test_storage_root_gen_data[indexed_binary-tensorflow] -v + mpirun -np 2 pytest -k test_storage_root_gen_data[mmap_indexed_binary-tensorflow] -v - name: test_train run: | source ${VENV}/bin/activate @@ -113,6 +117,14 @@ jobs: mpirun -np 2 pytest -k test_train[jpeg-pytorch-dali] -v mpirun -np 2 pytest -k test_train[hdf5-pytorch-dali] -v mpirun -np 2 pytest -k test_train[csv-pytorch-dali] -v + mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-tensorflow] -v + mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-pytorch] -v + mpirun -np 2 pytest -k test_train[indexed_binary-tensorflow-dali] -v + mpirun -np 2 pytest -k test_train[indexed_binary-pytorch-dali] -v + mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-tensorflow] -v + mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-pytorch] -v + mpirun -np 2 pytest -k test_train[mmap_indexed_binary-tensorflow-dali] -v + mpirun -np 2 pytest -k test_train[mmap_indexed_binary-pytorch-dali] -v - name: test_custom_storage_root_train run: | source ${VENV}/bin/activate @@ -127,10 +139,19 @@ 
jobs: mpirun -np 2 pytest -k test_custom_storage_root_train[jpeg-pytorch] -v mpirun -np 2 pytest -k test_custom_storage_root_train[hdf5-pytorch] -v mpirun -np 2 pytest -k test_custom_storage_root_train[csv-pytorch] -v + mpirun -np 2 pytest -k test_custom_storage_root_train[indexed_binary-tensorflow] -v + mpirun -np 2 pytest -k test_custom_storage_root_train[indexed_binary-pytorch] -v + mpirun -np 2 pytest -k test_custom_storage_root_train[mmap_indexed_binary-tensorflow] -v + mpirun -np 2 pytest -k test_custom_storage_root_train[mmap_indexed_binary-pytorch] -v - name: test_checkpoint_epoch run: | source ${VENV}/bin/activate - mpirun -np 2 pytest -k test_checkpoint_epoch -v + mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers0-2-layer_params0-all_ranks] -v + mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers1-2-layer_params1-all_ranks] -v + mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers2-2-layer_params2-rank_zero] -v + mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers3-2-layer_params3-rank_zero] -v + mpirun -np 2 pytest -k test_checkpoint_epoch[tensorflow-1024-optimizers4-1-layer_params4-all_ranks] -v + mpirun -np 2 pytest -k test_checkpoint_epoch[pytorch-1024-optimizers5-1-layer_params5-all_ranks] -v - name: test_checkpoint_step run: | source ${VENV}/bin/activate diff --git a/dlio_benchmark/common/enumerations.py b/dlio_benchmark/common/enumerations.py index 80fe4370..91e911f8 100644 --- a/dlio_benchmark/common/enumerations.py +++ b/dlio_benchmark/common/enumerations.py @@ -17,6 +17,16 @@ from enum import Enum +class CheckpointLocationType(Enum): + """ + Different types of underlying storage + """ + RANK_ZERO = 'rank_zero' + ALL_RANKS = 'all_ranks' + + def __str__(self): + return self.value + class StorageType(Enum): """ Different types of underlying storage @@ -97,6 +107,8 @@ class FormatType(Enum): HDF5_OPT = 'hdf5_opt' JPEG = 'jpeg' PNG = 'png' + INDEXED_BINARY = 'indexed_binary' + 
MMAP_INDEXED_BINARY = 'mmap_indexed_binary' def __str__(self): return self.value @@ -119,6 +131,10 @@ def get_enum(value): return FormatType.JPEG elif FormatType.PNG.value == value: return FormatType.PNG + elif FormatType.INDEXED_BINARY.value == value: + return FormatType.INDEXED_BINARY + elif FormatType.MMAP_INDEXED_BINARY.value == value: + return FormatType.MMAP_INDEXED_BINARY class DataLoaderType(Enum): """ diff --git a/dlio_benchmark/configs/workload/dlrm.yaml b/dlio_benchmark/configs/workload/dlrm.yaml new file mode 100644 index 00000000..a172d878 --- /dev/null +++ b/dlio_benchmark/configs/workload/dlrm.yaml @@ -0,0 +1,34 @@ +model: dlrm + +framework: pytorch + +workflow: + generate_data: False + train: True + do_eval: True + +dataset: + data_folder: data/dlrm + format: indexed_binary + num_files_train: 1 + num_files_eval: 1 + num_samples_per_file: 4195198976 + record_length: 327680 + keep_files: True + eval_num_samples_per_file: 91681240 + +reader: + data_loader: pytorch + batch_size: 2048 + batch_size_eval: 16384 + sample_shuffle: random + +train: + epochs: 1 + computation_time: 0.064296 + total_training_steps: 32768 + total_eval_steps: 2048 + +evaluation: + eval_time: 0.0843 + steps_between_evals: 16384 \ No newline at end of file diff --git a/dlio_benchmark/configs/workload/megatron_deepspeed.yaml b/dlio_benchmark/configs/workload/megatron_deepspeed.yaml new file mode 100644 index 00000000..20e4a3aa --- /dev/null +++ b/dlio_benchmark/configs/workload/megatron_deepspeed.yaml @@ -0,0 +1,36 @@ +# 8 node run with 4 GPUs per node and TPSIZE=4 and PPSIZE=8 +model: megatron_deepspeed + +framework: pytorch + +workflow: + generate_data: False + train: True + checkpoint: True + +dataset: + data_folder: dataset/megatron-deepspeed/ + format: mmap_indexed_binary + num_files_train: 1 + num_samples_per_file: 277203535 + record_length: 2048 + +reader: + data_loader: pytorch + batch_size: 1024 + read_threads: 1 + file_shuffle: seed + sample_shuffle: seed + +train: + 
epochs: 311541 + computation_time: 0.03 # every iteration has 290 steps and each iteration is 8.9 sec. + +checkpoint: + checkpoint_folder: checkpoints/megatron-deepspeed + steps_between_checkpoints: 1000 + model_size: 30102 + type: all_ranks + optimization_groups: [1009254400, 865075200, 793600] + num_layers: 44 + layer_parameters: [129761280, 20971520] diff --git a/dlio_benchmark/data_generator/generator_factory.py b/dlio_benchmark/data_generator/generator_factory.py index e61ead4c..18622bbc 100644 --- a/dlio_benchmark/data_generator/generator_factory.py +++ b/dlio_benchmark/data_generator/generator_factory.py @@ -47,5 +47,8 @@ def get_generator(type): elif type == FormatType.PNG: from dlio_benchmark.data_generator.png_generator import PNGGenerator return PNGGenerator() + elif type == FormatType.INDEXED_BINARY or type == FormatType.MMAP_INDEXED_BINARY: + from dlio_benchmark.data_generator.indexed_binary_generator import IndexedBinaryGenerator + return IndexedBinaryGenerator() else: raise Exception(str(ErrorCodes.EC1001)) \ No newline at end of file diff --git a/dlio_benchmark/data_generator/hdf5_generator.py b/dlio_benchmark/data_generator/hdf5_generator.py index f770fadd..3f04f4e0 100644 --- a/dlio_benchmark/data_generator/hdf5_generator.py +++ b/dlio_benchmark/data_generator/hdf5_generator.py @@ -44,7 +44,7 @@ def generate(self): """ super().generate() np.random.seed(10) - samples_per_iter=max(1, int(32*1024*1024/self._args.record_length)) + samples_per_iter=max(1, int(self._args.generation_buffer_size/self._args.record_length)) record_labels = [0] * self.num_samples for i in dlp.iter(range(self.my_rank, int(self.total_files_to_generate), self.comm_size)): progress(i, self.total_files_to_generate, "Generating HDF5 Data") diff --git a/dlio_benchmark/data_generator/indexed_binary_generator.py b/dlio_benchmark/data_generator/indexed_binary_generator.py new file mode 100644 index 00000000..039ba447 --- /dev/null +++ 
class IndexedBinaryGenerator(DataGenerator):
    """
    Generator for creating data in an indexed binary format.

    For each output file three artifacts are produced:
      * ``<file>``          - raw sample bytes, all samples concatenated
      * ``<file>.off.idx``  - absolute byte offset of each sample (uint64)
      * ``<file>.sz.idx``   - byte size of each sample (uint64)
    """

    def __init__(self):
        super().__init__()

    def index_file_path_off(self, prefix_path):
        """Return the offset-index file path for a given data file path."""
        return prefix_path + '.off.idx'

    def index_file_path_size(self, prefix_path):
        """Return the size-index file path for a given data file path."""
        return prefix_path + '.sz.idx'

    @dlp.log
    def generate(self):
        """
        Generate indexed binary data files plus their offset/size index files.

        Data is written in chunks of at most ``generation_buffer_size`` bytes
        (rounded down to a whole number of samples) so arbitrarily large files
        can be produced with bounded memory.
        """
        super().generate()
        np.random.seed(10)
        for i in dlp.iter(range(self.my_rank, int(self.total_files_to_generate), self.comm_size)):
            dim1, dim2 = self.get_dimension()
            sample_size = dim1 * dim2
            total_size = sample_size * self.num_samples
            # Cap each write at the generation buffer, aligned to sample_size.
            write_size = total_size
            memory_size = self._args.generation_buffer_size
            if total_size > memory_size:
                write_size = memory_size - (memory_size % sample_size)
            out_path_spec = self.storage.get_uri(self._file_list[i])
            out_path_spec_off_idx = self.index_file_path_off(out_path_spec)
            out_path_spec_sz_idx = self.index_file_path_size(out_path_spec)
            progress(i + 1, self.total_files_to_generate, "Generating Indexed Binary Data")
            written_bytes = 0
            # One random buffer is reused for every chunk; the content is
            # synthetic, only the offsets/sizes need to be exact.
            records = np.random.randint(255, size=write_size, dtype=np.uint8)
            with open(out_path_spec, "wb") as data_file, \
                 open(out_path_spec_off_idx, "wb") as off_file, \
                 open(out_path_spec_sz_idx, "wb") as sz_file:
                while written_bytes < total_size:
                    data_to_write = min(write_size, total_size - written_bytes)
                    samples_to_write = data_to_write // sample_size

                    # Write data. tobytes() produces bytes identical to
                    # struct.pack('B'*n, *records[:n]) without expanding
                    # millions of scalars into Python call arguments.
                    data_file.write(records[:data_to_write].tobytes())

                    # Write ABSOLUTE file offsets for each sample in this
                    # chunk: the reader seeks directly to these values, so
                    # they must not restart at 0 for every buffer chunk.
                    offsets = np.arange(written_bytes,
                                        written_bytes + samples_to_write * sample_size,
                                        sample_size, dtype=np.uint64)
                    off_file.write(offsets.tobytes())

                    # Write per-sample sizes (constant for this generator),
                    # as native uint64 to match the 'Q'/int64 reader layout.
                    sizes = np.full(samples_to_write, sample_size, dtype=np.uint64)
                    sz_file.write(sizes.tobytes())

                    written_bytes += data_to_write
        # Re-seed from OS entropy so later users of np.random do not stay on
        # the deterministic generation seed.
        np.random.seed()
b/dlio_benchmark/framework/framework.py @@ -16,6 +16,10 @@ """ from abc import ABC, abstractmethod + +from dlio_benchmark.common.enumerations import DatasetType +from dlio_benchmark.data_loader.data_loader_factory import DataLoaderFactory +from dlio_benchmark.storage.storage_factory import StorageFactory from dlio_benchmark.utils.utility import utcnow from time import sleep @@ -40,11 +44,18 @@ def __init__(self): self.args = ConfigArguments.get_instance() self.output_folder = self.args.output_folder self.checkpoint_folder = self.args.checkpoint_folder - pass + @abstractmethod - def init_loader(self, format_type, epoch_number, data_loader=None): - pass + def init_loader(self, format_type, epoch, data_loader=None): + self.reader_train = DataLoaderFactory.get_loader(data_loader, format_type, + dataset_type=DatasetType.TRAIN, epoch=epoch) + self.reader_valid = DataLoaderFactory.get_loader(data_loader, format_type, + dataset_type=DatasetType.VALID, epoch=epoch) + self.storage = StorageFactory().get_storage(self.args.storage_type, self.args.storage_root, self.args.framework) + checkpoint_storage = StorageFactory().get_storage(self.args.storage_type, self.checkpoint_folder, + self.args.framework) + checkpoint_storage.create_namespace(exist_ok=True) @abstractmethod def get_type(self): diff --git a/dlio_benchmark/framework/tf_framework.py b/dlio_benchmark/framework/tf_framework.py index 3e684153..ed0cbf5b 100644 --- a/dlio_benchmark/framework/tf_framework.py +++ b/dlio_benchmark/framework/tf_framework.py @@ -28,7 +28,8 @@ from dlio_benchmark.reader.reader_factory import ReaderFactory from dlio_benchmark.profiler.profiler_factory import ProfilerFactory from dlio_benchmark.storage.storage_factory import StorageFactory -from dlio_benchmark.common.enumerations import FrameworkType, Profiler, FormatType, DatasetType, MetadataType, DataLoaderType +from dlio_benchmark.common.enumerations import FrameworkType, Profiler, FormatType, DatasetType, MetadataType, \ + DataLoaderType, 
CheckpointLocationType import tensorflow as tf from tensorflow.python.framework import errors @@ -37,6 +38,7 @@ dlp = Profile(MODULE_AI_FRAMEWORK) + class TFFramework(Framework): __instance = None @@ -51,17 +53,38 @@ def __init__(self, profiling): else: self.tensorboard = ProfilerFactory.get_profiler(Profiler.TENSORBOARD) self.reader_handler = None + self.model_state = None + rank_to_checkpoint = self.args.my_rank + if self.args.checkpoint_type == CheckpointLocationType.RANK_ZERO: + rank_to_checkpoint = 0 + if rank_to_checkpoint == self.args.my_rank: + if self.args.model_size > 0: + self.model_state = {"a": self._get_tensor(self.args.model_size)} + self.optimization_state = None + if len(self.args.optimization_groups) > 0: + self.optimization_state = dict() + tensor_array_size = 0 + for index, state in enumerate(self.args.optimization_groups): + if state > 0: + self.optimization_state[str(index)] = {'a': self._get_tensor(state), + 'b': self._get_tensor(state)} + tensor_array_size += state + self.optimization_state["combined"] = self._get_tensor(tensor_array_size) + self.layer_state = None + if len(self.args.layer_parameters) > 0: + self.layer_state = dict() + for index, state in enumerate(self.args.layer_parameters): + if state > 0: + self.layer_state[str(index)] = self._get_tensor(state) + + def _get_tensor(self, size): + return tf.random.uniform((int(size / 4),), maxval=100, dtype=tf.dtypes.int32) @dlp.log def init_loader(self, format_type, epoch=0, data_loader=None): if data_loader is None: - data_loader = DataLoaderType.PYTORCH - self.reader_train = DataLoaderFactory.get_loader(data_loader, format_type, - dataset_type=DatasetType.TRAIN, epoch=epoch) - self.reader_valid = DataLoaderFactory.get_loader(data_loader, format_type, - dataset_type=DatasetType.VALID, epoch=epoch) - self.storage = StorageFactory().get_storage(self.args.storage_type, self.args.storage_root, self.args.framework) - + data_loader = DataLoaderType.TENSORFLOW + super().init_loader(format_type, 
epoch, data_loader) @dlp.log def get_type(self): return FrameworkType.TENSORFLOW @@ -80,40 +103,46 @@ def start_framework_profiler(self): @dlp.log def stop_framework_profiler(self): - #if self.profiling: + # if self.profiling: # self.tensorboard.stop() pass - + @dlp.log def trace_object(self, string, step, r): - pass #tf.profiler.experimental.Trace(string, step_num=step, _r=r) + pass # tf.profiler.experimental.Trace(string, step_num=step, _r=r) @dlp.log def checkpoint(self, epoch, step_number): """ Performs Checkpointing for a specific step number. It writes different file of different sizes. """ - if DLIOMPI.get_instance().rank() == 0: - if not self.storage.get_node(self.checkpoint_folder): - self.storage.create_node(self.checkpoint_folder) - - model_file = os.path.join(self.checkpoint_folder, f"model-{epoch}-{step_number}.bin") - meta_file = os.path.join(self.checkpoint_folder, f"meta-{epoch}-{step_number}.bin") - index_file = os.path.join(self.checkpoint_folder, f"index-{epoch}-{step_number}.bin") - - string_val = "x" * self.args.model_size - self.storage.put_data(model_file, string_val) - # TODO Should these scale with the model size? 
- string_val = "x" * (17371) - self.storage.put_data(index_file, string_val) - - string_val = "x" * (24740228) - self.storage.put_data(meta_file, string_val) + my_rank = DLIOMPI.get_instance().rank() + rank_to_checkpoint = my_rank + if self.args.checkpoint_type == CheckpointLocationType.RANK_ZERO: + rank_to_checkpoint = 0 + if rank_to_checkpoint == my_rank: + if self.model_state: + fname = os.path.join(self.checkpoint_folder, f"model-{epoch}-{step_number}-{my_rank}.tf") + checkpoint = tf.train.Checkpoint() + checkpoint.mapped = self.model_state + checkpoint.save(fname) + if self.optimization_state: + fname = os.path.join(self.checkpoint_folder, f"optimizer-{epoch}-{step_number}-{my_rank}.tf") + checkpoint = tf.train.Checkpoint() + checkpoint.mapped = self.optimization_state + checkpoint.save(fname) + + if self.layer_state and self.args.num_layers > 0: + for layer in range(self.args.num_layers): + fname = os.path.join(self.checkpoint_folder, f"layer-{layer}-{epoch}-{step_number}-{my_rank}.tf") + checkpoint = tf.train.Checkpoint() + checkpoint.mapped = self.layer_state + checkpoint.save(fname) @dlp.log def compute(self, x, epoch_number, step, computation_time): sleep(computation_time) - #tf.function(self.model)(epoch_number, step, computation_time) + # tf.function(self.model)(epoch_number, step, computation_time) @dlp.log def get_loader(self, dataset_type=DatasetType.TRAIN): diff --git a/dlio_benchmark/framework/torch_framework.py b/dlio_benchmark/framework/torch_framework.py index 84f0b825..84a6ac90 100644 --- a/dlio_benchmark/framework/torch_framework.py +++ b/dlio_benchmark/framework/torch_framework.py @@ -16,7 +16,7 @@ """ from dlio_benchmark.common.error_code import ErrorCodes -from dlio_benchmark.common.enumerations import FormatType, FrameworkType, DatasetType, DataLoaderType +from dlio_benchmark.common.enumerations import FormatType, FrameworkType, DatasetType, DataLoaderType, CheckpointLocationType from dlio_benchmark.data_loader.data_loader_factory import 
DataLoaderFactory from dlio_benchmark.framework.framework import Framework, DummyTraceObject from dlio_benchmark.common.constants import MODULE_AI_FRAMEWORK @@ -61,16 +61,37 @@ def __init__(self, profiling): super().__init__() self.profiling = profiling self.reader_handler = None + rank_to_checkpoint = self.args.my_rank + if self.args.checkpoint_type == CheckpointLocationType.RANK_ZERO: + rank_to_checkpoint = 0 + if rank_to_checkpoint == self.args.my_rank: + self.model_state = None + if self.args.model_size > 0: + self.model_state = {"a": self._get_tensor(self.args.model_size)} + self.optimization_state = None + if len(self.args.optimization_groups) > 0: + self.optimization_state = dict() + tensor_array_size = 0 + for index, state in enumerate(self.args.optimization_groups): + if state > 0: + self.optimization_state[str(index)] = {'a': self._get_tensor(state), 'b': self._get_tensor(state)} + tensor_array_size += state + self.optimization_state["combined"] = self._get_tensor(tensor_array_size) + self.layer_state = None + if len(self.args.layer_parameters) > 0: + self.layer_state = dict() + for index, state in enumerate(self.args.layer_parameters): + if state > 0: + self.layer_state[str(index)] = self._get_tensor(state) + + def _get_tensor(self, size): + return torch.randint(high=1, size=(size,), dtype=torch.int8) @dlp.log def init_loader(self, format_type, epoch=0, data_loader=None): if data_loader is None: data_loader = DataLoaderType.PYTORCH - self.reader_train = DataLoaderFactory.get_loader(data_loader, format_type, - dataset_type=DatasetType.TRAIN, epoch=epoch) - self.reader_valid = DataLoaderFactory.get_loader(data_loader, format_type, - dataset_type=DatasetType.VALID, epoch=epoch) - self.storage = StorageFactory().get_storage(self.args.storage_type, self.args.storage_root, self.args.framework) + super().init_loader(format_type, epoch, data_loader) @dlp.log def get_type(self): @@ -97,17 +118,26 @@ def trace_object(self, string, step, r): @dlp.log def 
checkpoint(self, epoch, step_number): - if DLIOMPI.get_instance().rank() == 0: - """ - Performs Checkpointing for a specific step number. It writes different file of different sizes. - """ - if not self.storage.get_node(self.checkpoint_folder): - self.storage.create_node(self.checkpoint_folder) - model_file = os.path.join(self.checkpoint_folder, f"model-{epoch}-{step_number}.bin") - - string_val = "x" * self.args.model_size - self.storage.put_data(model_file, string_val) + rank_to_checkpoint = DLIOMPI.get_instance().rank() + if self.args.checkpoint_type == CheckpointLocationType.RANK_ZERO: + rank_to_checkpoint = 0 + if rank_to_checkpoint == DLIOMPI.get_instance().rank(): + my_rank = DLIOMPI.get_instance().rank() + if self.model_state: + fname = os.path.join(self.checkpoint_folder, f"model-{epoch}-{step_number}-{my_rank}.pt") + with open(fname, "wb") as f: + torch.save(self.model_state, f) + if self.optimization_state: + fname = os.path.join(self.checkpoint_folder, f"optimizer-{epoch}-{step_number}-{my_rank}.pt") + with open(fname, "wb") as f: + torch.save(self.optimization_state, f) + + if self.layer_state and self.args.num_layers > 0: + for layer in range(self.args.num_layers): + fname = os.path.join(self.checkpoint_folder, f"layer-{layer}-{epoch}-{step_number}-{my_rank}.pt") + with open(fname, "wb") as f: + torch.save(self.layer_state, f) @dlp.log def compute(self, x, epoch_number, step, computation_time): diff --git a/dlio_benchmark/main.py b/dlio_benchmark/main.py index d0d4dcb1..721c77dc 100644 --- a/dlio_benchmark/main.py +++ b/dlio_benchmark/main.py @@ -80,7 +80,10 @@ def __init__(self, cfg): self.my_rank = self.args.my_rank = DLIOMPI.get_instance().rank() self.comm_size = self.args.comm_size = DLIOMPI.get_instance().size() self.dlp_logger = PerfTrace.initialize_log(logfile=dlp_trace, - data_dir=f"{os.path.abspath(self.args.data_folder)}:{self.args.data_folder}:./{self.args.data_folder}", + data_dir=f"{os.path.abspath(self.args.data_folder)}:" + 
f"{self.args.data_folder}:./{self.args.data_folder}:" + f"{self.args.checkpoint_folder}:./{self.args.checkpoint_folder}:" + f"{os.path.abspath(self.args.checkpoint_folder)}", process_id=self.my_rank) with Profile(name=f"{self.__init__.__qualname__}", cat=MODULE_DLIO_BENCHMARK): self.data_folder = self.args.data_folder @@ -195,8 +198,9 @@ def initialize(self): else: assert (num_subfolders == 0) fullpaths = [self.storage.get_uri(os.path.join(self.args.data_folder, f"{dataset_type}", entry)) - for entry in filenames if entry.find(f'{self.args.format}') != -1] + for entry in filenames if entry.endswith(f'{self.args.format}')] fullpaths = sorted(fullpaths) + logging.debug(f"subfolder {num_subfolders} fullpaths {fullpaths}") if dataset_type is DatasetType.TRAIN: file_list_train = fullpaths elif dataset_type is DatasetType.VALID: diff --git a/dlio_benchmark/reader/indexed_binary_mmap_reader.py b/dlio_benchmark/reader/indexed_binary_mmap_reader.py new file mode 100644 index 00000000..bd3fc97b --- /dev/null +++ b/dlio_benchmark/reader/indexed_binary_mmap_reader.py @@ -0,0 +1,112 @@ +""" + Copyright (c) 2022, UChicago Argonne, LLC + All Rights Reserved + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
class IndexedBinaryMMapReader(FormatReader):
    """
    Reader for indexed binary files accessed through a memory map.

    Per-sample byte offsets and sizes are loaded once from the ``.off.idx``
    and ``.sz.idx`` side files; sample bytes are then sliced out of an
    mmap'd uint8 view of the data file without an up-front copy.
    """

    @dlp.log_init
    def __init__(self, dataset_type, thread_index, epoch):
        super().__init__(dataset_type, thread_index)
        self.file_map = {}    # filename -> [offsets ndarray, sizes ndarray]
        self.load_index()
        self.buffer_map = {}  # filename -> uint8 view over the mmap

    def index_file_path_off(self, prefix_path):
        """Return the offset-index path for *prefix_path*."""
        return prefix_path + '.off.idx'

    def index_file_path_size(self, prefix_path):
        """Return the size-index path for *prefix_path*."""
        return prefix_path + '.sz.idx'

    def read_longs(self, f, n):
        """Read *n* native-endian int64 values from binary file object *f*."""
        values = np.empty(n, dtype=np.int64)
        f.readinto(values)
        return values

    def load_index_file(self, global_sample_idx, filename, sample_index):
        """Load the offset/size index for *filename* into file_map (idempotent)."""
        if filename in self.file_map:
            return
        off_path = self.index_file_path_off(filename)
        sz_path = self.index_file_path_size(filename)
        self.file_map[filename] = []
        with open(off_path, 'rb') as stream:
            offsets = self.read_longs(stream, self._args.num_samples_per_file)
            logging.debug(f"read offsets {offsets} from file {off_path}")
            self.file_map[filename].append(offsets)
        with open(sz_path, 'rb') as stream:
            sizes = self.read_longs(stream, self._args.num_samples_per_file)
            logging.debug(f"read sizes {sizes} from file {sz_path}")
            self.file_map[filename].append(sizes)

    @dlp.log
    def load_index(self):
        """Load index side files for every sample this reader may serve."""
        sampler = self._args.data_loader_sampler
        if sampler == DataLoaderSampler.ITERATIVE:
            for sample_id, fname, sample_no in self._args.file_map[self.thread_index]:
                self.load_index_file(sample_id, fname, sample_no)
        elif sampler == DataLoaderSampler.INDEX:
            for sample_id, (fname, sample_no) in self._args.global_index_map.items():
                self.load_index_file(sample_id, fname, sample_no)

    @dlp.log
    def open(self, filename):
        """Memory-map *filename* and cache a zero-copy uint8 view of it."""
        super().open(filename)
        mapped = np.memmap(filename, mode='r', order='C')
        self.buffer_map[filename] = np.frombuffer(memoryview(mapped), dtype=np.uint8)
        return mapped

    @dlp.log
    def close(self, filename):
        """Close the underlying mmap for *filename*."""
        super().close(filename)
        # open_file_map holds the np.memmap returned by open(); release its map.
        self.open_file_map[filename]._mmap.close()

    @dlp.log
    def get_sample(self, filename, sample_index):
        """Slice one sample out of the mmap'd buffer (no read copy made here)."""
        super().get_sample(filename, sample_index)
        offsets, sizes = self.file_map[filename]
        offset = offsets[sample_index]
        size = sizes[sample_index]
        # NOTE(review): "(unknown)" below looks like a placeholder where
        # {filename} may have been intended - confirm against history.
        logging.debug(f"reading sample from offset {offset} of size {size} from file (unknown)")
        image = self.buffer_map[filename][offset:offset + size]
        dlp.update(image_size=size)

    def next(self):
        yield from super().next()

    @dlp.log
    def read_index(self, image_idx, step):
        return super().read_index(image_idx, step)

    @dlp.log
    def finalize(self):
        return super().finalize()
+""" +import logging + +import numpy as np +import struct + +from dlio_benchmark.common.constants import MODULE_DATA_READER +from dlio_benchmark.common.enumerations import DataLoaderSampler +from dlio_benchmark.reader.reader_handler import FormatReader +from dlio_profiler.logger import fn_interceptor as Profile + +dlp = Profile(MODULE_DATA_READER) + + +class IndexedBinaryReader(FormatReader): + """ + Reader for Indexed Binary files + """ + + @dlp.log_init + def __init__(self, dataset_type, thread_index, epoch): + super().__init__(dataset_type, thread_index) + self.file_map = {} + self.load_index() + + def index_file_path_off(self, prefix_path): + return prefix_path + '.off.idx' + + def index_file_path_size(self, prefix_path): + return prefix_path + '.sz.idx' + + def read_longs(self, f, n): + a = np.empty(n, dtype=np.int64) + f.readinto(a) + return a + + def load_index_file(self, global_sample_idx, filename, sample_index): + if filename not in self.file_map: + offset_file = self.index_file_path_off(filename) + sz_file = self.index_file_path_size(filename) + self.file_map[filename] = [] + with open(offset_file, 'rb') as f: + offsets = self.read_longs(f, self._args.num_samples_per_file) + logging.debug(f"read offsets {offsets} from file {offset_file}") + self.file_map[filename].append(offsets) + with open(sz_file, 'rb') as f: + sizes = self.read_longs(f, self._args.num_samples_per_file) + logging.debug(f"read sizes {sizes} from file {sz_file}") + self.file_map[filename].append(sizes) + @dlp.log + def load_index(self): + if self._args.data_loader_sampler == DataLoaderSampler.ITERATIVE: + for global_sample_idx, filename, sample_index in self._args.file_map[self.thread_index]: + self.load_index_file(global_sample_idx, filename, sample_index) + elif self._args.data_loader_sampler == DataLoaderSampler.INDEX: + for global_sample_idx, (filename, sample_index) in self._args.global_index_map.items(): + self.load_index_file(global_sample_idx, filename, sample_index) + + + + + 
@dlp.log + def open(self, filename): + super().open(filename) + return open(filename, "rb") + + @dlp.log + def close(self, filename): + super().close(filename) + self.open_file_map[filename].close() + + @dlp.log + def get_sample(self, filename, sample_index): + super().get_sample(filename, sample_index) + file = self.open_file_map[filename] + offset = self.file_map[filename][0][sample_index] + size = self.file_map[filename][1][sample_index] + logging.debug(f"reading sample from offset {offset} of size {size} from file {filename}") + file.seek(offset) + image = np.empty(size, dtype=np.uint8) + file.readinto(image) + dlp.update(image_size=size) + + def next(self): + for batch in super().next(): + yield batch + + @dlp.log + def read_index(self, image_idx, step): + return super().read_index(image_idx, step) + + @dlp.log + def finalize(self): + return super().finalize() diff --git a/dlio_benchmark/reader/reader_factory.py b/dlio_benchmark/reader/reader_factory.py index e84db142..128b482c 100644 --- a/dlio_benchmark/reader/reader_factory.py +++ b/dlio_benchmark/reader/reader_factory.py @@ -69,6 +69,12 @@ def get_reader(type, dataset_type, thread_index, epoch_number): return DaliTFRecordReader(dataset_type, thread_index, epoch_number) else: from dlio_benchmark.reader.tf_reader import TFReader - return TFReader(dataset_type, thread_index, epoch_number) + return TFReader(dataset_type, thread_index, epoch_number) + elif type == FormatType.INDEXED_BINARY: + from dlio_benchmark.reader.indexed_binary_reader import IndexedBinaryReader + return IndexedBinaryReader(dataset_type, thread_index, epoch_number) + elif type == FormatType.MMAP_INDEXED_BINARY: + from dlio_benchmark.reader.indexed_binary_mmap_reader import IndexedBinaryMMapReader + return IndexedBinaryMMapReader(dataset_type, thread_index, epoch_number) else: raise Exception("Loading data of %s format is not supported without framework data loader" %type) \ No newline at end of file diff --git
a/dlio_benchmark/utils/config.py b/dlio_benchmark/utils/config.py index 86e1bb76..9aeeea0b 100644 --- a/dlio_benchmark/utils/config.py +++ b/dlio_benchmark/utils/config.py @@ -27,7 +27,7 @@ from dlio_benchmark.common.constants import MODULE_CONFIG from dlio_benchmark.common.enumerations import StorageType, FormatType, Shuffle, ReadType, FileAccess, Compression, \ FrameworkType, \ - DataLoaderType, Profiler, DatasetType, DataLoaderSampler + DataLoaderType, Profiler, DatasetType, DataLoaderSampler, CheckpointLocationType from dlio_benchmark.utils.utility import DLIOMPI from dataclasses import dataclass import math @@ -96,11 +96,16 @@ class ConfigArguments: do_eval: bool = False batch_size_eval: int = 1 num_files_eval: int = 0 + generation_buffer_size: int = 2 * 1073741824 # 2 GB eval_time: float = 0.0 eval_time_stdev: float = 0.0 eval_after_epoch: int = 1 epochs_between_evals: int = 1 + checkpoint_type: CheckpointLocationType = CheckpointLocationType.RANK_ZERO model_size: int = 10240 + optimization_groups: ClassVar[List[int]] = [] + num_layers: int = 1 + layer_parameters: ClassVar[List[int]] = [17371, 24740228] data_loader: DataLoaderType = DataLoaderType.TENSORFLOW.value num_subfolders_train: int = 0 num_subfolders_eval: int = 0 @@ -344,6 +349,8 @@ def LoadConfig(args, config): args.num_files_train = config['dataset']['num_files_train'] if 'num_files_eval' in config['dataset']: args.num_files_eval = config['dataset']['num_files_eval'] + if 'generation_buffer_size' in config['dataset']: + args.generation_buffer_size = config['dataset']['generation_buffer_size'] if 'num_samples_per_file' in config['dataset']: args.num_samples_per_file = config['dataset']['num_samples_per_file'] if 'data_folder' in config['dataset']: @@ -449,8 +456,16 @@ def LoadConfig(args, config): args.epochs_between_checkpoints = config['checkpoint']['epochs_between_checkpoints'] if 'steps_between_checkpoints' in config['checkpoint']: args.steps_between_checkpoints = 
config['checkpoint']['steps_between_checkpoints'] + if 'type' in config['checkpoint']: + args.checkpoint_type = CheckpointLocationType(config['checkpoint']['type']) if 'model_size' in config['checkpoint']: args.model_size = config['checkpoint']['model_size'] + if 'optimization_groups' in config['checkpoint']: + args.optimization_groups = config['checkpoint']['optimization_groups'] + if 'num_layers' in config['checkpoint']: + args.num_layers = config['checkpoint']['num_layers'] + if 'layer_parameters' in config['checkpoint']: + args.layer_parameters = config['checkpoint']['layer_parameters'] if 'output' in config: if 'folder' in config['output']: args.output_folder = config['output']['folder'] diff --git a/docs/source/config.rst b/docs/source/config.rst index 9d245ca2..ed9f0a13 100644 --- a/docs/source/config.rst +++ b/docs/source/config.rst @@ -300,7 +300,20 @@ checkpoint - performing one checkpointing per certain number of steps specified * - model_size - 10240 - - the size of the model in bytes + - the size of the model parameters per GPU in bytes + * - optimization_groups + - [] + - List of optimization group tensors. Use array notation in YAML. + * - num_layers + - 1 + - Number of layers to checkpoint. Each layer is checkpointed separately. + * - layer_parameters + - [17371, 24740228] + - List of parameters per layer. This is used to perform I/O per layer. + * - type + - rank_zero + - Which ranks perform the checkpoint: all ranks (all_ranks) or only rank 0 (rank_zero). + ..
note:: diff --git a/tests/dlio_benchmark_test.py b/tests/dlio_benchmark_test.py index bcc48213..105864c7 100644 --- a/tests/dlio_benchmark_test.py +++ b/tests/dlio_benchmark_test.py @@ -75,7 +75,7 @@ def run_benchmark(cfg, storage_root="./", verify=True): @pytest.mark.timeout(60, method="thread") @pytest.mark.parametrize("fmt, framework", [("png", "tensorflow"), ("npz", "tensorflow"), ("jpeg", "tensorflow"), ("tfrecord", "tensorflow"), - ("hdf5", "tensorflow")]) + ("hdf5", "tensorflow"), ("indexed_binary", "tensorflow"), ("mmap_indexed_binary", "tensorflow")]) def test_gen_data(fmt, framework) -> None: if (comm.rank == 0): logging.info("") @@ -129,7 +129,8 @@ def test_subset() -> None: @pytest.mark.timeout(60, method="thread") @pytest.mark.parametrize("fmt, framework", [("png", "tensorflow"), ("npz", "tensorflow"), ("jpeg", "tensorflow"), ("tfrecord", "tensorflow"), - ("hdf5", "tensorflow")]) + ("hdf5", "tensorflow"), ("indexed_binary", "tensorflow"), + ("mmap_indexed_binary", "tensorflow")]) def test_storage_root_gen_data(fmt, framework) -> None: storage_root = "runs" @@ -207,31 +208,59 @@ def test_iostat_profiling() -> None: subprocess.run(cmd, capture_output=True, timeout=10) clean() - @pytest.mark.timeout(60, method="thread") -def test_checkpoint_epoch() -> None: +@pytest.mark.parametrize("framework, model_size, optimizers, num_layers, layer_params, type", [("tensorflow", 1024, [1024, 128], 2, [16], "all_ranks"), + ("pytorch", 1024, [1024, 128], 2, [16], "all_ranks"), + ("tensorflow", 1024, [1024, 128], 2, [16], "rank_zero"), + ("pytorch", 1024, [1024, 128], 2, [16], "rank_zero"), + ("tensorflow", 1024, [128], 1, [], "all_ranks"), + ("pytorch", 1024, [128], 1, [], "all_ranks")]) +def test_checkpoint_epoch(framework, model_size, optimizers, num_layers, layer_params, type) -> None: clean() - if (comm.rank == 0): + if comm.rank == 0: logging.info("") logging.info("=" * 80) logging.info(f" DLIO test for checkpointing at the end of epochs") logging.info("=" * 80) 
with initialize_config_dir(version_base=None, config_dir=config_dir): cfg = compose(config_name='config', - overrides=['++workload.workflow.train=True', \ - '++workload.workflow.generate_data=True', \ - '++workload.train.computation_time=0.01', \ - '++workload.evaluation.eval_time=0.005', \ - '++workload.train.epochs=8', '++workload.workflow.checkpoint=True', \ - '++workload.checkpoint.epochs_between_checkpoints=2']) + overrides=[f'++workload.framework={framework}', + f'++workload.reader.data_loader={framework}', + '++workload.workflow.train=True', + '++workload.workflow.generate_data=True', + '++workload.train.computation_time=0.01', + '++workload.evaluation.eval_time=0.005', + '++workload.train.epochs=8', '++workload.workflow.checkpoint=True', + '++workload.checkpoint.epochs_between_checkpoints=2', + f'++workload.checkpoint.type={type}', + f'++workload.checkpoint.model_size={model_size}', + f'++workload.checkpoint.optimization_groups={optimizers}', + f'++workload.checkpoint.num_layers={num_layers}', + f'++workload.checkpoint.layer_parameters={layer_params}']) comm.Barrier() if comm.rank == 0: shutil.rmtree("./checkpoints", ignore_errors=True) + os.makedirs("./checkpoints", exist_ok=True) comm.Barrier() benchmark = run_benchmark(cfg) output = pathlib.Path("./checkpoints") - load_bin = list(output.glob("*.bin")) - assert (len(load_bin) == 4) + load_bin = list(output.glob("*")) + n = 0 + if len(layer_params) > 0: + n = num_layers + nranks = 1 + if type == "all_ranks": + nranks = comm.size + if framework == "tensorflow": + num_check_files = 8 / 2 * (2 + 2 + 2*n) * nranks + 1 + assert (len(load_bin) == num_check_files), f"files produced are {len(load_bin)} {num_check_files} {load_bin} " + if framework == "pytorch": + num_check_files = 8 / 2 * (1 + 1 + n) * nranks + assert (len(load_bin) == num_check_files), f"files produced are {len(load_bin)} {num_check_files} {load_bin}" + comm.Barrier() + if comm.rank == 0: + shutil.rmtree("./checkpoints", ignore_errors=True) + 
comm.Barrier() clean() @@ -254,14 +283,15 @@ def test_checkpoint_step() -> None: comm.Barrier() if comm.rank == 0: shutil.rmtree("./checkpoints", ignore_errors=True) + os.makedirs("./checkpoints", exist_ok=True) comm.Barrier() benchmark = run_benchmark(cfg) dataset = cfg['workload']['dataset'] nstep = dataset.num_files_train * dataset.num_samples_per_file // cfg['workload'][ 'reader'].batch_size // benchmark.comm_size - ncheckpoints = nstep // 2 * 8 + ncheckpoints = nstep // 2 * 8 * 2 output = pathlib.Path("./checkpoints") - load_bin = list(output.glob("*.bin")) + load_bin = list(output.glob("*")) assert (len(load_bin) == ncheckpoints) clean() @@ -343,14 +373,19 @@ def test_pytorch_multiprocessing_context(nt, context) -> None: @pytest.mark.parametrize("fmt, framework, dataloader", [("png", "tensorflow","tensorflow"), ("npz", "tensorflow","tensorflow"), ("jpeg", "tensorflow","tensorflow"), ("tfrecord", "tensorflow","tensorflow"), ("hdf5", "tensorflow","tensorflow"), ("csv", "tensorflow","tensorflow"), + ("indexed_binary", "tensorflow","tensorflow"), ("mmap_indexed_binary", "tensorflow","tensorflow"), ("png", "pytorch", "pytorch"), ("npz", "pytorch", "pytorch"), - ("jpeg", "pytorch", "pytorch"), ("hdf5", "pytorch", "pytorch"), ("csv", "pytorch", "pytorch"), + ("jpeg", "pytorch", "pytorch"), ("hdf5", "pytorch", "pytorch"), + ("csv", "pytorch", "pytorch"), ("indexed_binary", "pytorch", "pytorch"), + ("mmap_indexed_binary", "pytorch", "pytorch"), ("png", "tensorflow", "dali"), ("npz", "tensorflow", "dali"), - ("jpeg", "tensorflow", "dali"), - ("hdf5", "tensorflow", "dali"), ("csv", "tensorflow", "dali"), + ("jpeg", "tensorflow", "dali"), ("hdf5", "tensorflow", "dali"), + ("csv", "tensorflow", "dali"), ("indexed_binary", "tensorflow", "dali"), + ("mmap_indexed_binary", "tensorflow", "dali"), ("png", "pytorch", "dali"), ("npz", "pytorch", "dali"), ("jpeg", "pytorch", "dali"), ("hdf5", "pytorch", "dali"), - ("csv", "pytorch", "dali"), + ("csv", "pytorch", "dali"), 
("indexed_binary", "pytorch", "dali"), + ("mmap_indexed_binary", "pytorch", "dali"), ]) def test_train(fmt, framework, dataloader) -> None: clean() @@ -379,9 +414,11 @@ def test_train(fmt, framework, dataloader) -> None: @pytest.mark.parametrize("fmt, framework", [("png", "tensorflow"), ("npz", "tensorflow"), ("jpeg", "tensorflow"), ("tfrecord", "tensorflow"), ("hdf5", "tensorflow"), ("csv", "tensorflow"), + ("indexed_binary", "tensorflow"), ("mmap_indexed_binary", "tensorflow"), ("png", "pytorch"), ("npz", "pytorch"), ("jpeg", "pytorch"), ("hdf5", "pytorch"), - ("csv", "pytorch") + ("csv", "pytorch"), ("indexed_binary", "pytorch"), + ("mmap_indexed_binary", "pytorch"), ]) def test_custom_storage_root_train(fmt, framework) -> None: storage_root = "root_dir"