Language Datasets and Data Loaders (LDDL) is a utility library that minimizes the friction during dataset retrieval, preprocessing and loading for the language models in NVIDIA Deep Learning Examples.
The current capabilities of LDDL include:
- Data preprocessing at scale via Dask and MPI for Python (mpi4py): the pretraining dataset for BERT-PyTorch and BERT-PaddlePaddle takes less than 2 minutes to preprocess on 32 DGX A100 nodes, and the pretraining dataset for BART-PyTorch takes about 2 hours to preprocess on 4 DGX A100 nodes.
- Data loading for PyTorch and PaddlePaddle multi-node training workloads with minimum overhead.
- Sequence binning that can reduce end-to-end training latency.
The steps to install LDDL are listed below:
Step 1 [optional but recommended]: The jemalloc memory allocator is an alternative to the glibc memory allocator that might offer better performance during data preprocessing. On Debian-based Linux distributions (e.g., Ubuntu), you can install jemalloc via:
apt-get install libjemalloc-dev
Step 2 [required]: LDDL can be installed from source by running
pip install <target>
where <target> is the project root directory of LDDL or a URL thereof. For example:
pip install git+https://github.com/NVIDIA/lddl.git
or
git clone https://github.com/NVIDIA/lddl.git
pip install ./lddl
pip also automatically installs all of LDDL's other Python package dependencies.
LDDL has only been tested with Python 3.
Step 3 [required]: After NLTK is installed (either manually or automatically when pip installs LDDL), the model data for NLTK's Punkt Sentence Tokenizer must be downloaded before the tokenizer can be used:
python -m nltk.downloader punkt
For your own tasks, you will most likely need to build and use a customized Docker image. As an example of how to install LDDL inside a Docker container, we provide a Dockerfile that follows the installation steps above to install LDDL in an NGC container. You can build NGC container images with LDDL installed from this example Dockerfile by running
bash docker/build.sh <Dockerfile name without extension> <tag of the base image> <output image name/URL>
NGC Containers are not one of LDDL's dependencies. You can install LDDL in your customized Docker image, local virtualenv or conda environments too.
To build the PyTorch image with LDDL installed, based on NGC PyTorch Container version 21.11, and name the output image lddl:latest:
bash docker/build.sh ngc_pyt 21.11-py3 lddl:latest
where:
- ngc_pyt refers to using docker/ngc_pyt.Dockerfile;
- 21.11-py3 means that nvcr.io/nvidia/pytorch:21.11-py3 is used as the base image;
- lddl:latest is the name of the built image.
After building this image, you can find its name via docker image list.
To build the PaddlePaddle image with LDDL installed, based on NGC PaddlePaddle Container version 22.12, and name the output image lddl:latest:
bash docker/build.sh ngc_paddle 22.12-py3 lddl:latest
where:
- ngc_paddle refers to using docker/ngc_paddle.Dockerfile;
- 22.12-py3 means that nvcr.io/nvidia/paddlepaddle:22.12-py3 is used as the base image;
- lddl:latest is the name of the built image.
After building this image, you can find its name via docker image list.
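The naming convention of the two build commands above can be summarized in a short Python sketch. The helper describe_build is hypothetical and is not part of LDDL or docker/build.sh; it only reproduces the documented mapping from the first two arguments of docker/build.sh to a Dockerfile path and an NGC base image, assuming ngc_pyt and ngc_paddle are the only Dockerfile names.

```python
# Hypothetical helper (not part of LDDL): reproduces the documented mapping from
# the arguments of docker/build.sh to a Dockerfile and an NGC base image.
# It does not invoke Docker.
NGC_REPOS = {
    "ngc_pyt": "nvcr.io/nvidia/pytorch",
    "ngc_paddle": "nvcr.io/nvidia/paddlepaddle",
}


def describe_build(dockerfile_name: str, base_tag: str, output_image: str) -> dict:
    """Return the Dockerfile, base image and output image for a build.sh call."""
    if dockerfile_name not in NGC_REPOS:
        raise ValueError(f"unknown Dockerfile name: {dockerfile_name}")
    return {
        "dockerfile": f"docker/{dockerfile_name}.Dockerfile",
        "base_image": f"{NGC_REPOS[dockerfile_name]}:{base_tag}",
        "output_image": output_image,
    }


print(describe_build("ngc_pyt", "21.11-py3", "lddl:latest"))
print(describe_build("ngc_paddle", "22.12-py3", "lddl:latest"))
```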
You can launch a container in interactive mode by running:
bash docker/interactive.sh <mount specifications> <shell command> <output image name/URL>
For example, to launch a container using the lddl:latest image and mount datasets/ under your home directory to /datasets inside the container:
bash docker/interactive.sh "-v ${HOME}/datasets:/datasets" /bin/bash lddl:latest
To clarify, we define the following terms:
- Offline vs. Online: By "Offline", we mean something to be run as a separate entity only once with respect to the training jobs; in contrast, "Online" refers to being run as part of the training jobs.
As an analogy, buying kitchen appliances is "Offline" with respect to cooking, because you only need to buy them once, and the action of buying often happens when you are not cooking (i.e., when you are going to the stores or shopping online). In contrast, washing the vegetables is "Online" with respect to cooking, because it happens every time before you cook the vegetables.
- Ahead-of-training vs. During-training: By "Ahead-of-training", we mean something to be run before the training processes start; in contrast, "During-training" refers to being run during the training processes.
In summary, LDDL consists of four components:
- Stage 1 [Offline] Downloaders that download the raw text of datasets from public and online sources.
- [Offline, or Online and Ahead-of-training] Preprocessing:
  - Stage 2 Preprocessors that preprocess the raw text into unbalanced Parquet shards.
  - Stage 3 Load Balancer that balances the Parquet shards so that every shard has the same number of samples (differing by at most one).
- Stage 4 [Online and During-training] Data Loaders that load the balanced shards into memory and perform additional preprocessing steps during training.
Depending on your use case, some steps can be performed in different stages. For example, to experiment with static masking, you can ask the preprocessor to mask each sample; to experiment with dynamic masking, you can ask the data loader to mask each sample instead.
LDDL supports sequence binning to reduce redundant computation on padded tokens:
- The maximum sequence length is divided into several bins. For example, with 4 bins and a maximum sequence length of 512:
  - The first bin contains samples whose sequence lengths are in [0, 128];
  - The second bin contains samples whose sequence lengths are in [129, 256];
  - The third bin contains samples whose sequence lengths are in [257, 384];
  - The fourth bin contains samples whose sequence lengths are in [385, 512].
- At each training iteration, a bin is randomly selected based on the sequence-length distribution of the entire dataset, and all ranks are fed mini-batches whose samples all belong to this selected bin.
- Each mini-batch is only padded to the longest sequence within that mini-batch.
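The three binning rules above can be illustrated with a minimal in-memory Python sketch. This is not LDDL's implementation: bin_index, pick_bin and pad_batch are hypothetical helpers, the sequence lengths are made up, and sharing one random seed across ranks is only one possible way (an assumption here) to make every rank select the same bin.

```python
import random
from typing import Dict, List


def bin_index(seq_len: int, max_seq_len: int = 512, num_bins: int = 4) -> int:
    """0-based bin of a sequence length: [0, 128] -> 0, [129, 256] -> 1, ..."""
    bin_size = max_seq_len // num_bins
    return max(seq_len - 1, 0) // bin_size


def pick_bin(bin_counts: Dict[int, int], rng: random.Random) -> int:
    """Choose a bin with probability proportional to its share of the dataset."""
    bins = sorted(bin_counts)
    return rng.choices(bins, weights=[bin_counts[b] for b in bins], k=1)[0]


def pad_batch(batch: List[List[int]], pad_id: int = 0) -> List[List[int]]:
    """Pad sequences only up to the longest sequence in this mini-batch."""
    longest = max(len(seq) for seq in batch)
    return [seq + [pad_id] * (longest - len(seq)) for seq in batch]


# Toy dataset: token-id sequences (all ones) with hypothetical lengths.
lengths = [40, 60, 100, 130, 200, 300, 500, 510]
samples_by_bin: Dict[int, List[List[int]]] = {}
for n in lengths:
    samples_by_bin.setdefault(bin_index(n), []).append([1] * n)

counts = {b: len(s) for b, s in samples_by_bin.items()}
# Assumption for this sketch: all ranks share this seed, so they pick the same bin.
rng = random.Random(42)
chosen = pick_bin(counts, rng)
batch = pad_batch(samples_by_bin[chosen])
print(chosen, [len(seq) for seq in batch])
```

Because the bin is drawn with weights equal to the bin sizes, bins holding more samples are selected more often, which keeps the overall sampling proportional to the sequence-length distribution.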
The preprocessor and load balancer can significantly speed up the preprocessing of large corpora by scaling to multiple nodes via Dask and MPI.
Meanwhile, the technique of sequence binning can significantly reduce the end-to-end training latency by reducing redundant computation on the padded tokens.
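To make the saving concrete, the short Python sketch below counts pad tokens for the same eight hypothetical sequences grouped into mini-batches of 2, once in arbitrary order and once grouped by bin (bin size 128). In both cases each mini-batch is padded only to its own longest sequence, so the difference comes purely from binning. The lengths and grouping are illustrative, not measurements.

```python
from typing import List


def count_padding(batches: List[List[int]]) -> int:
    """Pad tokens needed when each mini-batch is padded to its longest sequence."""
    return sum(max(batch) * len(batch) - sum(batch) for batch in batches)


# Hypothetical sequence lengths, mini-batch size 2, max sequence length 512.
unbinned = [[40, 510], [300, 100], [130, 500], [380, 200]]  # arbitrary grouping
binned = [[40, 100], [130, 200], [300, 380], [500, 510]]    # one bin per mini-batch

print(count_padding(unbinned))  # 1220
print(count_padding(binned))    # 220
```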
Downloaders, preprocessors and the load balancer can be launched via shell commands:
- Downloaders:
  - Wikipedia dumps: download_wikipedia
  - Bookcorpus: download_books
  - Common Crawl: download_common_crawl
  - OpenWebTextCorpus: download_open_webtext
- Preprocessors:
  - BERT
    - Pretraining: preprocess_bert_pretrain
  - BART
    - Pretraining: preprocess_bart_pretrain
- Load Balancer: balance_dask_output
Use the --help flag to check the exact usage of each command (e.g., download_wikipedia --help).
The preprocessors and the load balancer require an MPI implementation to be installed on your system. NGC containers come with an MPI implementation pre-installed.
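Before launching the preprocessors or the load balancer, a quick pre-flight check can confirm that the commands are on your PATH. The Python sketch below uses only the standard library; the helper name missing_commands is hypothetical, and it checks only for mpirun (the launcher used in this README), so an MPI installation that provides only mpiexec would be reported as missing.

```python
import shutil
from typing import Iterable, List

# mpirun plus two LDDL commands listed above; extend as needed.
REQUIRED_COMMANDS = ["mpirun", "preprocess_bert_pretrain", "balance_dask_output"]


def missing_commands(commands: Iterable[str] = REQUIRED_COMMANDS) -> List[str]:
    """Return the commands that cannot be found on PATH."""
    return [cmd for cmd in commands if shutil.which(cmd) is None]


missing = missing_commands()
if missing:
    print("Not found on PATH:", ", ".join(missing))
else:
    print("MPI launcher and LDDL commands are available.")
```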
LDDL currently supports the following data loaders:
- BERT
  - PyTorch
    - Pretraining: lddl.torch.get_bert_pretrain_data_loader
  - PaddlePaddle
    - Pretraining: lddl.paddle.get_bert_pretrain_data_loader
Please refer to their in-code documentation for more details.
We provide two working example scripts that demonstrate how to use LDDL end-to-end (i.e., from downloading the datasets to loading input batches during training) for a (mock) BERT Phase 2 pretraining task:
- Running on a local machine: local_example.sh. You can run this script by
  bash examples/local_example.sh
- Running on a Slurm cluster and scaling to multiple nodes: slurm_example.sub. Before running this script, you need to download the datasets and move them to the right location in the NFS of your Slurm cluster. You might also need to customize this script to match the specific settings of your Slurm cluster. You can run this script and submit jobs to Slurm by
  sbatch -N<number of nodes> examples/slurm_example.sub
  (e.g., sbatch -N2 examples/slurm_example.sub if you want to run on 2 nodes).
These two example scripts assume that they run from start to finish without interruption, which lets them work out of the box. You can also comment out commands in these scripts to run each step individually. The important steps in these example scripts are summarized and highlighted below:
The Wikipedia corpus can be downloaded via:
download_wikipedia --outdir <Wikipedia output path>
where <Wikipedia output path> is where you want to store the raw text of the Wikipedia corpus. For example,
download_wikipedia --outdir data/wikipedia
would download the raw text of the Wikipedia corpus to data/wikipedia.
The dataset for BERT pretraining can be preprocessed via:
mpirun \
-np $(nproc) \
--oversubscribe \
--allow-run-as-root \
-x LD_PRELOAD=<path to libjemalloc.so> \
preprocess_bert_pretrain \
--schedule mpi \
--target-seq-length <128 for Phase 1; 512 for Phase 2> \
--wikipedia <Wikipedia output path>/source \
--sink <BERT pretraining input path> \
--vocab-file <path to the vocab file> \
--num-blocks <number of input shards> \
--bin-size <bin size>
where:
- <BERT pretraining input path> is where you want to store the preprocessed but unbalanced Parquet shards generated by this BERT pretraining preprocessor.
- <path to the vocab file> is the path to the vocab file used by the WordPiece tokenizer.
- <number of input shards> is the total number of input shards; this number needs to be a positive integer multiple of <world size> * <max(DataLoader's num_workers, 1)>.
- If you want to enable sequence binning, set the --bin-size flag. The i-th bin contains sequences that have from (i - 1) * <bin size> + 1 to i * <bin size> tokens.
- <path to libjemalloc.so> depends on your Linux distribution. For Debian-based distributions, it can be found at /usr/lib/x86_64-linux-gnu/libjemalloc.so.
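The shard-count rule and the bin ranges above are simple arithmetic, sketched below in Python. The helpers are hypothetical (not LDDL functions), and the 16-rank, 4-worker setup is an illustrative assumption. One reading of the multiple-of rule, assumed here rather than taken from LDDL's code, is that it lets every DataLoader worker on every rank read the same number of shards.

```python
def valid_num_blocks(num_blocks: int, world_size: int, num_workers: int) -> bool:
    """True if num_blocks is a positive multiple of world_size * max(num_workers, 1)."""
    unit = world_size * max(num_workers, 1)
    return num_blocks > 0 and num_blocks % unit == 0


def shards_per_worker(num_blocks: int, world_size: int, num_workers: int) -> int:
    """Shards each (rank, worker) pair would read if shards were split evenly."""
    if not valid_num_blocks(num_blocks, world_size, num_workers):
        raise ValueError("num_blocks must be a multiple of world_size * max(num_workers, 1)")
    return num_blocks // (world_size * max(num_workers, 1))


def bin_token_range(i: int, bin_size: int) -> tuple:
    """Inclusive token-count range of the 1-based i-th bin for a given --bin-size."""
    return (i - 1) * bin_size + 1, i * bin_size


# Hypothetical setup: 2 nodes x 8 GPUs = 16 ranks, 4 DataLoader workers per rank.
print(valid_num_blocks(4096, world_size=16, num_workers=4))   # True
print(shards_per_worker(4096, world_size=16, num_workers=4))  # 64
# Phase 2 (target sequence length 512) with --bin-size 64 gives 512 // 64 = 8 bins.
for i in range(1, 512 // 64 + 1):
    print(i, bin_token_range(i, 64))
```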
If you want to use the glibc memory allocator instead of jemalloc, you can omit the -x LD_PRELOAD=<path to libjemalloc.so> flag to mpirun. Either jemalloc or glibc could be more suitable for your specific system, so we recommend trying both and selecting the one that yields the best performance.
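To compare the two allocators, it helps to generate both command lines from the same settings. The Python sketch below assembles the documented mpirun and preprocess_bert_pretrain flags with and without the LD_PRELOAD setting; preprocess_cmd is a hypothetical helper, and the values mirror the Phase 2 example in this section.

```python
import shlex
from typing import List, Optional


def preprocess_cmd(
    num_procs: int,
    wikipedia_source: str,
    sink: str,
    vocab_file: str,
    target_seq_length: int,
    num_blocks: int,
    bin_size: Optional[int] = None,
    jemalloc_path: Optional[str] = None,
) -> List[str]:
    """Build an mpirun command line for preprocess_bert_pretrain (hypothetical helper)."""
    cmd = ["mpirun", "-np", str(num_procs), "--oversubscribe", "--allow-run-as-root"]
    if jemalloc_path is not None:
        # Preload jemalloc in every MPI process; leaving this out falls back to glibc malloc.
        cmd += ["-x", f"LD_PRELOAD={jemalloc_path}"]
    cmd += [
        "preprocess_bert_pretrain",
        "--schedule", "mpi",
        "--target-seq-length", str(target_seq_length),
        "--wikipedia", wikipedia_source,
        "--sink", sink,
        "--vocab-file", vocab_file,
        "--num-blocks", str(num_blocks),
    ]
    if bin_size is not None:
        cmd += ["--bin-size", str(bin_size)]
    return cmd


common = dict(
    num_procs=64,
    wikipedia_source="data/wikipedia/source/",
    sink="data/bert/pretrain/phase2/bin_size_64/",
    vocab_file="data/vocab/bert-en-uncased.txt",
    target_seq_length=512,
    num_blocks=4096,
    bin_size=64,
)
with_jemalloc = preprocess_cmd(**common, jemalloc_path="/usr/lib/x86_64-linux-gnu/libjemalloc.so")
with_glibc = preprocess_cmd(**common)
print(shlex.join(with_jemalloc))
print(shlex.join(with_glibc))
```

If you time both variants (for example with subprocess.run(cmd, check=True)), point each run at a separate --sink directory so the second run does not write over the first run's output.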
By default, masking is deferred to the online, during-training data loading stage and performed dynamically. With dynamic masking, each sample can be masked differently across epochs. If you want to enable static masking, which takes place during the preprocessing stage, add the --masking flag.
For example,
mpirun \
--oversubscribe \
--allow-run-as-root \
-np 64 \
-x LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so \
preprocess_bert_pretrain \
--schedule mpi \
--vocab-file data/vocab/bert-en-uncased.txt \
--wikipedia data/wikipedia/source/ \
--sink data/bert/pretrain/phase2/bin_size_64/ \
--target-seq-length 512 \
--num-blocks 4096 \
--bin-size 64 \
--masking
would use a total of 64 processes to run the preprocessor for BERT Phase 2 pretraining (whose maximum sequence length is 512). In this case, the jemalloc shared object is located at /usr/lib/x86_64-linux-gnu/libjemalloc.so; the vocab file is located at data/vocab/bert-en-uncased.txt; the Wikipedia corpus has been downloaded to data/wikipedia/; the preprocessor stores the unbalanced (roughly 4096) Parquet shards at data/bert/pretrain/phase2/bin_size_64/; sequence binning is enabled with a bin size of 64; and static masking is enabled as well.
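The difference between static and dynamic masking can be seen in a small Python sketch. It is a simplified illustration, not LDDL's masking code: each token is replaced by [MASK] with probability 0.15 (the full BERT recipe also replaces some selected tokens with random tokens or leaves them unchanged), and the epoch-dependent seeding in dynamic_mask is a hypothetical scheme.

```python
import random
from typing import List

MASK = "[MASK]"


def mask_tokens(tokens: List[str], rng: random.Random, prob: float = 0.15) -> List[str]:
    """Replace each token with [MASK] with probability `prob` (simplified masking)."""
    return [MASK if rng.random() < prob else tok for tok in tokens]


def dynamic_mask(tokens: List[str], epoch: int, sample_idx: int) -> List[str]:
    """Re-mask a sample at load time; the seed depends on the epoch (hypothetical scheme)."""
    seed = epoch * 1_000_003 + sample_idx
    return mask_tokens(tokens, random.Random(seed))


tokens = [f"tok{i}" for i in range(200)]

# Static masking: done once ahead of training, so every epoch sees the same masks.
static = mask_tokens(tokens, random.Random(1234))
static_per_epoch = [static for _ in range(3)]

# Dynamic masking: done in the data loader, so the masks change from epoch to epoch.
dynamic_per_epoch = [dynamic_mask(tokens, epoch, sample_idx=0) for epoch in range(3)]

for epoch in range(3):
    print(epoch, static_per_epoch[epoch].count(MASK), dynamic_per_epoch[epoch].count(MASK))
```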
We can balance the number of samples among the preprocessed but unbalanced Parquet shards via:
mpirun -np $(nproc) --oversubscribe --allow-run-as-root \
balance_dask_output \
--indir <BERT pretraining input path> \
--num-shards <number of input shards>
After the load balancer finishes, all shards will have exactly the same number of samples or, if the total number of samples is not divisible by the number of shards, shard sizes will differ by at most 1 sample. If you don't pass a path to the --outdir flag, the Parquet shards in <BERT pretraining input path> are modified in place. For example,
mpirun \
--oversubscribe \
--allow-run-as-root \
-np 64 \
balance_dask_output \
--indir data/bert/pretrain/phase2/bin_size_64/ \
--num-shards 4096
would use 64 processes to run the load balancer, which balances the Parquet shards (located at data/bert/pretrain/phase2/bin_size_64/) into exactly 4096 shards. Among these shards, the number of samples differs by at most 1.
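The balancing invariant (shard sizes differ by at most 1) can be illustrated with a toy in-memory Python function. This only shows the resulting shard sizes; it is not how balance_dask_output works internally (the real tool operates on Parquet files and runs under MPI), and balance_shards is a hypothetical name.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def balance_shards(shards: Sequence[Sequence[T]], num_shards: int) -> List[List[T]]:
    """Redistribute samples so that shard sizes differ by at most 1 (in-memory toy)."""
    samples = [s for shard in shards for s in shard]
    base, extra = divmod(len(samples), num_shards)
    balanced, start = [], 0
    for i in range(num_shards):
        # The first `extra` shards absorb the remainder, one extra sample each.
        size = base + (1 if i < extra else 0)
        balanced.append(samples[start:start + size])
        start += size
    return balanced


unbalanced = [list(range(0, 10)), [], list(range(10, 15)), [15, 16]]
balanced = balance_shards(unbalanced, 4)
print([len(s) for s in balanced])  # [5, 4, 4, 4]
```

With 17 samples and 4 shards, divmod gives a base size of 4 and a remainder of 1, so exactly one shard holds an extra sample.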
The mpirun commands above show how the preprocessor and load balancer can be run on a single node. The flags passed to mpirun often need to be adjusted based on the configuration of your compute cluster.
We also provide an example that demonstrates how to run the preprocessor and load balancer on Slurm clusters that support MPI, Pyxis and Enroot.
We can get the LDDL data loader for BERT pretraining via lddl.torch.get_bert_pretrain_data_loader (please refer to the in-code documentation of this function). Afterwards, we can use it like a normal PyTorch DataLoader instance. For example,
import argparse
import logging
import os
import lddl.torch
parser = argparse.ArgumentParser()
parser.add_argument(
    '--local_rank',
    type=int,
    default=os.getenv('LOCAL_RANK', 0),
    help='local_rank is set by torch.distributed.launch or SLURM',
)
args = parser.parse_args()
# Contains the balanced Parquet shards generated by the load balancer.
input_dir = 'data/bert/pretrain/phase2/bin_size_64/'
# Path to the vocab file.
vocab_file = 'data/vocab/bert-en-uncased.txt'
# Number of samples in a single mini-batch per rank.
batch_size = 64
# Number of DataLoader worker processes per rank.
num_workers = 4
# Epoch number to start with.
start_epoch = 0
# Total number of epochs to train. One epoch refers to going through the entire
# dataset once.
epochs = 2
train_dataloader = lddl.torch.get_bert_pretrain_data_loader(
    input_dir,
    local_rank=args.local_rank,
    vocab_file=vocab_file,
    data_loader_kwargs={
        'batch_size': batch_size,
        'num_workers': num_workers,
        'pin_memory': True,
    },
    log_level=logging.WARNING,
    start_epoch=start_epoch,
)
...
for epoch in range(start_epoch, start_epoch + epochs):
    for i, batch in enumerate(train_dataloader):
        prediction_scores, seq_relationship_score = model(
            input_ids=batch['input_ids'].to(device),
            token_type_ids=batch['token_type_ids'].to(device),
            attention_mask=batch['attention_mask'].to(device),
        )
        loss = criterion(
            prediction_scores,
            seq_relationship_score,
            batch['labels'].to(device),
            batch['next_sentence_labels'].to(device),
        )
        ...
We provide a (mock) training script that shows how the LDDL data loader should be used. For example, if the balanced Parquet shards are located at data/bert/pretrain/phase2/bin_size_64/ and the vocab file is located at data/vocab/bert-en-uncased.txt, you can run this (mock) training script with a world size of 2 on a single machine via:
python -m torch.distributed.launch --nproc_per_node=2 \
benchmarks/torch_train.py \
--path data/bert/pretrain/phase2/bin_size_64/ \
--vocab-file data/vocab/bert-en-uncased.txt
Once the (mock) training processes are up and running and the first rank starts printing output, the processes simply emulate the training loop, which can take some time to complete one epoch. You can kill these processes at any time.
We can get the LDDL data loader for BERT pretraining via lddl.paddle.get_bert_pretrain_data_loader (please refer to the in-code documentation of this function). Afterwards, we can use it like a normal PaddlePaddle DataLoader instance. For example,
import logging
import os
import lddl.paddle
# Contains the balanced Parquet shards generated by the load balancer.
input_dir = 'data/bert/pretrain/phase2/bin_size_64/'
# Path to the vocab file.
vocab_file = 'vocab/bert-large-uncased-vocab.txt'
# Number of samples in a single mini-batch per rank.
batch_size = 64
# Number of DataLoader worker processes per rank.
num_workers = 4
# Epoch number to start with.
start_epoch = 0
# Total number of epochs to train. One epoch refers to going through the entire
# dataset once.
epochs = 2
train_dataloader = lddl.paddle.get_bert_pretrain_data_loader(
    input_dir,
    vocab_file=vocab_file,
    data_loader_kwargs={
        'batch_size': batch_size,
        'num_workers': num_workers,
    },
    log_level=logging.WARNING,
    start_epoch=start_epoch,
)
...
for epoch in range(start_epoch, start_epoch + epochs):
    for i, batch in enumerate(train_dataloader):
        prediction_scores, seq_relationship_score = model(
            input_ids=batch['input_ids'],
            token_type_ids=batch['token_type_ids'],
            attention_mask=batch['attention_mask'],
        )
        loss = criterion(
            prediction_scores,
            seq_relationship_score,
            batch['masked_lm_labels'],
            batch['next_sentence_labels'],
        )
        ...
We provide a (mock) training script that shows how the LDDL data loader should be used. For example, if the balanced Parquet shards are located at data/bert/pretrain/phase2/bin_size_64/ and the vocab file is located at vocab/bert-large-uncased-vocab.txt, you can run this (mock) training script with a world size of 2 on a single machine via:
python -m paddle.distributed.launch --gpus=0,1 \
benchmarks/paddle_train.py \
--path data/bert/pretrain/phase2/bin_size_64/ \
--vocab-file vocab/bert-large-uncased-vocab.txt
Once the (mock) training processes are up and running and the first rank starts printing output, the processes simply emulate the training loop, which can take some time to complete one epoch. You can kill these processes at any time.
We welcome any form of contribution! The simplest contribution would be to try LDDL on your own NLP tasks where data preprocessing and loading is a headache for you. If you find rough edges for your specific use case, please file a GitHub issue.