Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support sft mapdataset #8840

Merged
merged 5 commits into from
Aug 5, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 273 additions & 0 deletions paddlenlp/data/indexed_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import shutil
import struct
import time
from dataclasses import fields
from functools import lru_cache
from itertools import accumulate

Expand Down Expand Up @@ -68,6 +69,20 @@
return None


def make_sft_dataset(path, impl, dataclass, skip_warmup=False):
    """Open an SFT memory-mapped dataset stored under the directory ``path``.

    Args:
        path: Directory holding ``index.idx`` and one ``<field>.bin`` file per
            field of ``dataclass``.
        impl: Dataset implementation name; only ``"mmap"`` is supported.
        dataclass: Dataclass type whose fields name the per-field data files.
        skip_warmup: Forwarded to ``SFT_MMapIndexedDataset``; when false the
            files are warmed up before being memory-mapped.

    Returns:
        The opened ``SFT_MMapIndexedDataset``, or ``None`` when ``impl`` is not
        supported or the expected files are missing.
    """
    # NOTE(review): reviewers asked for this factory to support only mmap and
    # suggested the signature make_sft_dataset(path, dataclass,
    # skip_warmup=False, impl="mmap"). The parameter order is left unchanged
    # here so existing callers keep working.
    if impl != "mmap":
        print(f"Unknown dataset implementation: {impl}")
        return None

    # Report missing files as such instead of blaming the implementation name.
    if not SFT_MMapIndexedDataset.exists(path, dataclass):
        print(f"SFT dataset files not found under: {path}")
        return None

    print_rank_0(" > building dataset index ...")
    start_time = time.time()
    sft_indexed_dataset = SFT_MMapIndexedDataset(path, dataclass, skip_warmup)
    elapsed = time.time() - start_time
    print_rank_0(" > finished creating SFT indexed dataset in {:.4f} seconds".format(elapsed))
    print_rank_0(" number of samples: {}".format(len(sft_indexed_dataset.doc_idx) - 1))
    return sft_indexed_dataset

Check warning on line 83 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L82-L83

Added lines #L82 - L83 were not covered by tests


def dataset_exists(path, impl):
if impl == "mmap":
return MMapIndexedDataset.exists(path)
Expand Down Expand Up @@ -120,6 +135,18 @@
return prefix_path + ".idx"


def sft_index_file_path(prefix_path):
    """Return the path of the ``index.idx`` file inside the directory ``prefix_path``."""
    index_name = "index.idx"
    return os.path.join(prefix_path, index_name)

Check warning on line 139 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L139

Added line #L139 was not covered by tests


def sft_data_file_path(prefix_path, dataclass):
    """Return one ``<field name>.bin`` path under ``prefix_path`` per dataclass field.

    The paths are returned as a new list, in the field order reported by
    ``dataclasses.fields``.
    """
    return [os.path.join(prefix_path, f"{item.name}.bin") for item in fields(dataclass)]

Check warning on line 147 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L143-L147

Added lines #L143 - L147 were not covered by tests


def data_file_path(prefix_path):
    """Return the data file path for ``prefix_path``: the prefix followed by ``.bin``."""
    suffix = ".bin"
    return prefix_path + suffix

Expand Down Expand Up @@ -548,13 +575,259 @@
return os.path.exists(index_file_path(path)) and os.path.exists(data_file_path(path))


