From 0fa99ec2bc7e38a54eafaa7239a05e2318a5f9d3 Mon Sep 17 00:00:00 2001 From: greycooker <526929599@qq.com> Date: Tue, 30 Jul 2024 16:05:45 +0000 Subject: [PATCH 1/5] support sft mapdataset --- paddlenlp/data/indexed_dataset.py | 265 ++++++++++++++++++++++++++++++ 1 file changed, 265 insertions(+) diff --git a/paddlenlp/data/indexed_dataset.py b/paddlenlp/data/indexed_dataset.py index 79ff36779869..fa2d7222f33e 100644 --- a/paddlenlp/data/indexed_dataset.py +++ b/paddlenlp/data/indexed_dataset.py @@ -29,6 +29,7 @@ import shutil import struct import time +from dataclasses import fields from functools import lru_cache from itertools import accumulate @@ -68,6 +69,20 @@ def make_dataset(path, impl, skip_warmup=False): return None +def make_sft_dataset(path, impl, dataclass, skip_warmup=False): + if impl == "mmap" and SFT_MMapIndexedDataset.exists(path, dataclass): + print_rank_0(" > building dataset index ...") + start_time = time.time() + sft_indexed_dataset = SFT_MMapIndexedDataset(path, dataclass, skip_warmup) + print_rank_0(" > finished creating SFT indexed dataset in {:4f} " "seconds".format(time.time() - start_time)) + print_rank_0(" number of samples: {}".format(len(sft_indexed_dataset.doc_idx) - 1)) + + return sft_indexed_dataset + + print(f"Unknown dataset implementation: {impl}") + return None + + def dataset_exists(path, impl): if impl == "mmap": return MMapIndexedDataset.exists(path) @@ -120,6 +135,18 @@ def index_file_path(prefix_path): return prefix_path + ".idx" +def sft_index_file_path(prefix_path): + return os.path.join(prefix_path, "index.idx") + + +def sft_data_file_path(prefix_path, dataclass): + file_path_list = [] + for field in fields(dataclass): + file_path = os.path.join(prefix_path, f"{field.name}.bin") + file_path_list.append(file_path) + return file_path_list + + def data_file_path(prefix_path): return prefix_path + ".bin" @@ -548,6 +575,215 @@ def exists(path): return os.path.exists(index_file_path(path)) and os.path.exists(data_file_path(path)) +class SFT_MMapIndexedDataset(paddle.io.Dataset): + class Index(object): + _HDR_MAGIC = b"MMIDIDX\x00\x00" + + @classmethod + def writer(cls, path, dtype): + class _Writer(object): + def __enter__(self): + self._file = open(path, "wb") + self._file.write(cls._HDR_MAGIC) + self._file.write(struct.pack(" 1 and not add_sequence_len: + self._sizes.append(tensor.size) + add_sequence_len = True + self._data_file_dict[key].write(tensor.tobytes(order="C")) + + def end_document(self): + self._doc_idx.append(len(self._sizes)) + + def finalize(self, index_file): + for key, filename in self._data_file_dict.items(): + filename.close() + with SFT_MMapIndexedDataset.Index.writer(index_file, self._dtype) as index: + index.write(self._sizes, self._doc_idx) + + class MMapIndexedDatasetBuilder(object): def __init__(self, out_file, dtype, loss_mask_file=None): self._data_file = open(out_file, "wb") From e2710b523c766e2800d8e5e341f4b530d730ae6d Mon Sep 17 00:00:00 2001 From: greycooker <526929599@qq.com> Date: Thu, 1 Aug 2024 05:45:02 +0000 Subject: [PATCH 2/5] fix __len__ and __getitem__ --- paddlenlp/data/indexed_dataset.py | 62 +++++++++++++++++-------------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/paddlenlp/data/indexed_dataset.py b/paddlenlp/data/indexed_dataset.py index fa2d7222f33e..7ca219bdb914 100644 --- a/paddlenlp/data/indexed_dataset.py +++ b/paddlenlp/data/indexed_dataset.py @@ -681,7 +681,7 @@ def __getitem__(self, i): return self._pointers[i], self._sizes[i] def __len__(self): - return self._len + return self._doc_count - 1 def __init__(self, path, dataclass, skip_warmup=False): super().__init__() @@ -728,33 +728,41 @@ def __len__(self): return len(self._index) def __getitem__(self, idx): - doc_idx = self._index.doc_idx - start_sentence, end_sentence = doc_idx[idx], doc_idx[idx + 1] - start_pointers, _ = self._index[start_sentence] - length_list = self._index._sizes[start_sentence:end_sentence] - - dataclass_fields = fields(self._dataclass) - dataclass_list = [] - sequence_offset = start_pointers - scalar_offset = doc_idx[idx] * np.dtype(self._index.dtype).itemsize - - for length in length_list: - field_data = {field.name: [] for field in dataclass_fields} - for field in dataclass_fields: - bin_buffer = self._bin_buffer_dict[os.path.join(self._path, f"{field.name}.bin")] - if field.type != int: - data = np.frombuffer(bin_buffer, dtype=self._index.dtype, count=length, offset=sequence_offset) - field_data[field.name] = data.tolist() - else: - data = np.frombuffer(bin_buffer, dtype=self._index.dtype, count=1, offset=scalar_offset) - field_data[field.name] = int(data[0]) - - dataclass_list.append(self._dataclass(**field_data)) + def get_index(idx): + doc_idx = self._index.doc_idx + start_sentence, end_sentence = doc_idx[idx], doc_idx[idx + 1] + start_pointers, _ = self._index[start_sentence] + length_list = self._index._sizes[start_sentence:end_sentence] + + dataclass_fields = fields(self._dataclass) + dataclass_list = [] + sequence_offset = start_pointers + scalar_offset = doc_idx[idx] * np.dtype(self._index.dtype).itemsize + + for length in length_list: + field_data = {field.name: [] for field in dataclass_fields} + for field in dataclass_fields: + bin_buffer = self._bin_buffer_dict[os.path.join(self._path, f"{field.name}.bin")] + if field.type != int: + data = np.frombuffer(bin_buffer, dtype=self._index.dtype, count=length, offset=sequence_offset) + field_data[field.name] = data.tolist() + else: + data = np.frombuffer(bin_buffer, dtype=self._index.dtype, count=1, offset=scalar_offset) + field_data[field.name] = int(data[0]) + + dataclass_list.append(self._dataclass(**field_data)) + + sequence_offset += length * np.dtype(self._index.dtype).itemsize + scalar_offset += np.dtype(self._index.dtype).itemsize + return dataclass_list - sequence_offset += length * np.dtype(self._index.dtype).itemsize - scalar_offset += np.dtype(self._index.dtype).itemsize - - return dataclass_list + if isinstance(idx, (int, np.integer)): + return get_index(idx) + elif isinstance(idx, slice): + start, stop, step = idx.indices(len(self)) + if step != 1: + raise ValueError("Slices into indexed_dataset must be contiguous") + return [get_index(idx) for idx in range(start, stop)] @property def sizes(self): From 4665ccfbbf6e4d3fc6eed5035de5fc5686cc47ba Mon Sep 17 00:00:00 2001 From: greycooker <526929599@qq.com> Date: Thu, 1 Aug 2024 08:30:08 +0000 Subject: [PATCH 3/5] fix judge impl method --- paddlenlp/data/indexed_dataset.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/paddlenlp/data/indexed_dataset.py b/paddlenlp/data/indexed_dataset.py index 7ca219bdb914..adf39e808d72 100644 --- a/paddlenlp/data/indexed_dataset.py +++ b/paddlenlp/data/indexed_dataset.py @@ -70,17 +70,16 @@ def make_dataset(path, impl, skip_warmup=False): def make_sft_dataset(path, impl, dataclass, skip_warmup=False): - if impl == "mmap" and SFT_MMapIndexedDataset.exists(path, dataclass): - print_rank_0(" > building dataset index ...") - start_time = time.time() - sft_indexed_dataset = SFT_MMapIndexedDataset(path, dataclass, skip_warmup) - print_rank_0(" > finished creating SFT indexed dataset in {:4f} " "seconds".format(time.time() - start_time)) - print_rank_0(" number of samples: {}".format(len(sft_indexed_dataset.doc_idx) - 1)) + if impl != "mmap": + raise ValueError("SFT Indexed Dataset only support mmap memory-mapped method temporarily") - return sft_indexed_dataset + print_rank_0(" > building dataset index ...") + start_time = time.time() + sft_indexed_dataset = SFT_MMapIndexedDataset(path, dataclass, skip_warmup) + print_rank_0(" > finished creating SFT indexed dataset in {:4f} " "seconds".format(time.time() - start_time)) + print_rank_0(" number of samples: {}".format(len(sft_indexed_dataset.doc_idx) - 1)) - print(f"Unknown dataset implementation: {impl}") - return None + return sft_indexed_dataset def dataset_exists(path, impl): From e988cf57c88e961e4e2a0660ed246f5002d9ca54 Mon Sep 17 00:00:00 2001 From: greycooker <526929599@qq.com> Date: Fri, 2 Aug 2024 10:49:54 +0000 Subject: [PATCH 4/5] fix mmap --- paddlenlp/data/indexed_dataset.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/paddlenlp/data/indexed_dataset.py b/paddlenlp/data/indexed_dataset.py index adf39e808d72..1809038b8c7f 100644 --- a/paddlenlp/data/indexed_dataset.py +++ b/paddlenlp/data/indexed_dataset.py @@ -69,13 +69,13 @@ def make_dataset(path, impl, skip_warmup=False): return None -def make_sft_dataset(path, impl, dataclass, skip_warmup=False): +def make_sft_dataset(path, dataclass, skip_warmup=False, impl="mmap"): if impl != "mmap": raise ValueError("SFT Indexed Dataset only support mmap memory-mapped method temporarily") print_rank_0(" > building dataset index ...") start_time = time.time() - sft_indexed_dataset = SFT_MMapIndexedDataset(path, dataclass, skip_warmup) + sft_indexed_dataset = SftMMapIndexedDataset(path, dataclass, skip_warmup) print_rank_0(" > finished creating SFT indexed dataset in {:4f} " "seconds".format(time.time() - start_time)) print_rank_0(" number of samples: {}".format(len(sft_indexed_dataset.doc_idx) - 1)) @@ -574,7 +574,7 @@ def exists(path): return os.path.exists(index_file_path(path)) and os.path.exists(data_file_path(path)) -class SFT_MMapIndexedDataset(paddle.io.Dataset): +class SftMMapIndexedDataset(paddle.io.Dataset): class Index(object): _HDR_MAGIC = b"MMIDIDX\x00\x00" @@ -798,7 +798,7 @@ def make_builder(out_file, impl, save_dtype, loss_mask_file=None): return IndexedDatasetBuilder(out_file, dtype=save_dtype) -class SFT_MMapIndexedDatasetBuilder(object): +class SftMMapIndexedDatasetBuilder(object): def __init__(self, output_file_dict, dtype): self._data_file_dict = {} for key, filename in output_file_dict.items(): @@ -823,7 +823,7 @@ def end_document(self): def finalize(self, index_file): for key, filename in self._data_file_dict.items(): filename.close() - with SFT_MMapIndexedDataset.Index.writer(index_file, self._dtype) as index: + with SftMMapIndexedDataset.Index.writer(index_file, self._dtype) as index: index.write(self._sizes, self._doc_idx) From ecb62b69a1925ef595ce77fd9b26d7a4878b869a Mon Sep 17 00:00:00 2001 From: greycooker <526929599@qq.com> Date: Fri, 2 Aug 2024 10:56:08 +0000 Subject: [PATCH 5/5] change variable name --- paddlenlp/data/indexed_dataset.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/paddlenlp/data/indexed_dataset.py b/paddlenlp/data/indexed_dataset.py index 1809038b8c7f..5130339779aa 100644 --- a/paddlenlp/data/indexed_dataset.py +++ b/paddlenlp/data/indexed_dataset.py @@ -75,7 +75,7 @@ def make_sft_dataset(path, dataclass, skip_warmup=False, impl="mmap"): print_rank_0(" > building dataset index ...") start_time = time.time() - sft_indexed_dataset = SftMMapIndexedDataset(path, dataclass, skip_warmup) + sft_indexed_dataset = SFTMMapIndexedDataset(path, dataclass, skip_warmup) print_rank_0(" > finished creating SFT indexed dataset in {:4f} " "seconds".format(time.time() - start_time)) print_rank_0(" number of samples: {}".format(len(sft_indexed_dataset.doc_idx) - 1)) @@ -574,7 +574,7 @@ def exists(path): return os.path.exists(index_file_path(path)) and os.path.exists(data_file_path(path)) -class SftMMapIndexedDataset(paddle.io.Dataset): +class SFTMMapIndexedDataset(paddle.io.Dataset): class Index(object): _HDR_MAGIC = b"MMIDIDX\x00\x00" @@ -798,7 +798,7 @@ def make_builder(out_file, impl, save_dtype, loss_mask_file=None): return IndexedDatasetBuilder(out_file, dtype=save_dtype) -class SftMMapIndexedDatasetBuilder(object): +class SFTMMapIndexedDatasetBuilder(object): def __init__(self, output_file_dict, dtype): self._data_file_dict = {} for key, filename in output_file_dict.items(): @@ -823,7 +823,7 @@ def end_document(self): def finalize(self, index_file): for key, filename in self._data_file_dict.items(): filename.close() - with SftMMapIndexedDataset.Index.writer(index_file, self._dtype) as index: + with SFTMMapIndexedDataset.Index.writer(index_file, self._dtype) as index: index.write(self._sizes, self._doc_idx)