Add v2 datasets behind a version flag #1507

Merged (21 commits) on Sep 13, 2022
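This PR adds a version flag (default 2) to the streaming dataset hparams for C4, CIFAR-10, and ImageNet-1k. Version 1 keeps the existing in-repo Streaming* classes, while version 2 dispatches behind a guarded import to the new mosaicml-streaming package (streaming.text.C4, streaming.vision.CIFAR10, streaming.vision.ImageNet). The default remotes and the example YAMLs move from mds/1/ to mds/2/ layouts, and mosaicml-streaming joins the 'streaming' extras group in setup.py.

All three hparams classes apply the same dispatch pattern; a minimal sketch of it, with the function name build_cifar and the **kwargs plumbing illustrative only (import paths assumed from this diff):

    from composer.datasets.cifar_hparams import StreamingCIFAR10
    from composer.utils.import_helpers import MissingConditionalImportError

    def build_cifar(version: int, **kwargs):
        """Illustrative only: mirrors the dispatch each hparams class now does."""
        if version == 1:
            return StreamingCIFAR10(**kwargs)  # in-repo v1 implementation
        elif version == 2:
            try:
                from streaming.vision import CIFAR10  # optional dependency
            except ImportError as e:
                raise MissingConditionalImportError(extra_deps_group='streaming',
                                                    conda_package='mosaicml-streaming') from e
            return CIFAR10(**kwargs)
        raise ValueError(f'Invalid streaming version: {version}')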
48 changes: 35 additions & 13 deletions composer/datasets/c4_hparams.py
@@ -24,12 +24,14 @@ class StreamingC4Hparams(DatasetHparams):
"""Builds a :class:`.DataSpec` for the StreamingC4 (Colossal Cleaned Common Crawl) dataset.

Args:
version (int): Which version of streaming to use. Default: ``2``.
remote (str): Remote directory (S3 or local filesystem) where dataset is stored.
Default: ``'s3://mosaicml-internal-dataset-c4/mds/1/'``
Default: ``'s3://mosaicml-internal-dataset-c4/mds/2/'``
local (str): Local filesystem directory where dataset is cached during operation.
Default: ``'/tmp/mds-cache/mds-c4/'``
split (str): What split of the dataset to use. Either ``'train'`` or ``'val'``. Default: ``'train'``.
tokenizer_name (str): The name of the HuggingFace tokenizer to preprocess text with. Default: ``'bert-base-uncased'``.
tokenizer_name (str): The name of the HuggingFace tokenizer to preprocess text with. Default:
``'bert-base-uncased'``.
max_seq_len (int): The max sequence length of each token sample. Default: ``512``.
group_method (str): How to group text samples into token samples. Currently only `truncate` is supported.
mlm (bool): Whether or not to use masked language modeling. Default: ``False``.
@@ -38,8 +40,9 @@ class StreamingC4Hparams(DatasetHparams):
         timeout (float): How long to wait for shard to download before raising an exception. Default: 120 sec.
     """

+    version: int = hp.optional('Version of streaming (1 or 2)', default=2)
     remote: str = hp.optional('Remote directory (S3 or local filesystem) where dataset is stored',
-                              default='s3://mosaicml-internal-dataset-c4/mds/1/')
+                              default='s3://mosaicml-internal-dataset-c4/mds/2/')
     local: str = hp.optional('Local filesystem directory where dataset is cached during operation',
                              default='/tmp/mds-cache/mds-c4/')
     split: str = hp.optional('What split of the dataset to use. Either `train` or `val`.', default='train')
@@ -72,16 +75,35 @@ def initialize_object(self, batch_size: int, dataloader_hparams: DataLoaderHparams):
             raise MissingConditionalImportError(extra_deps_group='nlp', conda_package='transformers') from e

         # Get StreamingC4 dataset
-        dataset = StreamingC4(remote=self.remote,
-                              local=self.local,
-                              split=self.split,
-                              shuffle=self.shuffle,
-                              tokenizer_name=self.tokenizer_name,
-                              max_seq_len=self.max_seq_len,
-                              group_method=self.group_method,
-                              max_retries=self.max_retries,
-                              timeout=self.timeout,
-                              batch_size=batch_size)
+        if self.version == 1:
+            dataset = StreamingC4(remote=self.remote,
+                                  local=self.local,
+                                  split=self.split,
+                                  shuffle=self.shuffle,
+                                  tokenizer_name=self.tokenizer_name,
+                                  max_seq_len=self.max_seq_len,
+                                  group_method=self.group_method,
+                                  max_retries=self.max_retries,
+                                  timeout=self.timeout,
+                                  batch_size=batch_size)
+        elif self.version == 2:
+            try:
+                from streaming.text import C4
+            except ImportError as e:
+                raise MissingConditionalImportError(extra_deps_group='streaming',
+                                                    conda_package='mosaicml-streaming') from e
+            dataset = C4(tokenizer_name=self.tokenizer_name,
+                         max_seq_len=self.max_seq_len,
+                         group_method=self.group_method,
+                         local=self.local,
+                         remote=self.remote,
+                         split=self.split,
+                         shuffle=self.shuffle,
+                         retry=self.max_retries,
+                         timeout=self.timeout,
+                         batch_size=batch_size)
+        else:
+            raise ValueError(f'Invalid streaming version: {self.version}')

         # Get collate_fn
         collate_fn = transformers.DataCollatorForLanguageModeling(tokenizer=dataset.tokenizer,
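Because the default flips to version=2 (and the default remote to mds/2/), jobs that still read the v1 MDS layout must now pin both explicitly. A minimal sketch, assuming the inherited DatasetHparams fields (e.g. shuffle) keep their defaults:

    from composer.datasets.c4_hparams import StreamingC4Hparams

    # Pin the old on-disk format; omitting these now selects v2.
    c4 = StreamingC4Hparams(version=1,
                            remote='s3://mosaicml-internal-dataset-c4/mds/1/')

Note that a version other than 1 or 2 only fails when initialize_object runs, raising ValueError('Invalid streaming version: ...').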
56 changes: 44 additions & 12 deletions composer/datasets/cifar_hparams.py
@@ -25,11 +25,16 @@
 from composer.datasets.synthetic import SyntheticBatchPairDataset
 from composer.datasets.synthetic_hparams import SyntheticHparamsMixin
 from composer.utils import dist
+from composer.utils.import_helpers import MissingConditionalImportError

 __all__ = ['CIFAR10DatasetHparams', 'StreamingCIFAR10Hparams']

 log = logging.getLogger(__name__)

+# CIFAR10 mean and standard deviation for normalization.
+CIFAR10_MEAN = 0.4914, 0.4822, 0.4465
+CIFAR10_STD = 0.247, 0.243, 0.261
+

 @dataclass
 class CIFAR10DatasetHparams(DatasetHparams, SyntheticHparamsMixin):
@@ -146,20 +151,17 @@ def initialize_object(self, batch_size: int, dataloader_hparams: DataLoaderHparams):
             if self.datadir is None:
                 raise ValueError('datadir is required if use_synthetic is False')

-            cifar10_mean = 0.4914, 0.4822, 0.4465
-            cifar10_std = 0.247, 0.243, 0.261
-
             if self.is_train:
                 transformation = transforms.Compose([
                     transforms.RandomCrop(32, padding=4),
                     transforms.RandomHorizontalFlip(),
                     transforms.ToTensor(),
-                    transforms.Normalize(cifar10_mean, cifar10_std),
+                    transforms.Normalize(CIFAR10_MEAN, CIFAR10_STD),
                 ])
             else:
                 transformation = transforms.Compose([
                     transforms.ToTensor(),
-                    transforms.Normalize(cifar10_mean, cifar10_std),
+                    transforms.Normalize(CIFAR10_MEAN, CIFAR10_STD),
                 ])

             with dist.run_local_rank_zero_first():
@@ -183,25 +185,55 @@ class StreamingCIFAR10Hparams(DatasetHparams):
     """Streaming CIFAR10 hyperparameters.

     Args:
+        version (int): Which version of streaming to use. Default: ``2``.
         remote (str): Remote directory (S3 or local filesystem) where dataset is stored.
-            Default: ``'s3://mosaicml-internal-dataset-cifar10/mds/1/'``
+            Default: ``'s3://mosaicml-internal-dataset-cifar10/mds/2/'``
         local (str): Local filesystem directory where dataset is cached during operation.
             Default: ``'/tmp/mds-cache/mds-cifar10/'``
         split (str): The dataset split to use, either 'train' or 'val'. Default: ``'train'``.
     """

+    version: int = hp.optional('Version of streaming (1 or 2)', default=2)
     remote: str = hp.optional('Remote directory (S3 or local filesystem) where dataset is stored',
-                              default='s3://mosaicml-internal-dataset-cifar10/mds/1/')
+                              default='s3://mosaicml-internal-dataset-cifar10/mds/2/')
     local: str = hp.optional('Local filesystem directory where dataset is cached during operation',
                              default='/tmp/mds-cache/mds-cifar10/')
     split: str = hp.optional("Which split of the dataset to use. Either ['train', 'val']", default='train')

     def initialize_object(self, batch_size: int, dataloader_hparams: DataLoaderHparams) -> DataLoader:
-        dataset = StreamingCIFAR10(remote=self.remote,
-                                   local=self.local,
-                                   split=self.split,
-                                   shuffle=self.shuffle,
-                                   batch_size=batch_size)
+        if self.version == 1:
+            dataset = StreamingCIFAR10(remote=self.remote,
+                                       local=self.local,
+                                       split=self.split,
+                                       shuffle=self.shuffle,
+                                       batch_size=batch_size)
+        elif self.version == 2:
+            try:
+                from streaming.vision import CIFAR10
+            except ImportError as e:
+                raise MissingConditionalImportError(extra_deps_group='streaming',
+                                                    conda_package='mosaicml-streaming') from e
+            if self.split == 'train':
+                transform = transforms.Compose([
+                    transforms.RandomCrop(32, padding=4),
+                    transforms.RandomHorizontalFlip(),
+                    transforms.ToTensor(),
+                    transforms.Normalize(CIFAR10_MEAN, CIFAR10_STD),
+                ])
+            else:
+                transform = transforms.Compose([
+                    transforms.ToTensor(),
+                    transforms.Normalize(CIFAR10_MEAN, CIFAR10_STD),
+                ])
+            dataset = CIFAR10(local=self.local,
+                              remote=self.remote,
+                              split=self.split,
+                              shuffle=self.shuffle,
+                              transform=transform,
+                              batch_size=batch_size)
+        else:
+            raise ValueError(f'Invalid streaming version: {self.version}')

         return dataloader_hparams.initialize_object(dataset,
                                                     batch_size=batch_size,
                                                     sampler=None,
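In the v1 class the CIFAR transforms were baked into StreamingCIFAR10 itself; the generic streaming.vision.CIFAR10 instead accepts a torchvision transform, which is why the hparams now builds the pipeline and why the mean/std constants were hoisted to module level. A self-contained sanity check of the train pipeline composed above (the blank Image.new sample is a stand-in, not real data):

    import torch
    from PIL import Image
    from torchvision import transforms

    CIFAR10_MEAN = 0.4914, 0.4822, 0.4465
    CIFAR10_STD = 0.247, 0.243, 0.261

    transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(CIFAR10_MEAN, CIFAR10_STD),
    ])

    sample = Image.new('RGB', (32, 32))           # stand-in CIFAR image
    out = transform(sample)
    assert out.shape == torch.Size([3, 32, 32])   # C x H x W float tensor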
53 changes: 44 additions & 9 deletions composer/datasets/imagenet_hparams.py
@@ -27,6 +27,7 @@
 from composer.datasets.synthetic_hparams import SyntheticHparamsMixin
 from composer.datasets.utils import NormalizationFn, pil_image_collate
 from composer.utils import dist
+from composer.utils.import_helpers import MissingConditionalImportError

 # ImageNet normalization values from torchvision: https://pytorch.org/vision/stable/models.html
 IMAGENET_CHANNEL_MEAN = (0.485 * 255, 0.456 * 255, 0.406 * 255)
@@ -211,31 +212,65 @@ class StreamingImageNet1kHparams(DatasetHparams):
"""DatasetHparams for creating an instance of StreamingImageNet1k.

Args:
version (int): Which version of streaming to use. Default: ``2``.
remote (str): Remote directory (S3 or local filesystem) where dataset is stored.
Default: ``'s3://mosaicml-internal-dataset-imagenet1k/mds/1/```
Default: ``'s3://mosaicml-internal-dataset-imagenet1k/mds/2/```
local (str): Local filesystem directory where dataset is cached during operation.
Default: ``'/tmp/mds-cache/mds-imagenet1k/```
split (str): The dataset split to use, either 'train' or 'val'. Default: ``'train```.
resize_size (int, optional): The resize size to use. Use -1 to not resize. Default: ``-1``.
crop size (int): The crop size to use. Default: ``224``.
"""

version: int = hp.optional('Version of streaming (1 or 2)', default=2)
     remote: str = hp.optional('Remote directory (S3 or local filesystem) where dataset is stored',
-                              default='s3://mosaicml-internal-dataset-imagenet1k/mds/1/')
+                              default='s3://mosaicml-internal-dataset-imagenet1k/mds/2/')
     local: str = hp.optional('Local filesystem directory where dataset is cached during operation',
                              default='/tmp/mds-cache/mds-imagenet1k/')
     split: str = hp.optional("Which split of the dataset to use. Either ['train', 'val']", default='train')
     resize_size: int = hp.optional('Resize size. Set to -1 to not resize', default=-1)
     crop_size: int = hp.optional('Crop size', default=224)

     def initialize_object(self, batch_size: int, dataloader_hparams: DataLoaderHparams) -> DataSpec:
-        dataset = StreamingImageNet1k(remote=self.remote,
-                                      local=self.local,
-                                      split=self.split,
-                                      shuffle=self.shuffle,
-                                      resize_size=self.resize_size,
-                                      crop_size=self.crop_size,
-                                      batch_size=batch_size)
+        if self.version == 1:
+            dataset = StreamingImageNet1k(remote=self.remote,
+                                          local=self.local,
+                                          split=self.split,
+                                          shuffle=self.shuffle,
+                                          resize_size=self.resize_size,
+                                          crop_size=self.crop_size,
+                                          batch_size=batch_size)
+        elif self.version == 2:
+            try:
+                from streaming.vision import ImageNet
+            except ImportError as e:
+                raise MissingConditionalImportError(extra_deps_group='streaming',
+                                                    conda_package='mosaicml-streaming') from e
+            transform = []
+            if self.split == 'train':
+                # include fixed-size resize before RandomResizedCrop in training only
+                # if requested (by specifying a size > 0)
+                if self.resize_size > 0:
+                    transform.append(transforms.Resize(self.resize_size))
+                # always include RandomResizedCrop and RandomHorizontalFlip
+                transform += [
+                    transforms.RandomResizedCrop(self.crop_size, scale=(0.08, 1.0), ratio=(0.75, 4.0 / 3.0)),
+                    transforms.RandomHorizontalFlip()
+                ]
+            else:
+                if self.resize_size > 0:
+                    transform.append(transforms.Resize(self.resize_size))
+                transform.append(transforms.CenterCrop(self.crop_size))
+            transform.append(lambda image: image.convert('RGB'))
+            transform = transforms.Compose(transform)
+            dataset = ImageNet(local=self.local,
+                               remote=self.remote,
+                               split=self.split,
+                               shuffle=self.shuffle,
+                               transform=transform,
+                               batch_size=batch_size)
+        else:
+            raise ValueError(f'Invalid streaming version: {self.version}')
         collate_fn = pil_image_collate
         device_transform_fn = NormalizationFn(mean=IMAGENET_CHANNEL_MEAN, std=IMAGENET_CHANNEL_STD)
         return DataSpec(dataloader=dataloader_hparams.initialize_object(
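Unlike the CIFAR path, the v2 ImageNet branch leaves tensor conversion and normalization out of the transform: pil_image_collate batches PIL images and NormalizationFn applies the 0-255-scaled channel means/stds on device. The transform list itself only varies with the split and resize_size; a runnable sketch of the train-side assembly (function name illustrative, logic mirrored from the diff):

    from torchvision import transforms

    def build_train_transform(resize_size: int = -1, crop_size: int = 224):
        tfs = []
        if resize_size > 0:                      # optional fixed resize first
            tfs.append(transforms.Resize(resize_size))
        tfs += [
            transforms.RandomResizedCrop(crop_size, scale=(0.08, 1.0), ratio=(0.75, 4.0 / 3.0)),
            transforms.RandomHorizontalFlip(),
        ]
        return transforms.Compose(tfs)

    print(build_train_transform())      # default: no Resize step
    print(build_train_transform(256))   # Resize(256) before the random crop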
6 changes: 4 additions & 2 deletions composer/yamls/models/bert-base.yaml
@@ -8,7 +8,8 @@ model:
 # Train the model on the English C4 corpus
 train_dataset:
   streaming_c4:
-    remote: s3://allenai-c4/mds/1-gz/
+    version: 2
+    remote: s3://mosaicml-internal-dataset-c4/mds/2/
     local: /tmp/mds-cache/mds-c4/
     split: train
     shuffle: true
@@ -31,7 +32,8 @@ evaluators:
 - label: bert_pre_training
   eval_dataset:
     streaming_c4:
-      remote: s3://allenai-c4/mds/1-gz/
+      version: 2
+      remote: s3://mosaicml-internal-dataset-c4/mds/2/
       local: /tmp/mds-cache/mds-c4/
       split: val
       shuffle: false
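Note that the remote here moves from the public s3://allenai-c4/mds/1-gz/ bucket to the internal v2 bucket, so anyone without access to mosaicml-internal-dataset-c4 must override remote. The version: 2 key is pinned explicitly even though it matches the new default. Roughly, the train stanza deserializes into the following (yahp wiring elided; direct keyword construction assumed):

    from composer.datasets.c4_hparams import StreamingC4Hparams

    train_c4 = StreamingC4Hparams(version=2,
                                  remote='s3://mosaicml-internal-dataset-c4/mds/2/',
                                  local='/tmp/mds-cache/mds-c4/',
                                  split='train',
                                  shuffle=True)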
2 changes: 2 additions & 0 deletions composer/yamls/models/resnet50_streaming.yaml
@@ -1,5 +1,6 @@
 train_dataset:
   streaming_imagenet1k:
+    version: 2
     remote: your-bucket-here
     local: /tmp/mds-cache/mds-imagenet1k/
     split: train
@@ -9,6 +10,7 @@ train_dataset:
     drop_last: true
 val_dataset:
   streaming_imagenet1k:
+    version: 2
     remote: your-bucket-here
     local: /tmp/mds-cache/mds-imagenet1k/
     split: val
2 changes: 2 additions & 0 deletions composer/yamls/models/resnet56_streaming_cifar10.yaml
@@ -1,12 +1,14 @@
 train_dataset:
   streaming_cifar10:
+    version: 2
     remote: your-bucket-here
     local: /tmp/mds-cache/mds-cifar10/
     split: train
     shuffle: true
     drop_last: true
 val_dataset:
   streaming_cifar10:
+    version: 2
     remote: your-bucket-here
     local: /tmp/mds-cache/mds-cifar10/
     split: val
1 change: 1 addition & 0 deletions setup.py
@@ -180,6 +180,7 @@ def package_files(prefix: str, directory: str, extension: str):
 ]

 extra_deps['streaming'] = [
+    'mosaicml-streaming',
     'boto3>=1.21.45,<2',
     'paramiko>=2.11.0,<3',
 ]
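With this addition, pip install 'mosaicml[streaming]' pulls in mosaicml-streaming alongside boto3 and paramiko; this is the extras group that the MissingConditionalImportError raised on the v2 code paths points users to.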