class SFT_MMapIndexedDataset(paddle.io.Dataset):
    """Memory-mapped SFT dataset stored as a directory of per-field binary files.

    The directory holds ``index.idx`` plus one ``<field name>.bin`` file for
    every field of the dataclass given at construction. Indexing with an
    integer returns the list of dataclass instances that make up one document;
    indexing with a contiguous slice returns a list of such lists.
    """

    # NOTE(review): a reviewer asked for CamelCase class naming without the
    # underscore. The name is kept so existing callers keep working.

    class Index(object):
        """Reader (and writer factory) for the ``index.idx`` file.

        File layout, as produced by ``writer``: 9-byte magic, uint64 version,
        uint8 dtype code, uint64 number of sizes, uint64 number of doc_idx
        entries, then the int32 sizes, int64 pointers and int64 doc_idx arrays.
        """

        _HDR_MAGIC = b"MMIDIDX\x00\x00"

        @classmethod
        def writer(cls, path, dtype):
            """Return a context manager that writes an index file to ``path``."""

            class _Writer(object):
                def __enter__(self):
                    self._file = open(path, "wb")
                    try:
                        self._file.write(cls._HDR_MAGIC)
                        self._file.write(struct.pack("<Q", 1))
                        self._file.write(struct.pack("<B", code(dtype)))
                    except Exception:
                        # __exit__ is not called when __enter__ fails.
                        self._file.close()
                        raise
                    return self

                @staticmethod
                def _get_pointers(sizes):
                    """Return the byte offset of each item given the item sizes."""
                    dtype_size = dtype().itemsize
                    address = 0
                    pointers = []
                    for size in sizes:
                        pointers.append(address)
                        address += size * dtype_size
                    return pointers

                def write(self, sizes, doc_idx):
                    """Write the counts followed by sizes, pointers and doc_idx."""
                    pointers = self._get_pointers(sizes)
                    self._file.write(struct.pack("<Q", len(sizes)))
                    self._file.write(struct.pack("<Q", len(doc_idx)))

                    sizes = np.array(sizes, dtype=np.int32)
                    self._file.write(sizes.tobytes(order="C"))
                    del sizes

                    pointers = np.array(pointers, dtype=np.int64)
                    self._file.write(pointers.tobytes(order="C"))
                    del pointers

                    doc_idx = np.array(doc_idx, dtype=np.int64)
                    self._file.write(doc_idx.tobytes(order="C"))

                def __exit__(self, exc_type, exc_val, exc_tb):
                    self._file.close()

            return _Writer()

        def __init__(self, path, skip_warmup=False):
            """Parse the header of ``path`` and memory-map its three arrays."""
            with open(path, "rb") as stream:
                magic_test = stream.read(9)
                assert self._HDR_MAGIC == magic_test, (
                    "Index file doesn't match expected format. "
                    "Make sure that --dataset-impl is configured properly."
                )
                version = struct.unpack("<Q", stream.read(8))
                assert (1,) == version

                (dtype_code,) = struct.unpack("<B", stream.read(1))
                self._dtype = dtypes[dtype_code]
                self._dtype_size = self._dtype().itemsize

                self._len = struct.unpack("<Q", stream.read(8))[0]
                self._doc_count = struct.unpack("<Q", stream.read(8))[0]
                offset = stream.tell()

            if not skip_warmup:
                print_rank_0(" warming up index mmap file...")
                _warmup_mmap_file(path)

            self._buffer_mmap = np.memmap(path, mode="r", order="C")
            self._buffer = memoryview(self._buffer_mmap)
            print_rank_0(" reading sizes...")
            self._sizes = np.frombuffer(self._buffer, dtype=np.int32, count=self._len, offset=offset)
            print_rank_0(" reading pointers...")
            self._pointers = np.frombuffer(
                self._buffer, dtype=np.int64, count=self._len, offset=offset + self._sizes.nbytes
            )
            print_rank_0(" reading document index...")
            self._doc_idx = np.frombuffer(
                self._buffer,
                dtype=np.int64,
                count=self._doc_count,
                offset=offset + self._sizes.nbytes + self._pointers.nbytes,
            )

        def __del__(self):
            # __init__ may have failed before the mmap was created.
            mmap_array = getattr(self, "_buffer_mmap", None)
            if mmap_array is None:
                return
            mmap_array._mmap.close()
            del self._buffer_mmap

        @property
        def dtype(self):
            return self._dtype

        @property
        def sizes(self):
            return self._sizes

        @property
        def doc_idx(self):
            return self._doc_idx

        def __getitem__(self, i):
            """Return ``(byte pointer, size)`` of item ``i``."""
            # No lru_cache here: caching a bound method keys on ``self`` and
            # keeps the instance alive, while the lookup is two array reads.
            return self._pointers[i], self._sizes[i]

        def __len__(self):
            return self._doc_count - 1

    def __init__(self, path, dataclass, skip_warmup=False):
        """Open the dataset under ``path`` whose fields are those of ``dataclass``.

        Raises:
            ValueError: if the index file or any per-field data file is missing.
        """
        super().__init__()
        self._dataclass = dataclass
        self._path = None
        self._index = None
        self._bin_buffer = None
        # Created before _do_init so __del__ is safe if _do_init raises.
        self._bin_buffer_mmap_dict = {}
        self._bin_buffer_dict = {}

        self._do_init(path, skip_warmup)

    def __getstate__(self):
        # The dataclass must travel with the path: _do_init needs it to locate
        # the per-field files when the object is rebuilt.
        return (self._path, self._dataclass)

    def __setstate__(self, state):
        path, dataclass = state
        self._dataclass = dataclass
        self._bin_buffer = None
        self._do_init(path, skip_warmup=True)

    def _do_init(self, path, skip_warmup):
        """Load the index and memory-map every per-field data file."""
        self._path = path
        if not self.exists(path, self._dataclass):
            raise ValueError("Missing file, %s" % (path))

        self._index = self.Index(sft_index_file_path(self._path), skip_warmup)
        data_files = sft_data_file_path(self._path, self._dataclass)
        if not skip_warmup:
            print_rank_0(" warming up data mmap file...")
            for data_file in data_files:
                _warmup_mmap_file(data_file)
        print_rank_0(" creating numpy buffer of mmap...")

        self._bin_buffer_mmap_dict = {}
        self._bin_buffer_dict = {}
        for data_file in data_files:
            self._bin_buffer_mmap_dict[data_file] = np.memmap(data_file, mode="r", order="C")
            self._bin_buffer_dict[data_file] = memoryview(self._bin_buffer_mmap_dict[data_file])
        print_rank_0(" creating memory view of numpy buffer...")

    def __del__(self):
        # Drop the memoryviews first, then close the underlying mmaps. Every
        # attribute is looked up defensively because construction may have
        # failed part-way.
        mmap_arrays = getattr(self, "_bin_buffer_mmap_dict", {})
        self._bin_buffer_dict = {}
        for mmap_array in mmap_arrays.values():
            mmap_array._mmap.close()
        if hasattr(self, "_index"):
            del self._index

    def __len__(self):
        return len(self._index)

    @staticmethod
    def _is_scalar_field(field):
        """Return True for fields annotated as ``int`` (stored one value per item)."""
        # Annotations are strings under ``from __future__ import annotations``.
        return field.type in (int, "int")

    def _get_document(self, idx):
        """Return the list of dataclass instances that form document ``idx``."""
        doc_idx = self._index.doc_idx
        # Bounds come from the live doc_idx array so they stay correct after
        # set_doc_idx() replaced it.
        num_docs = len(doc_idx) - 1
        if idx < 0:
            idx += num_docs
        if not 0 <= idx < num_docs:
            raise IndexError("document index {} is out of range".format(idx))

        start_sentence, end_sentence = doc_idx[idx], doc_idx[idx + 1]
        start_pointer, _ = self._index[start_sentence]
        length_list = self._index.sizes[start_sentence:end_sentence]

        item_dtype = self._index.dtype
        itemsize = np.dtype(item_dtype).itemsize
        # Resolve each field's buffer once instead of once per item.
        field_buffers = [
            (field, self._bin_buffer_dict[os.path.join(self._path, f"{field.name}.bin")])
            for field in fields(self._dataclass)
        ]

        # Sequence fields advance by the item length, scalar fields by one
        # element per item.
        sequence_offset = int(start_pointer)
        scalar_offset = int(start_sentence) * itemsize
        dataclass_list = []
        for length in length_list:
            length = int(length)
            field_data = {}
            for field, bin_buffer in field_buffers:
                if self._is_scalar_field(field):
                    data = np.frombuffer(bin_buffer, dtype=item_dtype, count=1, offset=scalar_offset)
                    field_data[field.name] = int(data[0])
                else:
                    data = np.frombuffer(bin_buffer, dtype=item_dtype, count=length, offset=sequence_offset)
                    field_data[field.name] = data.tolist()
            dataclass_list.append(self._dataclass(**field_data))
            sequence_offset += length * itemsize
            scalar_offset += itemsize
        return dataclass_list

    def __getitem__(self, idx):
        """Return one document (int index) or a list of documents (contiguous slice).

        Raises:
            IndexError: if an integer index is out of range.
            ValueError: if a slice has a step other than 1.
            TypeError: if ``idx`` is neither an integer nor a slice.
        """
        if isinstance(idx, (int, np.integer)):
            return self._get_document(idx)
        if isinstance(idx, slice):
            start, stop, step = idx.indices(len(self))
            if step != 1:
                raise ValueError("Slices into indexed_dataset must be contiguous")
            return [self._get_document(i) for i in range(start, stop)]
        raise TypeError("Unexpected type received for idx: {}".format(type(idx)))

    @property
    def sizes(self):
        return self._index.sizes

    @property
    def doc_idx(self):
        return self._index.doc_idx

    def get_doc_idx(self):
        return self._index._doc_idx

    def set_doc_idx(self, doc_idx_):
        self._index._doc_idx = doc_idx_

    @property
    def supports_prefetch(self):
        return False

    @staticmethod
    def exists(path, dataclass):
        """Return True when the index file and every per-field data file exist."""
        file_path_list = sft_data_file_path(path, dataclass)
        file_path_list.append(sft_index_file_path(path))
        return all(os.path.exists(file_path) for file_path in file_path_list)

