Add a regression test for StreamingDataset using cloud providers #319

Merged: 74 commits merged into mosaicml:main from b-chu:test_cloud_providers on Aug 1, 2023
Changes from 67 commits
Commits (74)
bab13a1
add venv to pyproject.toml
b-chu Jun 27, 2023
f929587
Fix style
b-chu Jun 27, 2023
bc4a281
Remove pyproject.toml changes
b-chu Jun 27, 2023
609fc57
Remove pyproject.toml changes
b-chu Jun 27, 2023
2e43dd1
Fix tests
b-chu Jun 28, 2023
794f613
Add basic flow test
b-chu Jun 23, 2023
1a61589
Fix arg
b-chu Jun 28, 2023
51f0522
Use same download directory for all ranks
b-chu Jun 28, 2023
08e9abc
Fix import order
b-chu Jun 28, 2023
f5762a3
Add yaml
b-chu Jun 28, 2023
4e70b38
Edit yaml
b-chu Jun 29, 2023
3bca53a
Add cloning without ssh to yaml
b-chu Jun 29, 2023
e090164
Add cloning without ssh to yaml
b-chu Jun 29, 2023
1c2f74c
Merge branch 'mosaicml:main' into test_basic_flow
b-chu Jun 29, 2023
4308739
Check for local directory
b-chu Jun 28, 2023
415707d
Fix tests
b-chu Jun 29, 2023
69dfca5
Add local_directory_timeout since download_timeout may be initialized…
b-chu Jun 29, 2023
0396c24
Create folder after initializing timeout
b-chu Jun 29, 2023
f89ef54
Create folder before keep zip
b-chu Jun 29, 2023
a412840
Fix arguments
b-chu Jun 29, 2023
826e4d7
Fix writer filepath in test
b-chu Jun 29, 2023
cb0036e
Add default argument
b-chu Jun 29, 2023
d392617
Run basic flow test with various options
b-chu Jun 30, 2023
d8a6c93
Fix union for lint
b-chu Jun 30, 2023
e1e209f
Add regression test for gcs
b-chu Jun 30, 2023
fece14d
Rename iterate data file
b-chu Jul 3, 2023
b149bec
Rename iterate data yaml
b-chu Jul 3, 2023
49c79ed
Fix error message
b-chu Jun 29, 2023
a8062e2
Change wait and name for local directory creation
b-chu Jul 3, 2023
588f0e1
Change encryption for test and local directory
b-chu Jul 3, 2023
1658516
Fix tests
b-chu Jul 3, 2023
7ea73d2
Fix tests
b-chu Jul 3, 2023
9da2e5a
Add test for exception and move dist initialization
b-chu Jul 3, 2023
225ab9c
Rename iterate data
b-chu Jul 3, 2023
62a3036
Rename iterate data
b-chu Jul 3, 2023
2050ea0
Add debug prints
b-chu Jul 3, 2023
96cbb1c
Debug
b-chu Jul 3, 2023
b65dee8
Debug
b-chu Jul 3, 2023
4b7bf63
Fix arg parse, set yaml for mosaicml, rename dataset file
b-chu Jul 3, 2023
dfc6d44
Add regression test for gcs
b-chu Jun 30, 2023
1079a31
Uncomment bucket exist check
b-chu Jul 14, 2023
c674ad5
Change dataset to synthetic_dataset
b-chu Jul 14, 2023
f05e242
Add cloud providers and yaml
b-chu Jul 17, 2023
71f06c3
Debug
b-chu Jul 3, 2023
4b90926
Debug
b-chu Jul 3, 2023
cd2179f
Bump fastapi from 0.98.0 to 0.100.0 (#322)
dependabot[bot] Jul 17, 2023
5923764
Bump uvicorn from 0.22.0 to 0.23.0 (#327)
dependabot[bot] Jul 17, 2023
35b3e95
Bump gitpython from 3.1.31 to 3.1.32 (#329)
dependabot[bot] Jul 17, 2023
54dca29
Bump pydantic from 1.10.9 to 1.10.11 (#328)
dependabot[bot] Jul 17, 2023
4ca6c60
Sync tmp directory (#316)
b-chu Jul 18, 2023
5bde48a
Fix style
b-chu Jun 27, 2023
e1847b7
Check for local directory
b-chu Jun 28, 2023
d5db881
Add local_directory_timeout since download_timeout may be initialized…
b-chu Jun 29, 2023
eb8c2f6
Change wait and name for local directory creation
b-chu Jul 3, 2023
716d0a6
Debug
b-chu Jul 3, 2023
ccba09d
Test iterating over compressed dataset
b-chu Jul 18, 2023
bb02393
Change repo to mosaicml
b-chu Jul 18, 2023
3a8b788
Comment out cluster
b-chu Jul 18, 2023
874ba5f
Add regression test for gcs
b-chu Jun 30, 2023
de11bc5
Rename iterate data file
b-chu Jul 3, 2023
f6cc6eb
Rename iterate data yaml
b-chu Jul 3, 2023
d37b3cb
Debug
b-chu Jul 3, 2023
2f59987
Add check for number of files
b-chu Jul 18, 2023
d7d508e
Add download number of files check to yaml
b-chu Jul 18, 2023
ad74036
Initialize process group for iterate_data
b-chu Jul 19, 2023
baae6b1
Merge branch 'main' into test_cloud_providers
b-chu Jul 19, 2023
46ff4d1
Uncomment check_bucket_exists
b-chu Jul 19, 2023
af7cb24
Increase number of shards
b-chu Jul 19, 2023
5854077
Increase sample size
b-chu Jul 20, 2023
1bcdfab
Increase number of shards
b-chu Jul 20, 2023
300d7cb
Modify yaml for PR
b-chu Jul 20, 2023
afd0c0c
Merge branch 'mosaicml:main' into test_cloud_providers
b-chu Aug 1, 2023
32b7daf
Move cloud urls to yaml
b-chu Aug 1, 2023
92af3f6
Use env vars for cloud urls
b-chu Aug 1, 2023
27 changes: 27 additions & 0 deletions regression/cloud_providers.yaml
@@ -0,0 +1,27 @@
name: streaming-regression-test-cloud-providers
compute:
  cluster: r1z1
  gpu_num: 8
command: |-
  pip uninstall -y mosaicml-streaming
  cd streaming
  pip install -e '.[dev]'
  composer -n 1 regression/synthetic_dataset.py --create --cloud gs
  composer -n 2 regression/iterate_data.py --cloud gs --check_download
  composer -n 1 regression/synthetic_dataset.py --delete --cloud gs
  composer -n 1 regression/synthetic_dataset.py --create --cloud s3
  composer -n 4 regression/iterate_data.py --cloud s3 --check_download
  composer -n 1 regression/synthetic_dataset.py --delete --cloud s3
  composer -n 1 regression/synthetic_dataset.py --create --cloud oci
  composer -n 8 regression/iterate_data.py --cloud oci --check_download
  composer -n 1 regression/synthetic_dataset.py --delete --cloud oci

image: mosaicml/composer:0.15.0
scheduling:
  resumable: true
  priority: medium
integrations:
- integration_type: git_repo
  git_repo: b-chu/streaming
  git_branch: test_cloud_providers
  ssh_clone: false
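
The run assumes credentials for all three providers are present in the job's environment: the scripts read GOOGLE_APPLICATION_CREDENTIALS for GCS, rely on boto3's default credential chain for S3, and load ~/.oci/config for OCI. A small preflight check along these lines could fail fast when one is missing (a hypothetical helper, not part of this PR):

    import os

    def check_credentials() -> None:
        # GCS: the scripts read a service-account JSON path from the env.
        sa_path = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')
        assert sa_path and os.path.isfile(sa_path), 'GCS service account JSON missing'
        # S3: boto3 falls back through env vars, then ~/.aws/credentials.
        assert 'AWS_ACCESS_KEY_ID' in os.environ or os.path.isfile(
            os.path.expanduser('~/.aws/credentials')), 'AWS credentials missing'
        # OCI: oci.config.from_file() reads ~/.oci/config by default.
        assert os.path.isfile(os.path.expanduser('~/.oci/config')), 'OCI config missing'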
61 changes: 58 additions & 3 deletions regression/iterate_data.py
@@ -6,12 +6,15 @@
import os
import shutil
import tempfile
import urllib.parse
from argparse import ArgumentParser, Namespace

import utils
from torch import distributed as dist
from torch.utils.data import DataLoader

from streaming import StreamingDataset
from streaming.base.distributed import barrier
from streaming.base.distributed import barrier, maybe_init_dist

_TRAIN_EPOCHS = 2

@@ -35,6 +38,8 @@ def parse_args() -> tuple[Namespace, dict[str, str]]:
        tuple(Namespace, dict[str, str]): Command-line arguments and named arguments.
    """
    args = ArgumentParser()
    args.add_argument('--cloud', type=str)
    args.add_argument('--check_download', default=False, action='store_true')
    args.add_argument('--local', default=False, action='store_true')
    args.add_argument(
        '--keep_zip',
@@ -56,18 +61,57 @@ def parse_args() -> tuple[Namespace, dict[str, str]]:
    return args, kwargs


def get_file_count(cloud: str) -> int:
    """Get the number of files in a remote directory.

    Args:
        cloud (str): Cloud provider.

    Returns:
        int: Number of files in the remote directory.
    """
    remote_dir = utils.get_remote_dir(cloud)
    obj = urllib.parse.urlparse(remote_dir)
    files = []
    if cloud == 'gs':
        from google.cloud.storage import Bucket, Client

        service_account_path = os.environ['GOOGLE_APPLICATION_CREDENTIALS']
        gcs_client = Client.from_service_account_json(service_account_path)

        bucket = Bucket(gcs_client, obj.netloc)
        files = bucket.list_blobs(prefix=obj.path.lstrip('/'))
    elif cloud == 's3':
        import boto3

        s3 = boto3.resource('s3')
        bucket = s3.Bucket(obj.netloc)
        files = bucket.objects.filter(Prefix=obj.path.lstrip('/'))
    elif cloud == 'oci':
        import oci

        config = oci.config.from_file()
        oci_client = oci.object_storage.ObjectStorageClient(
            config=config, retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY)
        namespace = oci_client.get_namespace().data
        objects = oci_client.list_objects(namespace, obj.netloc, prefix=obj.path.lstrip('/'))

        files = objects.data.objects

    return sum(1 for _ in files)
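
# Note: the OCI branch above reads a single page from list_objects (capped
# at 1,000 objects), so the count would come up short for larger prefixes.
# A paginated count (a sketch, assuming the same client setup) would follow
# resp.data.next_start_with:
#
#     count, start = 0, None
#     while True:
#         resp = oci_client.list_objects(namespace, obj.netloc,
#                                        prefix=obj.path.lstrip('/'), start=start)
#         count += len(resp.data.objects)
#         start = resp.data.next_start_with
#         if start is None:
#             break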


def main(args: Namespace, kwargs: dict[str, str]) -> None:
    """Benchmark time taken to generate the epoch for a given dataset.

    Args:
        args (Namespace): Command-line arguments.
        kwargs (dict): Named arguments.
    """
    # Initialize torch dist ourselves, if necessary.
    destroy_dist = maybe_init_dist()

    tmp_dir = tempfile.gettempdir()
    tmp_remote_dir = os.path.join(tmp_dir, 'regression_remote')
    tmp_download_dir = os.path.join(tmp_dir, 'test_iterate_data_download')
    dataset = StreamingDataset(
        remote=tmp_remote_dir,
        remote=utils.get_remote_dir(args.cloud),
        local=tmp_download_dir if args.local else None,
        split=kwargs.get('split'),
        download_retry=int(kwargs.get('download_retry', 2)),
@@ -92,12 +136,23 @@ def main(args: Namespace, kwargs: dict[str, str]) -> None:
    for _ in dataloader:
        pass

    if args.check_download and args.cloud is not None:
        # After a full epoch, every remote file (index and shards) should
        # have been downloaded locally.
        num_cloud_files = get_file_count(args.cloud)
        local_dir = dataset.streams[0].local
        num_local_files = len([
            name for name in os.listdir(local_dir) if os.path.isfile(os.path.join(local_dir, name))
        ])
        assert num_cloud_files == num_local_files

    barrier()
    # Clean up directories
    for stream in dataset.streams:
        shutil.rmtree(stream.local, ignore_errors=True)
    shutil.rmtree(tmp_download_dir, ignore_errors=True)

    if destroy_dist:
        dist.destroy_process_group()


if __name__ == '__main__':
    args, kwargs = parse_args()
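For context, maybe_init_dist() follows the usual init-only-if-needed pattern: create the torch process group only when the launcher spawned multiple ranks and no group exists yet, and return whether this call created it so main() knows to tear it down with destroy_process_group(). A rough sketch of that pattern (not the actual streaming.base.distributed implementation; the backend choice and env-var handling here are assumptions):

    import os

    import torch.distributed as dist

    def maybe_init_dist_sketch() -> bool:
        """Initialize torch.distributed if needed; return True if this call did."""
        world_size = int(os.environ.get('WORLD_SIZE', 1))
        if world_size == 1 or (dist.is_available() and dist.is_initialized()):
            return False
        # Assumes the launcher (composer/torchrun) exported MASTER_ADDR/MASTER_PORT.
        dist.init_process_group(backend='gloo')
        return True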
1 change: 0 additions & 1 deletion regression/iterate_data.yaml
@@ -23,7 +23,6 @@ command: |-
  composer -n 8 regression/iterate_data.py --batch_size 1000
  composer -n 8 regression/iterate_data.py --shuffle --shuffle_algo py1b --shuffle_seed 12 --shuffle_block_size 10000
  composer -n 1 regression/synthetic_dataset.py --delete

image: mosaicml/composer:0.15.0
scheduling:
  resumable: true
73 changes: 67 additions & 6 deletions regression/synthetic_dataset.py
@@ -5,11 +5,12 @@

import os
import shutil
import tempfile
import urllib.parse
from argparse import ArgumentParser, Namespace
from typing import Union

import numpy as np
import utils

from streaming import MDSWriter

@@ -32,6 +33,7 @@ def parse_args() -> Namespace:
        Namespace: Command-line arguments.
    """
    args = ArgumentParser()
    args.add_argument('--cloud', type=str)
    args.add_argument('--create', default=False, action='store_true')
    args.add_argument('--delete', default=False, action='store_true')
    args.add_argument(
@@ -98,19 +100,71 @@ def get_dataset(num_samples: int) -> list[dict[str, Union[int, str]]]:
    return samples


def delete_gcs(remote_dir: str) -> None:
    """Delete a remote directory from gcs.

    Args:
        remote_dir (str): Location of the remote directory.
    """
    from google.cloud.storage import Bucket, Client

    service_account_path = os.environ['GOOGLE_APPLICATION_CREDENTIALS']
    gcs_client = Client.from_service_account_json(service_account_path)
    obj = urllib.parse.urlparse(remote_dir)

    bucket = Bucket(gcs_client, obj.netloc)
    blobs = bucket.list_blobs(prefix=obj.path.lstrip('/'))

    for blob in blobs:
        blob.delete()


def delete_s3(remote_dir: str) -> None:
    """Delete a remote directory from s3.

    Args:
        remote_dir (str): Location of the remote directory.
    """
    import boto3

    obj = urllib.parse.urlparse(remote_dir)

    s3 = boto3.resource('s3')
    bucket = s3.Bucket(obj.netloc)
    bucket.objects.filter(Prefix=obj.path.lstrip('/')).delete()


def delete_oci(remote_dir: str) -> None:
    """Delete a remote directory from oci.

    Args:
        remote_dir (str): Location of the remote directory.
    """
    import oci

    obj = urllib.parse.urlparse(remote_dir)

    config = oci.config.from_file()
    oci_client = oci.object_storage.ObjectStorageClient(
        config=config, retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY)
    namespace = oci_client.get_namespace().data
    objects = oci_client.list_objects(namespace, obj.netloc, prefix=obj.path.lstrip('/'))

    for filenames in objects.data.objects:
        oci_client.delete_object(namespace, obj.netloc, filenames.name)


def main(args: Namespace) -> None:
    """Benchmark time taken to generate the epoch for a given dataset.

    Args:
        args (Namespace): Command-line arguments.
    """
    tmp_dir = tempfile.gettempdir()
    tmp_remote_dir = os.path.join(tmp_dir, 'regression_remote')

    remote_dir = utils.get_remote_dir(args.cloud)
    if args.create:
        dataset = get_dataset(_NUM_SAMPLES)
        with MDSWriter(
                out=tmp_remote_dir,
                out=remote_dir,
                columns=_COLUMNS,
                compression=args.compression,
                hashes=args.hashes,
@@ -119,7 +173,14 @@ def main(args: Namespace) -> None:
            for sample in dataset:
                out.write(sample)
    if args.delete:
        shutil.rmtree(tmp_remote_dir, ignore_errors=True)
        if args.cloud is None:
            shutil.rmtree(remote_dir, ignore_errors=True)
        elif args.cloud == 'gs':
            delete_gcs(remote_dir)
        elif args.cloud == 's3':
            delete_s3(remote_dir)
        elif args.cloud == 'oci':
            delete_oci(remote_dir)


if __name__ == '__main__':
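Each delete_* helper imports its SDK inside the function body, so only the SDK for the provider under test has to be installed. If more providers are added, the if/elif chain in main() could become a dispatch table; a sketch over the helpers above (delete_remote is a hypothetical name, not part of this PR):

    from typing import Callable

    _DELETE_FNS: dict[str, Callable[[str], None]] = {
        'gs': delete_gcs,
        's3': delete_s3,
        'oci': delete_oci,
    }

    def delete_remote(cloud: str, remote_dir: str) -> None:
        # Unknown providers should fail loudly rather than silently no-op.
        if cloud not in _DELETE_FNS:
            raise ValueError(f'Unsupported cloud provider: {cloud}')
        _DELETE_FNS[cloud](remote_dir)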
36 changes: 36 additions & 0 deletions regression/utils.py
@@ -0,0 +1,36 @@
# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Utility and helper functions for regression testing."""

import os
import tempfile
from typing import Optional

_CLOUD_REMOTE_LOCATIONS = {
    'gs': 'gs://mosaicml-composer-tests/streaming/regression/',
    's3': 's3://streaming-upload-test-bucket/streaming/regression/',
    'oci': 'oci://streaming-test/regression'
}

Review thread on the hard-coded locations above:

Collaborator: Is it possible to fetch the path at runtime? The user would need to provide a remote_dir to run, and we would like to avoid putting any cloud location publicly.

Contributor (author): Hmm, wouldn't it still go in the yaml then, and be visible publicly anyway? Also, it seems like we already expose our cloud location elsewhere:

    'remote': 's3://mosaicml-internal-dataset-ade20k/mds/2/',


def get_remote_dir(storage: Optional[str]) -> str:
    """Get a remote directory.

    Args:
        storage (Optional[str]): Type of storage to use, or None for a local directory.

    Returns:
        str: Remote directory.
    """
    if storage is None:
        return get_local_remote_dir()
    else:
        return _CLOUD_REMOTE_LOCATIONS[storage]


def get_local_remote_dir() -> str:
    """Get a local remote directory."""
    tmp_dir = tempfile.gettempdir()
    tmp_remote_dir = os.path.join(tmp_dir, 'regression_remote')
    return tmp_remote_dir
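
The final commits in this PR ("Move cloud urls to yaml", "Use env vars for cloud urls") replace these hard-coded paths in response to the review thread above. A minimal sketch of the env-var approach (the variable names here are assumptions, not the names the PR actually uses):

    import os

    def get_remote_dir_from_env(storage: str) -> str:
        # Hypothetical variable names, e.g. REGRESSION_GS_REMOTE.
        env_var = f'REGRESSION_{storage.upper()}_REMOTE'
        remote_dir = os.environ.get(env_var)
        if remote_dir is None:
            raise KeyError(f'Set {env_var} to the {storage} remote dataset location.')
        return remote_dir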