Check warning on line 792 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L787-L792

Added lines #L787 - L792 were not covered by tests


def make_builder(out_file, impl, save_dtype, loss_mask_file=None):
    """Return a dataset builder writing to ``out_file`` for the implementation ``impl``.

    ``"mmap"`` selects ``MMapIndexedDatasetBuilder`` (which also receives
    ``loss_mask_file``); any other value selects ``IndexedDatasetBuilder``.
    """
    if impl != "mmap":
        return IndexedDatasetBuilder(out_file, dtype=save_dtype)
    return MMapIndexedDatasetBuilder(out_file, dtype=save_dtype, loss_mask_file=loss_mask_file)


class SFT_MMapIndexedDatasetBuilder(object):
    """Writer for the per-field ``.bin`` files and the index of an SFT dataset.

    Args:
        output_file_dict: Mapping from attribute name to the path of the binary
            file that receives that attribute's values.
        dtype: NumPy dtype used to serialize every value.
    """

    # NOTE(review): a reviewer asked for CamelCase class naming without the
    # underscore. The name is kept so existing callers keep working.

    def __init__(self, output_file_dict, dtype):
        self._data_file_dict = {}
        try:
            for key, filename in output_file_dict.items():
                self._data_file_dict[key] = open(filename, "wb")
        except Exception:
            # Do not leak the handles opened before the failing one.
            for handle in self._data_file_dict.values():
                handle.close()
            raise
        self.output_file_dict = output_file_dict
        self._dtype = dtype
        self._sizes = []
        self._doc_idx = [0]

    def add_item(self, sequence):
        """Append the attributes of ``sequence`` to their files and record its length.

        The recorded length is the size of the first attribute holding more
        than one element. If there is none, the size of the first non-scalar
        attribute is used, so a length-1 sequence still gets an index entry and
        the index stays aligned with the data files.
        """
        sequence_len = None
        fallback_len = None
        for key, handle in self._data_file_dict.items():
            tensor = np.array(getattr(sequence, key), dtype=self._dtype)
            if sequence_len is None and tensor.size > 1:
                sequence_len = tensor.size
            elif fallback_len is None and tensor.ndim > 0:
                fallback_len = tensor.size
            handle.write(tensor.tobytes(order="C"))
        if sequence_len is None:
            sequence_len = fallback_len
        if sequence_len is not None:
            self._sizes.append(sequence_len)

    def end_document(self):
        """Mark the end of the current document."""
        self._doc_idx.append(len(self._sizes))

    def finalize(self, index_file):
        """Close every data file and write the index to ``index_file``."""
        for handle in self._data_file_dict.values():
            handle.close()
        with SFT_MMapIndexedDataset.Index.writer(index_file, self._dtype) as index:
            index.write(self._sizes, self._doc_idx)

Check warning on line 828 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L825-L828

Added lines #L825 - L828 were not covered by tests


class MMapIndexedDatasetBuilder(object):
def __init__(self, out_file, dtype, loss_mask_file=None):
self._data_file = open(out_file, "wb")
Expand Down
Loading